/**
 * @cc-platform/server/websocket — WebSocket manager for bridge + client connections
 *
 * Extracted from the structurally identical WebSocket setup in both
 * remoteturtle/server/server.js and Inventory-Manager-CC/web/server/server.js.
 *
 * Provides:
 *   - URL-based routing (paths starting with `bridgePath` are bridges, every
 *     other path is treated as a web client)
 *   - Bridge connection lifecycle (add/remove/message dispatch)
 *   - Client connection lifecycle (add/remove/initial state/broadcast)
 *   - Ping/pong keepalive with stale connection cleanup
 *   - broadcastToClients() and sendToBridge() helpers
 *   - Optional API key auth on WebSocket upgrade (bridge paths only)
 *
 * Usage:
 *   const { createWebSocketManager } = require('@cc-platform/server/websocket');
 *   const wsManager = createWebSocketManager(httpServer, {
 *     apiKey: process.env.API_KEY,
 *     onBridgeMessage: (ws, data) => { ... },
 *     onClientConnect: (ws) => { ws.send(JSON.stringify(initialState)); },
 *     onClientMessage: (ws, data) => { ... },
 *   });
 *   wsManager.broadcastToClients({ type: 'update', ... });
 */

'use strict';

const crypto = require('crypto');
const url = require('url');

const { WebSocketServer } = require('ws');

/** Numeric value of the OPEN ready state checked before every send. */
const WS_OPEN = 1;

/**
 * @typedef {Object} WebSocketManagerOptions
 * @property {string} [apiKey] API key for bridge auth (empty = no auth)
 * @property {string} [bridgePath] URL path prefix for bridge connections (default: '/ws/bridge')
 * @property {number} [keepaliveMs] Keepalive interval in ms (default: 25000)
 * @property {number} [staleTimeoutMs] Max time without pong before termination (default: 60000)
 * @property {Function} [onBridgeConnect] Called when a bridge connects: (ws) => void
 * @property {Function} [onBridgeMessage] Called for each bridge message: (ws, data) => void
 * @property {Function} [onBridgeDisconnect] Called when a bridge disconnects: (ws) => void
 * @property {Function} [onClientConnect] Called when a web client connects: (ws) => void
 * @property {Function} [onClientMessage] Called for each client message: (ws, data) => void
 * @property {Function} [onClientDisconnect] Called when a web client disconnects: (ws) => void
 */

/**
 * Parse the request target of an incoming HTTP request.
 *
 * `url.parse()` yields a null pathname for some request targets; the pathname
 * is normalised to '' here so callers can use string methods without throwing
 * inside an event handler.
 *
 * @param {{ url?: string }} request - Incoming HTTP request
 * @returns {{ pathname: string, query: Object }} Pathname (never null) and parsed query
 */
function parseRequestTarget(request) {
  const parsed = url.parse(request.url || '', true);
  return { pathname: parsed.pathname || '', query: parsed.query || {} };
}

/**
 * Compare a client-supplied API key against the configured one.
 *
 * Both values are hashed first so the buffers handed to timingSafeEqual
 * always have equal length. Non-string input (e.g. a repeated `key`
 * query parameter, which parses to an array) never matches.
 *
 * @param {*} provided - Value taken from the request query string
 * @param {string} expected - Configured API key
 * @returns {boolean} True only when both are strings with identical content
 */
function isValidApiKey(provided, expected) {
  if (typeof provided !== 'string' || typeof expected !== 'string') {
    return false;
  }
  const providedDigest = crypto.createHash('sha256').update(provided).digest();
  const expectedDigest = crypto.createHash('sha256').update(expected).digest();
  return crypto.timingSafeEqual(providedDigest, expectedDigest);
}

/**
 * Invoke an optional user callback, logging (never propagating) failures.
 *
 * Covers both synchronous throws and rejected promises from async callbacks,
 * so a faulty handler cannot crash the process or surface as an unhandled
 * rejection.
 *
 * @param {string} name - Option name, used in the log line
 * @param {Function} [callback] - The callback to run; skipped when falsy
 * @param {...*} args - Arguments forwarded to the callback
 */
function invokeCallback(name, callback, ...args) {
  if (!callback) {
    return;
  }
  const report = (err) => {
    console.error(`[ws] ${name} error:`, err instanceof Error ? err.message : err);
  };
  try {
    const result = callback(...args);
    if (result && typeof result.then === 'function') {
      result.then(undefined, report);
    }
  } catch (err) {
    report(err);
  }
}

/**
 * Create a WebSocket manager with bridge and client connection handling.
 *
 * @param {http.Server} server - The HTTP server to attach to
 * @param {WebSocketManagerOptions} opts - Configuration and callbacks
 * @returns {Object} Manager exposing `wss`, the `bridgeClients` and
 *   `webClients` sets, `broadcastToClients(data)`, `sendToBridge(data)` and
 *   `close()`
 */
function createWebSocketManager(server, opts = {}) {
  const bridgeClients = new Set();
  const webClients = new Set();

  const apiKey = opts.apiKey || '';
  const bridgePath = opts.bridgePath || '/ws/bridge';
  const keepaliveMs = opts.keepaliveMs || 25000;
  const staleTimeoutMs = opts.staleTimeoutMs || 60000;

  const wss = new WebSocketServer({ noServer: true });

  // Per-role wiring. Callback names are looked up on `opts` at call time so
  // behaviour matches reading `opts.onXxx` directly on every event.
  const roles = {
    bridge: {
      label: 'Bridge',
      pool: bridgeClients,
      onConnect: 'onBridgeConnect',
      onMessage: 'onBridgeMessage',
      onDisconnect: 'onBridgeDisconnect',
      answersPing: true,
    },
    client: {
      label: 'Client',
      pool: webClients,
      onConnect: 'onClientConnect',
      onMessage: 'onClientMessage',
      onDisconnect: 'onClientDisconnect',
      answersPing: false,
    },
  };

  // --- Upgrade handler with optional auth ---

  function onUpgrade(request, socket, head) {
    const { pathname, query } = parseRequestTarget(request);

    // Auth check on bridge connections
    if (apiKey && pathname.startsWith(bridgePath) && !isValidApiKey(query.key, apiKey)) {
      socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
      socket.destroy();
      return;
    }

    wss.handleUpgrade(request, socket, head, (ws) => {
      wss.emit('connection', ws, request);
    });
  }

  server.on('upgrade', onUpgrade);

  // --- Connection routing ---

  wss.on('connection', (ws, request) => {
    const { pathname } = parseRequestTarget(request);

    // Track pong timestamps for keepalive
    ws._lastPong = Date.now();
    ws.on('pong', () => {
      ws._lastPong = Date.now();
    });

    if (pathname.startsWith(bridgePath)) {
      handleBridgeConnection(ws);
    } else {
      handleClientConnection(ws);
    }
  });

  // --- Shared connection lifecycle ---

  /**
   * Register a socket in its role's pool and wire message/close/error events.
   */
  function attachConnection(ws, role) {
    const { label, pool } = role;

    pool.add(ws);
    console.log(`[ws] ${label} connected (${pool.size} total)`);
    invokeCallback(role.onConnect, opts[role.onConnect], ws);

    ws.on('message', (raw) => {
      let data;
      try {
        data = JSON.parse(raw);
      } catch (err) {
        console.error(`[ws] ${label} message parse error:`, err.message);
        return;
      }

      // Handle ping/pong at the protocol layer (bridges only). The null/object
      // guard keeps a JSON `null` payload from throwing on `.type`.
      if (role.answersPing && data !== null && typeof data === 'object' && data.type === 'ping') {
        try {
          ws.send(JSON.stringify({ type: 'pong' }));
        } catch (err) {
          console.error(`[ws] ${label} pong error:`, err.message);
        }
        return;
      }

      // Handler failures are reported under the handler's own name rather
      // than being mislabelled as parse errors.
      invokeCallback(role.onMessage, opts[role.onMessage], ws, data);
    });

    ws.on('close', () => {
      pool.delete(ws);
      console.log(`[ws] ${label} disconnected (${pool.size} remaining)`);
      invokeCallback(role.onDisconnect, opts[role.onDisconnect], ws);
    });

    ws.on('error', (err) => {
      console.error(`[ws] ${label} error:`, err.message);
      pool.delete(ws);
    });
  }

  // --- Bridge connection lifecycle ---

  function handleBridgeConnection(ws) {
    attachConnection(ws, roles.bridge);
  }

  // --- Client connection lifecycle ---

  function handleClientConnection(ws) {
    attachConnection(ws, roles.client);
  }

  // --- Broadcast helpers ---

  /**
   * Serialise `data` once and send it to every OPEN socket in `pool`.
   * Returns true if send() was called on at least one socket without throwing.
   */
  function sendToAll(pool, data, errorLabel) {
    const message = JSON.stringify(data);
    let delivered = false;
    pool.forEach((peer) => {
      try {
        if (peer.readyState === WS_OPEN) {
          peer.send(message);
          delivered = true;
        }
      } catch (err) {
        console.error(`[ws] ${errorLabel} error:`, err.message);
      }
    });
    return delivered;
  }

  /**
   * Send a JSON message to all connected web clients.
   * Silently skips clients that are not in OPEN state.
   */
  function broadcastToClients(data) {
    sendToAll(webClients, data, 'Broadcast');
  }

  /**
   * Send a JSON message to all connected bridge clients.
   * Returns true if at least one bridge received the message.
   */
  function sendToBridge(data) {
    return sendToAll(bridgeClients, data, 'Bridge send');
  }

  // --- Keepalive ---

  const keepaliveInterval = setInterval(() => {
    const now = Date.now();
    for (const ws of [...bridgeClients, ...webClients]) {
      if (ws._lastPong && now - ws._lastPong > staleTimeoutMs) {
        console.log('[ws] Terminating stale connection');
        ws.terminate();
        continue;
      }
      try {
        ws.ping();
      } catch (e) {
        // Deliberately ignored: ping can fail on a connection that is
        // already closing, and its 'close' handler will clean it up.
      }
    }
  }, keepaliveMs);

  /**
   * Release everything this manager attached outside of `wss`: the keepalive
   * timer and the 'upgrade' listener on the HTTP server. Safe to call twice.
   */
  function teardown() {
    clearInterval(keepaliveInterval);
    server.removeListener('upgrade', onUpgrade);
  }

  // Cleanup on server close
  wss.on('close', teardown);

  return {
    wss,
    bridgeClients,
    webClients,
    broadcastToClients,
    sendToBridge,
    close() {
      teardown();
      wss.close();
    },
  };
}

module.exports = { createWebSocketManager };