feat: replace hand-rolled WebSocketServer with createWebSocketManager for improved connection handling and state updates
This commit is contained in:
@@ -1,4 +1,3 @@
|
||||
import { WebSocketServer } from 'ws';
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
import { fileURLToPath } from 'url';
|
||||
@@ -12,6 +11,7 @@ import {
|
||||
const require = createRequire(import.meta.url);
|
||||
const {
|
||||
createPlatformServer,
|
||||
createWebSocketManager,
|
||||
setupGracefulShutdown,
|
||||
createRateLimiter,
|
||||
createProxyEndpoint,
|
||||
@@ -46,9 +46,95 @@ app.use((req, res, next) => {
|
||||
return commandLimiter(req, res, next);
|
||||
});
|
||||
|
||||
// ========== WebSocket Manager (platform-managed) ==========
|
||||
// Replaces hand-rolled WebSocketServer with createWebSocketManager() from
|
||||
// @cc-platform/server. Handles bridge/client URL routing, API key auth on
|
||||
// upgrade, ping/pong keepalive with stale-connection cleanup.
|
||||
//
|
||||
// Bridge path: /ws/bridge — receives state/results, pushes commands real-time
|
||||
// Client path: /ws — receives initial_state on connect, sends commands
|
||||
//
|
||||
// HTTP polling fallback preserved via /api/bridge/* endpoints until WS
|
||||
// adoption is fully verified.
|
||||
const wsManager = createWebSocketManager(server, {
|
||||
apiKey: API_KEY,
|
||||
|
||||
// ---- Bridge lifecycle ----
|
||||
onBridgeConnect: () => {
|
||||
console.log('\u{1F309} CC:Tweaked bridge connected via WebSocket');
|
||||
wsManager.broadcastToClients({ type: 'state_update', bridgeConnected: true });
|
||||
},
|
||||
|
||||
onBridgeMessage: (ws, data) => {
|
||||
if (data.type === 'state') {
|
||||
// Full state update from bridge (same path as POST /api/bridge/state)
|
||||
updateStateFromBridge(data);
|
||||
} else if (data.type === 'command_result') {
|
||||
// Command result from bridge — forward to all web clients
|
||||
wsManager.broadcastToClients({
|
||||
type: 'command_result',
|
||||
commandId: data.commandId,
|
||||
action: data.action,
|
||||
success: data.success,
|
||||
message: data.message,
|
||||
error: data.error,
|
||||
});
|
||||
}
|
||||
},
|
||||
|
||||
onBridgeDisconnect: () => {
|
||||
console.log('\u{1F309} CC:Tweaked bridge disconnected');
|
||||
wsManager.broadcastToClients({
|
||||
type: 'state_update',
|
||||
bridgeConnected: wsManager.bridgeClients.size > 0,
|
||||
});
|
||||
},
|
||||
|
||||
// ---- Web client lifecycle ----
|
||||
onClientConnect: (ws) => {
|
||||
console.log('\u{1F310} New web client connected');
|
||||
// Send full current state to newly connected dashboard
|
||||
ws.send(JSON.stringify({
|
||||
type: 'initial_state',
|
||||
inventory: inventoryState,
|
||||
activity: activityState,
|
||||
alerts: alertsState,
|
||||
smeltingPaused,
|
||||
disabledRecipes,
|
||||
smeltable: smeltableRecipes,
|
||||
craftable: craftableRecipes,
|
||||
craftTurtleOk,
|
||||
lastUpdate,
|
||||
bridgeConnected: wsManager.bridgeClients.size > 0,
|
||||
dropperNicknames,
|
||||
}));
|
||||
},
|
||||
|
||||
onClientMessage: (ws, data) => {
|
||||
if (data.type === 'command') {
|
||||
// Idempotency check for WS commands
|
||||
const cached = checkIdempotent(data.commandId);
|
||||
if (cached) {
|
||||
ws.send(JSON.stringify({ type: 'command_result', commandId: data.commandId, ...cached }));
|
||||
return;
|
||||
}
|
||||
// Forward command to bridge (WS push or HTTP poll queue)
|
||||
pushCommandToBridge(data);
|
||||
if (data.commandId) {
|
||||
recordCommand(data.commandId, { success: true, commandId: data.commandId });
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
onClientDisconnect: () => {
|
||||
console.log('\u{1F44B} Web client disconnected');
|
||||
},
|
||||
});
|
||||
|
||||
// Aliases — backward compatibility for code referencing these Sets directly
|
||||
const { bridgeClients, webClients } = wsManager;
|
||||
|
||||
// ========== State ==========
|
||||
const webClients = new Set();
|
||||
const bridgeClients = new Set();
|
||||
|
||||
// Load persisted state from SQLite on startup
|
||||
console.log('💾 Loading persisted state from database...');
|
||||
@@ -101,31 +187,17 @@ function recordCommand(commandId, result) {
|
||||
|
||||
// ========== Helpers ==========
|
||||
|
||||
// Broadcasts data to all connected web dashboard clients.
|
||||
// Delegates to platform WS manager (handles JSON serialization, error handling).
|
||||
function broadcastToClients(data) {
|
||||
const message = JSON.stringify(data);
|
||||
webClients.forEach((client) => {
|
||||
if (client.readyState === 1) {
|
||||
try {
|
||||
client.send(message);
|
||||
} catch (err) {
|
||||
console.error('❌ WS send error (client):', err.message);
|
||||
}
|
||||
}
|
||||
});
|
||||
wsManager.broadcastToClients(data);
|
||||
}
|
||||
|
||||
// Pushes a command to the CC:Tweaked bridge.
|
||||
// Primary: real-time push via WebSocket (wsManager.sendToBridge).
|
||||
// Fallback: queues for HTTP polling if no WS bridge is connected.
|
||||
function pushCommandToBridge(command) {
|
||||
let sent = false;
|
||||
for (const bridge of bridgeClients) {
|
||||
if (bridge.readyState === 1) {
|
||||
try {
|
||||
bridge.send(JSON.stringify(command));
|
||||
sent = true;
|
||||
} catch (err) {
|
||||
console.error('❌ WS send error (bridge):', err.message);
|
||||
}
|
||||
}
|
||||
}
|
||||
const sent = wsManager.sendToBridge(command);
|
||||
if (!sent) {
|
||||
// Fallback: queue for HTTP polling with monotonic ID
|
||||
const id = nextCommandId++;
|
||||
@@ -1094,9 +1166,9 @@ function updateStateFromBridge(data) {
|
||||
}
|
||||
}
|
||||
|
||||
// ========== WebSocket Server ==========
|
||||
|
||||
const wss = new WebSocketServer({ noServer: true, maxPayload: 1 * 1024 * 1024 /* 1 MB */ });
|
||||
// ========== Server Startup Info ==========
|
||||
// WebSocket management is handled by createWebSocketManager() (initialized above).
|
||||
// Bridge: /ws/bridge | Client: /ws | Keepalive: 25s ping/pong (platform default)
|
||||
|
||||
console.log(`🚀 Inventory Manager Web Server starting...`);
|
||||
console.log(`📡 HTTP Server: http://localhost:${port}`);
|
||||
@@ -1107,174 +1179,6 @@ if (API_KEY) {
|
||||
console.log('⚠️ No API_KEY set \u2014 authentication disabled (open access)');
|
||||
}
|
||||
|
||||
// Authenticate WebSocket upgrades
|
||||
server.on('upgrade', (req, socket, head) => {
|
||||
if (API_KEY) {
|
||||
// Extract key from query string: /ws?key=... or /ws/bridge?key=...
|
||||
const urlObj = new URL(req.url, `http://${req.headers.host}`);
|
||||
const token = urlObj.searchParams.get('key') || '';
|
||||
if (token !== API_KEY) {
|
||||
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
}
|
||||
wss.handleUpgrade(req, socket, head, (ws) => {
|
||||
wss.emit('connection', ws, req);
|
||||
});
|
||||
});
|
||||
|
||||
wss.on('connection', (ws, req) => {
|
||||
const url = req.url || '';
|
||||
|
||||
// ---- Bridge WebSocket connection ----
|
||||
if (url.startsWith('/ws/bridge')) {
|
||||
console.log('🌉 CC:Tweaked bridge connected via WebSocket');
|
||||
bridgeClients.add(ws);
|
||||
ws.isAlive = true;
|
||||
|
||||
// Notify web clients that the bridge is now connected
|
||||
broadcastToClients({
|
||||
type: 'state_update',
|
||||
bridgeConnected: true,
|
||||
});
|
||||
|
||||
ws.on('message', (raw) => {
|
||||
try {
|
||||
const data = JSON.parse(raw);
|
||||
|
||||
if (data.type === 'ping') {
|
||||
ws.send(JSON.stringify({ type: 'pong' }));
|
||||
return;
|
||||
}
|
||||
|
||||
if (data.type === 'state') {
|
||||
// Full state update from bridge
|
||||
updateStateFromBridge(data);
|
||||
} else if (data.type === 'command_result') {
|
||||
// Command result from bridge — include commandId
|
||||
broadcastToClients({
|
||||
type: 'command_result',
|
||||
commandId: data.commandId,
|
||||
action: data.action,
|
||||
success: data.success,
|
||||
message: data.message,
|
||||
error: data.error,
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('❌ Bridge WS message error:', error.message);
|
||||
}
|
||||
});
|
||||
|
||||
ws.on('close', () => {
|
||||
console.log('🌉 CC:Tweaked bridge disconnected');
|
||||
bridgeClients.delete(ws);
|
||||
|
||||
// Notify web clients that the bridge may be disconnected
|
||||
broadcastToClients({
|
||||
type: 'state_update',
|
||||
bridgeConnected: bridgeClients.size > 0,
|
||||
});
|
||||
});
|
||||
|
||||
ws.on('pong', () => { ws.isAlive = true; });
|
||||
|
||||
ws.on('error', (error) => {
|
||||
console.error('❌ Bridge WS error:', error);
|
||||
bridgeClients.delete(ws);
|
||||
|
||||
broadcastToClients({
|
||||
type: 'state_update',
|
||||
bridgeConnected: bridgeClients.size > 0,
|
||||
});
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// ---- Web client WebSocket connection ----
|
||||
console.log('🌐 New web client connected');
|
||||
webClients.add(ws);
|
||||
ws.isAlive = true;
|
||||
|
||||
// Send current state to new client
|
||||
ws.send(JSON.stringify({
|
||||
type: 'initial_state',
|
||||
inventory: inventoryState,
|
||||
activity: activityState,
|
||||
alerts: alertsState,
|
||||
smeltingPaused,
|
||||
disabledRecipes,
|
||||
smeltable: smeltableRecipes,
|
||||
craftable: craftableRecipes,
|
||||
craftTurtleOk,
|
||||
lastUpdate,
|
||||
bridgeConnected: bridgeClients.size > 0,
|
||||
dropperNicknames,
|
||||
}));
|
||||
|
||||
ws.on('pong', () => { ws.isAlive = true; });
|
||||
|
||||
ws.on('message', (message) => {
|
||||
try {
|
||||
const data = JSON.parse(message);
|
||||
|
||||
if (data.type === 'command') {
|
||||
// Idempotency check for WS commands
|
||||
const cached = checkIdempotent(data.commandId);
|
||||
if (cached) {
|
||||
ws.send(JSON.stringify({ type: 'command_result', commandId: data.commandId, ...cached }));
|
||||
return;
|
||||
}
|
||||
// Forward command to bridge
|
||||
pushCommandToBridge(data);
|
||||
if (data.commandId) {
|
||||
recordCommand(data.commandId, { success: true, commandId: data.commandId });
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('❌ Error processing web client message:', error);
|
||||
}
|
||||
});
|
||||
|
||||
ws.on('close', () => {
|
||||
console.log('👋 Web client disconnected');
|
||||
webClients.delete(ws);
|
||||
});
|
||||
|
||||
ws.on('error', (error) => {
|
||||
console.error('❌ WebSocket error:', error);
|
||||
});
|
||||
});
|
||||
|
||||
// ========== WebSocket Keep-Alive ==========
|
||||
// Ping all web clients and bridge connections every 25s to keep connections alive
|
||||
const WS_PING_INTERVAL = setInterval(() => {
|
||||
webClients.forEach((ws) => {
|
||||
if (!ws.isAlive) {
|
||||
webClients.delete(ws);
|
||||
return ws.terminate();
|
||||
}
|
||||
ws.isAlive = false;
|
||||
ws.ping();
|
||||
});
|
||||
bridgeClients.forEach((ws) => {
|
||||
if (!ws.isAlive) {
|
||||
console.log('🌉 Bridge connection stale — terminating');
|
||||
bridgeClients.delete(ws);
|
||||
broadcastToClients({ type: 'state_update', bridgeConnected: bridgeClients.size > 0 });
|
||||
return ws.terminate();
|
||||
}
|
||||
ws.isAlive = false;
|
||||
ws.ping();
|
||||
});
|
||||
}, 25000);
|
||||
|
||||
wss.on('close', () => {
|
||||
clearInterval(WS_PING_INTERVAL);
|
||||
});
|
||||
|
||||
// ========== Cross-Project Integration API ==========
|
||||
// These endpoints allow the RemoteTurtle system to query inventory state
|
||||
|
||||
@@ -1341,7 +1245,7 @@ createProxyEndpoint(app, '/api/integration/turtle-status', 'TURTLE_SERVER_URL',
|
||||
setupGracefulShutdown({
|
||||
serviceName: 'inventory-manager',
|
||||
cleanup: [
|
||||
() => wss.close(),
|
||||
() => wsManager.close(),
|
||||
() => server.close(),
|
||||
() => { closeDb(); console.log('💾 Database closed'); },
|
||||
],
|
||||
|
||||
Reference in New Issue
Block a user