feat: implement WebSocket support for real-time command handling and state updates
This commit is contained in:
@@ -4,8 +4,17 @@
|
|||||||
-- forwards state to the web server via HTTP.
|
-- forwards state to the web server via HTTP.
|
||||||
-- Also polls the web server for commands and sends them to the master.
|
-- Also polls the web server for commands and sends them to the master.
|
||||||
--
|
--
|
||||||
-- Uses cc-platform-core for shared infrastructure (config, HTTP, modem).
|
-- Uses cc-platform-core for shared infrastructure (config, HTTP, modem, WS).
|
||||||
-- Service-specific logic (command dispatch, state forwarding) remains here.
|
-- Service-specific logic (command dispatch, state forwarding) remains here.
|
||||||
|
--
|
||||||
|
-- Transport: WebSocket (primary) with HTTP polling (fallback).
|
||||||
|
-- When a WS connection to /ws/bridge is active, commands arrive in
|
||||||
|
-- real-time and state/results are pushed over the socket. If the WS
|
||||||
|
-- drops, the bridge seamlessly falls back to HTTP polling until
|
||||||
|
-- reconnection.
|
||||||
|
--
|
||||||
|
-- Channel mode: 'current' by default (legacy channels active).
|
||||||
|
-- Set channelMode = 'dual' or 'target' in platform config to migrate.
|
||||||
|
|
||||||
local WebBridge = require('platform.webbridge')
|
local WebBridge = require('platform.webbridge')
|
||||||
local Channels = require('platform.channels')
|
local Channels = require('platform.channels')
|
||||||
@@ -49,6 +58,10 @@ local modem = nil
|
|||||||
local modemName = nil
|
local modemName = nil
|
||||||
local running = true
|
local running = true
|
||||||
|
|
||||||
|
-- WebSocket state (real-time transport, with HTTP polling fallback)
|
||||||
|
local ws = nil -- active WebSocket handle (nil when not connected)
|
||||||
|
local wsConnected = false -- gates WS vs HTTP transport selection
|
||||||
|
|
||||||
-------------------------------------------------
|
-------------------------------------------------
|
||||||
-- HTTP helpers (thin wrappers around platform)
|
-- HTTP helpers (thin wrappers around platform)
|
||||||
-------------------------------------------------
|
-------------------------------------------------
|
||||||
@@ -83,6 +96,32 @@ local function forwardState()
|
|||||||
httpPost("/api/bridge/state", latestState)
|
httpPost("/api/bridge/state", latestState)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
-------------------------------------------------
|
||||||
|
-- WebSocket helpers (real-time transport)
|
||||||
|
-------------------------------------------------
|
||||||
|
|
||||||
|
--- Build the WebSocket bridge URL from server config.
|
||||||
|
-- Converts http(s):// to ws(s):// and appends /ws/bridge path.
|
||||||
|
-- @return string WebSocket URL with optional API key
|
||||||
|
local function getWsUrl()
|
||||||
|
local wsUrl = SERVER_URL:gsub("^http", "ws") .. "/ws/bridge"
|
||||||
|
if API_KEY then
|
||||||
|
wsUrl = wsUrl .. "?key=" .. textutils.urlEncode(API_KEY)
|
||||||
|
end
|
||||||
|
return wsUrl
|
||||||
|
end
|
||||||
|
|
||||||
|
--- Send a JSON message via WebSocket if connected.
|
||||||
|
-- @param data table Data to send (serialized to JSON automatically)
|
||||||
|
-- @return boolean true if sent successfully, false if WS unavailable
|
||||||
|
local function wsSend(data)
|
||||||
|
if ws and wsConnected then
|
||||||
|
local ok = pcall(ws.send, textutils.serialiseJSON(data))
|
||||||
|
return ok
|
||||||
|
end
|
||||||
|
return false
|
||||||
|
end
|
||||||
|
|
||||||
-------------------------------------------------
|
-------------------------------------------------
|
||||||
-- Process commands from web server
|
-- Process commands from web server
|
||||||
-------------------------------------------------
|
-------------------------------------------------
|
||||||
@@ -205,15 +244,28 @@ local function modemListener()
|
|||||||
local resultType = message.type
|
local resultType = message.type
|
||||||
if resultType == "order_result" or resultType == "craft_result"
|
if resultType == "order_result" or resultType == "craft_result"
|
||||||
or resultType == "recursive_craft_result" or resultType == "find_item_result" then
|
or resultType == "recursive_craft_result" or resultType == "find_item_result" then
|
||||||
local fwdOk, fwdErr = pcall(httpPost, "/api/bridge/result", {
|
local resultPayload = {
|
||||||
action = resultType,
|
action = resultType,
|
||||||
commandId = message.commandId,
|
commandId = message.commandId,
|
||||||
success = message.success,
|
success = message.success,
|
||||||
message = message.message,
|
message = message.message,
|
||||||
error = message.error,
|
error = message.error,
|
||||||
|
}
|
||||||
|
-- WS-first: send as command_result via WebSocket if connected
|
||||||
|
local sent = wsSend({
|
||||||
|
type = "command_result",
|
||||||
|
action = resultPayload.action,
|
||||||
|
commandId = resultPayload.commandId,
|
||||||
|
success = resultPayload.success,
|
||||||
|
message = resultPayload.message,
|
||||||
|
error = resultPayload.error,
|
||||||
})
|
})
|
||||||
if not fwdOk then
|
-- HTTP fallback: POST to /api/bridge/result if WS unavailable
|
||||||
print(string.format("[ERR] Forward result %s: %s", resultType, tostring(fwdErr)))
|
if not sent then
|
||||||
|
local fwdOk, fwdErr = pcall(httpPost, "/api/bridge/result", resultPayload)
|
||||||
|
if not fwdOk then
|
||||||
|
print(string.format("[ERR] Forward result %s: %s", resultType, tostring(fwdErr)))
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
@@ -221,45 +273,56 @@ local function modemListener()
|
|||||||
end
|
end
|
||||||
|
|
||||||
-- Task 2: Forward state to web server periodically
|
-- Task 2: Forward state to web server periodically
|
||||||
|
-- Uses WebSocket if connected; falls back to HTTP POST.
|
||||||
local function stateForwarder()
|
local function stateForwarder()
|
||||||
while running do
|
while running do
|
||||||
local ok, err = pcall(forwardState)
|
if latestState then
|
||||||
if not ok then
|
-- WS-first: send state directly via WebSocket
|
||||||
print(string.format("[ERR] State forward: %s", tostring(err)))
|
if not wsSend(latestState) then
|
||||||
|
-- HTTP fallback: POST to /api/bridge/state
|
||||||
|
local ok, err = pcall(forwardState)
|
||||||
|
if not ok then
|
||||||
|
print(string.format("[ERR] State forward: %s", tostring(err)))
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
sleep(STATE_INTERVAL)
|
sleep(STATE_INTERVAL)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
-- Task 3: Poll web server for commands
|
-- Task 3: Poll web server for commands (HTTP fallback)
|
||||||
|
-- Skipped when WebSocket is connected (commands arrive via WS push).
|
||||||
local lastProcessedId = 0 -- track highest processed command ID for dedup
|
local lastProcessedId = 0 -- track highest processed command ID for dedup
|
||||||
|
|
||||||
local function commandPoller()
|
local function commandPoller()
|
||||||
while running do
|
while running do
|
||||||
local ok, err = pcall(function()
|
-- HTTP polling is a fallback; skip when WS is delivering commands
|
||||||
local result = httpGet("/api/bridge/commands")
|
if not wsConnected then
|
||||||
if result and result.commands and #result.commands > 0 then
|
local ok, err = pcall(function()
|
||||||
local maxId = lastProcessedId
|
local result = httpGet("/api/bridge/commands")
|
||||||
-- Process each command, skipping already-processed ones
|
if result and result.commands and #result.commands > 0 then
|
||||||
for _, cmd in ipairs(result.commands) do
|
local maxId = lastProcessedId
|
||||||
local cmdId = cmd.id or 0
|
-- Process each command, skipping already-processed ones
|
||||||
if cmdId > lastProcessedId then
|
for _, cmd in ipairs(result.commands) do
|
||||||
local cmdOk, cmdErr = pcall(processCommand, cmd)
|
local cmdId = cmd.id or 0
|
||||||
if not cmdOk then
|
if cmdId > lastProcessedId then
|
||||||
print(string.format("[ERR] Process cmd %s: %s", tostring(cmd.action), tostring(cmdErr)))
|
local cmdOk, cmdErr = pcall(processCommand, cmd)
|
||||||
|
if not cmdOk then
|
||||||
|
print(string.format("[ERR] Process cmd %s: %s", tostring(cmd.action), tostring(cmdErr)))
|
||||||
|
end
|
||||||
|
if cmdId > maxId then maxId = cmdId end
|
||||||
end
|
end
|
||||||
if cmdId > maxId then maxId = cmdId end
|
end
|
||||||
|
-- Acknowledge up to the highest processed ID
|
||||||
|
if maxId > lastProcessedId then
|
||||||
|
lastProcessedId = maxId
|
||||||
|
httpPost("/api/bridge/commands/ack", { lastProcessedId = lastProcessedId })
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
-- Acknowledge up to the highest processed ID
|
end)
|
||||||
if maxId > lastProcessedId then
|
if not ok then
|
||||||
lastProcessedId = maxId
|
print(string.format("[ERR] Command poll: %s", tostring(err)))
|
||||||
httpPost("/api/bridge/commands/ack", { lastProcessedId = lastProcessedId })
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end)
|
|
||||||
if not ok then
|
|
||||||
print(string.format("[ERR] Command poll: %s", tostring(err)))
|
|
||||||
end
|
end
|
||||||
sleep(POLL_INTERVAL)
|
sleep(POLL_INTERVAL)
|
||||||
end
|
end
|
||||||
@@ -285,6 +348,53 @@ local function heartbeat()
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
-- Task 5: WebSocket real-time connection (primary transport)
|
||||||
|
-- Maintains a persistent WebSocket link to the server for:
|
||||||
|
-- - Receiving commands in real-time (replaces HTTP polling when active)
|
||||||
|
-- - Sending state updates and command results via wsSend()
|
||||||
|
-- Reconnects automatically on failure; HTTP polling resumes as fallback.
|
||||||
|
-- Channel mode: 'current' by default — dual/target configurable via platform.
|
||||||
|
local function wsConnector()
|
||||||
|
local wsUrl = getWsUrl()
|
||||||
|
print("[WS] Connecting to " .. wsUrl)
|
||||||
|
|
||||||
|
WebBridge.wsConnect(wsUrl, {
|
||||||
|
onConnect = function(wsHandle)
|
||||||
|
ws = wsHandle
|
||||||
|
wsConnected = true
|
||||||
|
print("[WS] Connected — real-time mode active")
|
||||||
|
-- Push current state immediately on reconnect
|
||||||
|
if latestState then
|
||||||
|
wsSend(latestState)
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
|
||||||
|
onMessage = function(wsHandle, data)
|
||||||
|
-- Server pushes commands via WebSocket (replaces HTTP polling)
|
||||||
|
if data.action then
|
||||||
|
local cmdOk, cmdErr = pcall(processCommand, data)
|
||||||
|
if not cmdOk then
|
||||||
|
print(string.format("[ERR] WS cmd %s: %s",
|
||||||
|
tostring(data.action), tostring(cmdErr)))
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
|
||||||
|
onDisconnect = function()
|
||||||
|
ws = nil
|
||||||
|
wsConnected = false
|
||||||
|
print("[WS] Disconnected — HTTP polling fallback active")
|
||||||
|
end,
|
||||||
|
|
||||||
|
onError = function(err)
|
||||||
|
print(string.format("[WS] Connection error: %s", tostring(err)))
|
||||||
|
end,
|
||||||
|
}, {
|
||||||
|
reconnectDelay = 5,
|
||||||
|
receiveTimeout = 30,
|
||||||
|
})
|
||||||
|
end
|
||||||
|
|
||||||
-------------------------------------------------
|
-------------------------------------------------
|
||||||
-- Main
|
-- Main
|
||||||
-------------------------------------------------
|
-------------------------------------------------
|
||||||
@@ -316,6 +426,7 @@ local function main()
|
|||||||
else
|
else
|
||||||
print("[WARN] No API key set (open access)")
|
print("[WARN] No API key set (open access)")
|
||||||
end
|
end
|
||||||
|
print("[OK] Transport: WebSocket (primary) + HTTP polling (fallback)")
|
||||||
print("")
|
print("")
|
||||||
print("Bridge is running. Press Ctrl+T to stop.")
|
print("Bridge is running. Press Ctrl+T to stop.")
|
||||||
print("Listening for master broadcasts on ch " .. BROADCAST_CHANNEL)
|
print("Listening for master broadcasts on ch " .. BROADCAST_CHANNEL)
|
||||||
@@ -325,7 +436,8 @@ local function main()
|
|||||||
modemListener,
|
modemListener,
|
||||||
stateForwarder,
|
stateForwarder,
|
||||||
commandPoller,
|
commandPoller,
|
||||||
heartbeat
|
heartbeat,
|
||||||
|
wsConnector
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user