initial commit
This commit is contained in:
251
server/websocket.js
Normal file
251
server/websocket.js
Normal file
@@ -0,0 +1,251 @@
|
||||
/**
|
||||
* @cc-platform/server/websocket — WebSocket manager for bridge + client connections
|
||||
*
|
||||
* Extracted from the structurally identical WebSocket setup in both
|
||||
* remoteturtle/server/server.js and Inventory-Manager-CC/web/server/server.js.
|
||||
*
|
||||
* Provides:
|
||||
* - URL-based routing (/ws/bridge vs /ws for clients)
|
||||
* - Bridge connection lifecycle (add/remove/message dispatch)
|
||||
* - Client connection lifecycle (add/remove/initial state/broadcast)
|
||||
* - Ping/pong keepalive with stale connection cleanup
|
||||
* - broadcastToClients() and sendToBridge() helpers
|
||||
* - Optional API key auth on WebSocket upgrade
|
||||
*
|
||||
* Usage:
|
||||
* const { createWebSocketManager } = require('@cc-platform/server/websocket');
|
||||
* const wsManager = createWebSocketManager(httpServer, {
|
||||
* apiKey: process.env.API_KEY,
|
||||
* onBridgeMessage: (ws, data) => { ... },
|
||||
* onClientConnect: (ws) => { ws.send(JSON.stringify(initialState)); },
|
||||
* onClientMessage: (ws, data) => { ... },
|
||||
* });
|
||||
* wsManager.broadcastToClients({ type: 'update', ... });
|
||||
*/
|
||||
|
||||
'use strict';
|
||||
|
||||
const { WebSocketServer } = require('ws');
|
||||
const url = require('url');
|
||||
|
||||
/**
|
||||
* @typedef {Object} WebSocketManagerOptions
|
||||
* @property {string} [apiKey] API key for bridge auth (empty = no auth)
|
||||
* @property {string} [bridgePath] URL path prefix for bridge connections (default: '/ws/bridge')
|
||||
* @property {number} [keepaliveMs] Keepalive interval in ms (default: 25000)
|
||||
* @property {number} [staleTimeoutMs] Max time without pong before termination (default: 60000)
|
||||
* @property {Function} [onBridgeConnect] Called when a bridge connects: (ws) => void
|
||||
* @property {Function} [onBridgeMessage] Called for each bridge message: (ws, data) => void
|
||||
* @property {Function} [onBridgeDisconnect] Called when a bridge disconnects: (ws) => void
|
||||
* @property {Function} [onClientConnect] Called when a web client connects: (ws) => void
|
||||
* @property {Function} [onClientMessage] Called for each client message: (ws, data) => void
|
||||
* @property {Function} [onClientDisconnect] Called when a web client disconnects: (ws) => void
|
||||
*/
|
||||
|
||||
/**
|
||||
* Create a WebSocket manager with bridge and client connection handling.
|
||||
*
|
||||
* @param {http.Server} server - The HTTP server to attach to
|
||||
* @param {WebSocketManagerOptions} opts - Configuration and callbacks
|
||||
* @returns {Object} Manager with bridgeClients, webClients, broadcast/send helpers
|
||||
*/
|
||||
function createWebSocketManager(server, opts = {}) {
|
||||
const bridgeClients = new Set();
|
||||
const webClients = new Set();
|
||||
const apiKey = opts.apiKey || '';
|
||||
const bridgePath = opts.bridgePath || '/ws/bridge';
|
||||
const keepaliveMs = opts.keepaliveMs || 25000;
|
||||
const staleTimeoutMs = opts.staleTimeoutMs || 60000;
|
||||
|
||||
const wss = new WebSocketServer({ noServer: true });
|
||||
|
||||
// --- Upgrade handler with optional auth ---
|
||||
server.on('upgrade', (request, socket, head) => {
|
||||
const parsed = url.parse(request.url, true);
|
||||
const pathname = parsed.pathname;
|
||||
|
||||
// Auth check on bridge connections
|
||||
if (apiKey && pathname.startsWith(bridgePath)) {
|
||||
const key = parsed.query.key || '';
|
||||
if (key !== apiKey) {
|
||||
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
wss.handleUpgrade(request, socket, head, (ws) => {
|
||||
wss.emit('connection', ws, request);
|
||||
});
|
||||
});
|
||||
|
||||
// --- Connection routing ---
|
||||
wss.on('connection', (ws, request) => {
|
||||
const pathname = url.parse(request.url).pathname;
|
||||
|
||||
// Track pong timestamps for keepalive
|
||||
ws._lastPong = Date.now();
|
||||
ws.on('pong', () => { ws._lastPong = Date.now(); });
|
||||
|
||||
if (pathname.startsWith(bridgePath)) {
|
||||
handleBridgeConnection(ws);
|
||||
} else {
|
||||
handleClientConnection(ws);
|
||||
}
|
||||
});
|
||||
|
||||
// --- Bridge connection lifecycle ---
|
||||
function handleBridgeConnection(ws) {
|
||||
bridgeClients.add(ws);
|
||||
console.log(`[ws] Bridge connected (${bridgeClients.size} total)`);
|
||||
|
||||
if (opts.onBridgeConnect) {
|
||||
try { opts.onBridgeConnect(ws); } catch (e) {
|
||||
console.error('[ws] onBridgeConnect error:', e.message);
|
||||
}
|
||||
}
|
||||
|
||||
ws.on('message', (raw) => {
|
||||
try {
|
||||
const data = JSON.parse(raw);
|
||||
// Handle ping/pong at the protocol layer
|
||||
if (data.type === 'ping') {
|
||||
ws.send(JSON.stringify({ type: 'pong' }));
|
||||
return;
|
||||
}
|
||||
if (opts.onBridgeMessage) {
|
||||
opts.onBridgeMessage(ws, data);
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('[ws] Bridge message parse error:', e.message);
|
||||
}
|
||||
});
|
||||
|
||||
ws.on('close', () => {
|
||||
bridgeClients.delete(ws);
|
||||
console.log(`[ws] Bridge disconnected (${bridgeClients.size} remaining)`);
|
||||
if (opts.onBridgeDisconnect) {
|
||||
try { opts.onBridgeDisconnect(ws); } catch (e) {
|
||||
console.error('[ws] onBridgeDisconnect error:', e.message);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
ws.on('error', (err) => {
|
||||
console.error('[ws] Bridge error:', err.message);
|
||||
bridgeClients.delete(ws);
|
||||
});
|
||||
}
|
||||
|
||||
// --- Client connection lifecycle ---
|
||||
function handleClientConnection(ws) {
|
||||
webClients.add(ws);
|
||||
console.log(`[ws] Client connected (${webClients.size} total)`);
|
||||
|
||||
if (opts.onClientConnect) {
|
||||
try { opts.onClientConnect(ws); } catch (e) {
|
||||
console.error('[ws] onClientConnect error:', e.message);
|
||||
}
|
||||
}
|
||||
|
||||
ws.on('message', (raw) => {
|
||||
try {
|
||||
const data = JSON.parse(raw);
|
||||
if (opts.onClientMessage) {
|
||||
opts.onClientMessage(ws, data);
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('[ws] Client message parse error:', e.message);
|
||||
}
|
||||
});
|
||||
|
||||
ws.on('close', () => {
|
||||
webClients.delete(ws);
|
||||
console.log(`[ws] Client disconnected (${webClients.size} remaining)`);
|
||||
if (opts.onClientDisconnect) {
|
||||
try { opts.onClientDisconnect(ws); } catch (e) {
|
||||
console.error('[ws] onClientDisconnect error:', e.message);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
ws.on('error', (err) => {
|
||||
console.error('[ws] Client error:', err.message);
|
||||
webClients.delete(ws);
|
||||
});
|
||||
}
|
||||
|
||||
// --- Broadcast helpers ---
|
||||
|
||||
/**
|
||||
* Send a JSON message to all connected web clients.
|
||||
* Silently skips clients that are not in OPEN state.
|
||||
*/
|
||||
function broadcastToClients(data) {
|
||||
const message = JSON.stringify(data);
|
||||
webClients.forEach((client) => {
|
||||
try {
|
||||
if (client.readyState === 1) { // WebSocket.OPEN
|
||||
client.send(message);
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('[ws] Broadcast error:', e.message);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a JSON message to all connected bridge clients.
|
||||
* Returns true if at least one bridge received the message.
|
||||
*/
|
||||
function sendToBridge(data) {
|
||||
const message = JSON.stringify(data);
|
||||
let sent = false;
|
||||
bridgeClients.forEach((bridge) => {
|
||||
try {
|
||||
if (bridge.readyState === 1) {
|
||||
bridge.send(message);
|
||||
sent = true;
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('[ws] Bridge send error:', e.message);
|
||||
}
|
||||
});
|
||||
return sent;
|
||||
}
|
||||
|
||||
// --- Keepalive ---
|
||||
const keepaliveInterval = setInterval(() => {
|
||||
const now = Date.now();
|
||||
const allClients = [...bridgeClients, ...webClients];
|
||||
for (const ws of allClients) {
|
||||
if (ws._lastPong && (now - ws._lastPong) > staleTimeoutMs) {
|
||||
console.log('[ws] Terminating stale connection');
|
||||
ws.terminate();
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
ws.ping();
|
||||
} catch (e) {
|
||||
// Ignore ping errors on closing connections
|
||||
}
|
||||
}
|
||||
}, keepaliveMs);
|
||||
|
||||
// Cleanup on server close
|
||||
wss.on('close', () => clearInterval(keepaliveInterval));
|
||||
|
||||
return {
|
||||
wss,
|
||||
bridgeClients,
|
||||
webClients,
|
||||
broadcastToClients,
|
||||
sendToBridge,
|
||||
close() {
|
||||
clearInterval(keepaliveInterval);
|
||||
wss.close();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
module.exports = { createWebSocketManager };
|
||||
Reference in New Issue
Block a user