fix: implement initial WebSocket sync for reliable command processing
This commit is contained in:
@@ -61,6 +61,7 @@ local running = true
|
|||||||
-- WebSocket state (real-time transport, with HTTP polling fallback)
|
-- WebSocket state (real-time transport, with HTTP polling fallback)
|
||||||
local ws = nil -- active WebSocket handle (nil when not connected)
|
local ws = nil -- active WebSocket handle (nil when not connected)
|
||||||
local wsConnected = false -- gates WS vs HTTP transport selection
|
local wsConnected = false -- gates WS vs HTTP transport selection
|
||||||
|
local wsHasSynced = false -- true after first command_batch received from server
|
||||||
|
|
||||||
-- Reliable modem delivery: pending commands awaiting manager acknowledgment.
|
-- Reliable modem delivery: pending commands awaiting manager acknowledgment.
|
||||||
-- processCommand() inserts here; modemListener removes on result receipt.
|
-- processCommand() inserts here; modemListener removes on result receipt.
|
||||||
@@ -262,13 +263,13 @@ local function stateForwarder()
|
|||||||
end
|
end
|
||||||
|
|
||||||
-- Task 3: Poll web server for commands (HTTP fallback)
|
-- Task 3: Poll web server for commands (HTTP fallback)
|
||||||
-- Skipped when WebSocket is connected (commands arrive via WS push).
|
-- Active when WS is disconnected, or as safety net until first WS sync.
|
||||||
local lastProcessedId = 0 -- track highest processed command ID for dedup
|
local lastProcessedId = 0 -- track highest processed command ID for dedup
|
||||||
|
|
||||||
local function commandPoller()
|
local function commandPoller()
|
||||||
while running do
|
while running do
|
||||||
-- HTTP polling is a fallback; skip when WS is delivering commands
|
-- HTTP polling is a fallback; also runs until WS initial sync completes
|
||||||
if not wsConnected then
|
if not wsConnected or not wsHasSynced then
|
||||||
local ok, err = pcall(function()
|
local ok, err = pcall(function()
|
||||||
local result = httpGet("/api/bridge/commands")
|
local result = httpGet("/api/bridge/commands")
|
||||||
if result and result.commands and #result.commands > 0 then
|
if result and result.commands and #result.commands > 0 then
|
||||||
@@ -341,8 +342,21 @@ local function wsConnector()
|
|||||||
end,
|
end,
|
||||||
|
|
||||||
onMessage = function(wsHandle, data)
|
onMessage = function(wsHandle, data)
|
||||||
|
-- Initial sync: server sends pending commands as a batch on connect
|
||||||
|
if data.type == 'command_batch' and data.commands then
|
||||||
|
wsHasSynced = true
|
||||||
|
for _, cmd in ipairs(data.commands) do
|
||||||
|
local cmdOk, cmdErr = pcall(processCommand, cmd)
|
||||||
|
if not cmdOk then
|
||||||
|
print(string.format("[ERR] WS batch cmd %s: %s",
|
||||||
|
tostring(cmd.action), tostring(cmdErr)))
|
||||||
|
end
|
||||||
|
end
|
||||||
|
if #data.commands > 0 then
|
||||||
|
print(string.format("[WS] Processed %d synced command(s)", #data.commands))
|
||||||
|
end
|
||||||
-- Server pushes commands via WebSocket (replaces HTTP polling)
|
-- Server pushes commands via WebSocket (replaces HTTP polling)
|
||||||
if data.action then
|
elseif data.action then
|
||||||
local cmdOk, cmdErr = pcall(processCommand, data)
|
local cmdOk, cmdErr = pcall(processCommand, data)
|
||||||
if not cmdOk then
|
if not cmdOk then
|
||||||
print(string.format("[ERR] WS cmd %s: %s",
|
print(string.format("[ERR] WS cmd %s: %s",
|
||||||
@@ -354,6 +368,7 @@ local function wsConnector()
|
|||||||
onDisconnect = function()
|
onDisconnect = function()
|
||||||
ws = nil
|
ws = nil
|
||||||
wsConnected = false
|
wsConnected = false
|
||||||
|
wsHasSynced = false
|
||||||
print("[WS] Disconnected — HTTP polling fallback active")
|
print("[WS] Disconnected — HTTP polling fallback active")
|
||||||
end,
|
end,
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user