Files
cc-platform-core/server/websocket.js
2026-03-26 15:00:49 -04:00

252 lines
7.7 KiB
JavaScript

/**
* @cc-platform/server/websocket — WebSocket manager for bridge + client connections
*
* Extracted from the structurally identical WebSocket setup in both
* remoteturtle/server/server.js and Inventory-Manager-CC/web/server/server.js.
*
* Provides:
* - URL-based routing (paths starting with /ws/bridge are bridges; every other path is treated as a client)
* - Bridge connection lifecycle (add/remove/message dispatch)
* - Client connection lifecycle (add/remove/initial state/broadcast)
* - Ping/pong keepalive with stale connection cleanup
* - broadcastToClients() and sendToBridge() helpers
* - Optional API key auth on WebSocket upgrade
*
* Usage:
* const { createWebSocketManager } = require('@cc-platform/server/websocket');
* const wsManager = createWebSocketManager(httpServer, {
* apiKey: process.env.API_KEY,
* onBridgeMessage: (ws, data) => { ... },
* onClientConnect: (ws) => { ws.send(JSON.stringify(initialState)); },
* onClientMessage: (ws, data) => { ... },
* });
* wsManager.broadcastToClients({ type: 'update', ... });
*/
'use strict';
const crypto = require('crypto');
const url = require('url');
const { WebSocketServer } = require('ws');
/**
* @typedef {Object} WebSocketManagerOptions
* @property {string} [apiKey] API key bridges must supply as the `key` query parameter on upgrade (empty = no auth)
* @property {string} [bridgePath] URL path prefix for bridge connections (default: '/ws/bridge')
* @property {number} [keepaliveMs] Keepalive interval in ms (default: 25000)
* @property {number} [staleTimeoutMs] Max time without pong before termination (default: 60000)
* @property {Function} [onBridgeConnect] Called when a bridge connects: (ws) => void
* @property {Function} [onBridgeMessage] Called for each bridge message: (ws, data) => void
* @property {Function} [onBridgeDisconnect] Called when a bridge disconnects: (ws) => void
* @property {Function} [onClientConnect] Called when a web client connects: (ws) => void
* @property {Function} [onClientMessage] Called for each client message: (ws, data) => void
* @property {Function} [onClientDisconnect] Called when a web client disconnects: (ws) => void
*/
/**
 * Create a WebSocket manager with bridge and client connection handling.
 *
 * Attaches an 'upgrade' listener to `server`, routes each accepted socket to
 * either the bridge set or the web-client set based on the request path, and
 * starts a keepalive timer that pings every tracked socket and terminates the
 * ones whose last pong is older than `staleTimeoutMs`.
 *
 * Callbacks are looked up on `opts` each time they are needed and are invoked
 * inside a try/catch; a throwing callback is logged and does not affect the
 * connection.
 *
 * @param {http.Server} server - The HTTP server to attach to
 * @param {WebSocketManagerOptions} opts - Configuration and callbacks
 * @returns {{
 *   wss: Object,
 *   bridgeClients: Set,
 *   webClients: Set,
 *   broadcastToClients: function(*): void,
 *   sendToBridge: function(*): boolean,
 *   close: function(): void
 * }} Manager exposing the underlying server, the live connection sets, the
 *   send helpers, and a close() that stops the keepalive timer, detaches the
 *   'upgrade' listener and closes the WebSocket server.
 */
function createWebSocketManager(server, opts = {}) {
  const bridgeClients = new Set();
  const webClients = new Set();
  const apiKey = opts.apiKey || '';
  const bridgePath = opts.bridgePath || '/ws/bridge';
  const keepaliveMs = opts.keepaliveMs || 25000;
  const staleTimeoutMs = opts.staleTimeoutMs || 60000;
  const wss = new WebSocketServer({ noServer: true });

  // Per-role configuration shared by the connection lifecycle helper below.
  const BRIDGE_ROLE = {
    label: 'Bridge',
    clients: bridgeClients,
    answersPing: true,
    onConnect: 'onBridgeConnect',
    onMessage: 'onBridgeMessage',
    onDisconnect: 'onBridgeDisconnect',
  };
  const CLIENT_ROLE = {
    label: 'Client',
    clients: webClients,
    answersPing: false,
    onConnect: 'onClientConnect',
    onMessage: 'onClientMessage',
    onDisconnect: 'onClientDisconnect',
  };

  /** True when the request path addresses a bridge; tolerates a missing pathname. */
  function isBridgePath(pathname) {
    return typeof pathname === 'string' && pathname.startsWith(bridgePath);
  }

  /**
   * Compare the supplied key against the configured API key.
   * Both values are hashed first so timingSafeEqual always receives buffers of
   * equal length and the comparison time does not depend on the key contents.
   * Non-string input (missing key, repeated `key` parameter) never matches.
   */
  function isAuthorized(candidate) {
    if (typeof candidate !== 'string' || typeof apiKey !== 'string') {
      return false;
    }
    const given = crypto.createHash('sha256').update(candidate).digest();
    const expected = crypto.createHash('sha256').update(apiKey).digest();
    return crypto.timingSafeEqual(given, expected);
  }

  /** Invoke an optional callback from opts, logging (not propagating) anything it throws. */
  function invokeHook(name, ...args) {
    if (!opts[name]) {
      return;
    }
    try {
      opts[name](...args);
    } catch (e) {
      console.error(`[ws] ${name} error:`, e.message);
    }
  }

  // --- Upgrade handler with optional auth ---
  function onUpgrade(request, socket, head) {
    const parsed = url.parse(request.url || '', true);
    // Auth check on bridge connections only; web clients are never challenged.
    if (apiKey && isBridgePath(parsed.pathname) && !isAuthorized(parsed.query.key)) {
      socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
      socket.destroy();
      return;
    }
    wss.handleUpgrade(request, socket, head, (ws) => {
      wss.emit('connection', ws, request);
    });
  }
  server.on('upgrade', onUpgrade);

  // --- Connection routing ---
  wss.on('connection', (ws, request) => {
    const pathname = url.parse(request.url || '').pathname;
    // Track pong timestamps for keepalive
    ws._lastPong = Date.now();
    ws.on('pong', () => { ws._lastPong = Date.now(); });
    trackConnection(ws, isBridgePath(pathname) ? BRIDGE_ROLE : CLIENT_ROLE);
  });

  // --- Connection lifecycle (shared by bridges and web clients) ---
  function trackConnection(ws, role) {
    const { label, clients } = role;
    clients.add(ws);
    console.log(`[ws] ${label} connected (${clients.size} total)`);
    invokeHook(role.onConnect, ws);

    ws.on('message', (raw) => {
      let data;
      // Only the parse itself is reported as a parse error; handler failures
      // are reported separately by invokeHook under the handler's own name.
      try {
        data = JSON.parse(raw);
      } catch (e) {
        console.error(`[ws] ${label} message parse error:`, e.message);
        return;
      }
      // Application-level ping/pong is answered here and never reaches the handler.
      if (role.answersPing && data && data.type === 'ping') {
        try {
          ws.send(JSON.stringify({ type: 'pong' }));
        } catch (e) {
          console.error(`[ws] ${label} send error:`, e.message);
        }
        return;
      }
      invokeHook(role.onMessage, ws, data);
    });

    ws.on('close', () => {
      clients.delete(ws);
      console.log(`[ws] ${label} disconnected (${clients.size} remaining)`);
      invokeHook(role.onDisconnect, ws);
    });

    ws.on('error', (err) => {
      console.error(`[ws] ${label} error:`, err.message);
      clients.delete(ws);
    });
  }

  // --- Broadcast helpers ---
  /**
   * Send a JSON message to all connected web clients.
   * Silently skips clients that are not in OPEN state.
   */
  function broadcastToClients(data) {
    const message = JSON.stringify(data);
    webClients.forEach((client) => {
      try {
        if (client.readyState === 1) { // WebSocket.OPEN
          client.send(message);
        }
      } catch (e) {
        console.error('[ws] Broadcast error:', e.message);
      }
    });
  }

  /**
   * Send a JSON message to all connected bridge clients.
   * Returns true if send() was called without throwing on at least one open bridge.
   */
  function sendToBridge(data) {
    const message = JSON.stringify(data);
    let sent = false;
    bridgeClients.forEach((bridge) => {
      try {
        if (bridge.readyState === 1) { // WebSocket.OPEN
          bridge.send(message);
          sent = true;
        }
      } catch (e) {
        console.error('[ws] Bridge send error:', e.message);
      }
    });
    return sent;
  }

  // --- Keepalive ---
  const keepaliveInterval = setInterval(() => {
    const now = Date.now();
    // Snapshot both sets so sockets removed during the sweep do not disturb iteration.
    for (const ws of [...bridgeClients, ...webClients]) {
      if (ws._lastPong && (now - ws._lastPong) > staleTimeoutMs) {
        console.log('[ws] Terminating stale connection');
        ws.terminate();
        continue;
      }
      try {
        ws.ping();
      } catch (e) {
        // Ignore ping errors on closing connections
      }
    }
  }, keepaliveMs);

  /** Release everything this manager attached outside of the wss itself. */
  function releaseResources() {
    clearInterval(keepaliveInterval);
    // The HTTP server is owned by the caller and may outlive this manager,
    // so the upgrade listener must not be left behind.
    server.removeListener('upgrade', onUpgrade);
  }

  // Cleanup on server close (also covers callers that close `wss` directly)
  wss.on('close', releaseResources);

  return {
    wss,
    bridgeClients,
    webClients,
    broadcastToClients,
    sendToBridge,
    close() {
      releaseResources();
      wss.close();
    },
  };
}
// Public API: the factory is the module's only export.
module.exports = { createWebSocketManager };