From 3e55d77592ee76928487425ec06b148a37fed768 Mon Sep 17 00:00:00 2001 From: MayaTheShy Date: Sun, 22 Mar 2026 11:43:02 -0400 Subject: [PATCH] feat: implement TaskDispatcher for automatic task management and dispatching --- server/TaskDispatcher.js | 387 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 387 insertions(+) create mode 100644 server/TaskDispatcher.js diff --git a/server/TaskDispatcher.js b/server/TaskDispatcher.js new file mode 100644 index 0000000..9d4eab5 --- /dev/null +++ b/server/TaskDispatcher.js @@ -0,0 +1,387 @@ +/** + * TaskDispatcher — Automatic task queue dispatcher for RemoteTurtle + * + * Periodically polls the task_queue for pending tasks, matches them to + * available (idle + connected) turtles, maps task_type to state machine + * states, and drives the turtle state machine. Handles turtle disconnection + * mid-task (re-queues) and completion callbacks. + * + * Task type → State machine mapping: + * mine_area → mining (requires bounds in task_data) + * explore → exploring (requires target/area in task_data) + * gather → extracting (requires blockName in task_data) + * build → building (requires plan in task_data) + * transport → moving (requires target position) + * clear_area → mining (same as mine_area) + * scan → scanning (requires area/range) + * farm → farming (requires area) + * autocraft → autocrafting (requires recipe) + */ + +// Map task_type values from the TaskPanel UI to turtle state machine state names + data mappers +const TASK_TYPE_MAP = { + mine_area: { state: 'mining', mapData: d => ({ bounds: d.bounds || d, ...d }) }, + explore: { state: 'exploring', mapData: d => ({ target: d.target || d, ...d }) }, + gather: { state: 'extracting', mapData: d => ({ blockName: d.blockName, count: d.count, ...d }) }, + build: { state: 'building', mapData: d => ({ plan: d.plan, origin: d.origin, ...d }) }, + transport: { state: 'moving', mapData: d => ({ target: d.target || d.destination || d, ...d }) }, + clear_area: { state: 'mining', mapData: d => ({ bounds: d.bounds || d, ...d }) }, + scan: { state: 'scanning', mapData: d => ({ area: d.area, ...d }) }, + farm: { state: 'farming', mapData: d => ({ area: d.area, ...d }) }, + autocraft: { state: 'autocrafting', mapData: d => ({ recipe: d.recipe, count: d.count, ...d }) }, +}; + +export class TaskDispatcher { + /** + * @param {Object} opts + * @param {Map} opts.turtles - Live turtle map + * @param {Object} opts.db - Database module (server/database.js) + * @param {Function} opts.broadcastToClients - WebSocket broadcaster + * @param {number} [opts.pollInterval=5000] - ms between dispatch cycles + */ + constructor({ turtles, db, broadcastToClients, pollInterval = 5000 }) { + this._turtles = turtles; + this._db = db; + this._broadcast = broadcastToClients; + this._pollInterval = pollInterval; + this._timer = null; + this._enabled = true; + + // Track which tasks are actively being executed by which turtles + // turtleId -> { taskId, taskType } + this._activeTasks = new Map(); + + // Reverse lookup: taskId -> turtleId + this._taskToTurtle = new Map(); + } + + /** Start the dispatch loop */ + start() { + if (this._timer) return; + console.log(`🚀 TaskDispatcher started (poll every ${this._pollInterval}ms)`); + this._timer = setInterval(() => this._tick(), this._pollInterval); + // Run immediately on start + this._tick(); + } + + /** Stop the dispatch loop */ + stop() { + if (this._timer) { + clearInterval(this._timer); + this._timer = null; + console.log('🛑 TaskDispatcher stopped'); + } + } + + /** Enable/disable automatic dispatching (tasks still tracked when disabled) */ + set enabled(val) { + this._enabled = !!val; + console.log(`TaskDispatcher: auto-dispatch ${this._enabled ? 'enabled' : 'disabled'}`); + } + + get enabled() { + return this._enabled; + } + + /** Get dispatcher status for the API */ + status() { + return { + enabled: this._enabled, + activeTasks: Array.from(this._activeTasks.entries()).map(([turtleId, info]) => ({ + turtleId, + ...info, + })), + pollInterval: this._pollInterval, + }; + } + + // ========== Internal ========== + + /** + * One dispatch cycle: + * 1. Reconcile active tasks (detect turtle disconnects, state completions) + * 2. If enabled, find pending tasks and assign to idle turtles + */ + _tick() { + try { + this._reconcile(); + if (this._enabled) { + this._dispatch(); + } + } catch (err) { + console.error('[TaskDispatcher] tick error:', err.message); + } + } + + /** + * Reconcile: check if turtles executing tasks have finished or disconnected. + */ + _reconcile() { + for (const [turtleId, info] of this._activeTasks) { + const turtle = this._turtles.get(turtleId); + + // Turtle removed from server + if (!turtle) { + this._handleTaskFailure(info.taskId, turtleId, 'Turtle no longer exists'); + continue; + } + + // Turtle disconnected mid-task + if (!turtle.connected) { + this._handleTaskFailure(info.taskId, turtleId, 'Turtle disconnected'); + continue; + } + + // Turtle transitioned to idle → task completed + if (turtle.stateName === 'idle') { + this._handleTaskCompletion(info.taskId, turtleId); + continue; + } + + // Turtle errored out + if (turtle._error) { + this._handleTaskFailure(info.taskId, turtleId, turtle._error); + continue; + } + } + } + + /** + * Dispatch: find pending (unassigned or assigned-but-not-started) tasks, + * match to available turtles, and start them. + */ + _dispatch() { + // Get all pending tasks (sorted by priority DESC, created_at ASC via DB) + let pendingTasks; + try { + pendingTasks = this._db.getAllTasks('pending'); + } catch (e) { + return; + } + + if (!pendingTasks || pendingTasks.length === 0) return; + + // Find idle, connected turtles not already executing a task + const availableTurtles = this._getAvailableTurtles(); + if (availableTurtles.length === 0) return; + + for (const task of pendingTasks) { + if (availableTurtles.length === 0) break; + + const mapping = TASK_TYPE_MAP[task.task_type]; + if (!mapping) { + console.warn(`[TaskDispatcher] Unknown task type: ${task.task_type} (task #${task.id})`); + continue; + } + + // If task has a specific turtle assignment, respect it + let turtle = null; + if (task.assigned_turtle_id) { + turtle = this._turtles.get(task.assigned_turtle_id); + if (!turtle || !turtle.connected || turtle.stateName !== 'idle') { + // Assigned turtle not available — skip this task for now + continue; + } + // Remove from available pool + const idx = availableTurtles.indexOf(turtle); + if (idx !== -1) availableTurtles.splice(idx, 1); + } else { + // Pick the best available turtle (closest to target if we have coords, else first) + turtle = this._pickBestTurtle(availableTurtles, task.task_data); + const idx = availableTurtles.indexOf(turtle); + if (idx !== -1) availableTurtles.splice(idx, 1); + } + + if (!turtle) continue; + + // Dispatch! + this._startTask(task, turtle, mapping); + } + } + + /** + * Start a task on a turtle. + */ + _startTask(task, turtle, mapping) { + const taskId = task.id; + const turtleId = turtle.id; + + console.log(`[TaskDispatcher] Assigning task #${taskId} (${task.task_type}) → Turtle #${turtleId}`); + + // Map task_data to state data + const stateData = mapping.mapData(task.task_data || {}); + + // Update DB: assign + set in_progress + try { + this._db.assignTask(taskId, turtleId); + this._db.updateTaskStatus(taskId, 'in_progress'); + } catch (e) { + console.error(`[TaskDispatcher] DB error assigning task #${taskId}:`, e.message); + return; + } + + // Track + this._activeTasks.set(turtleId, { taskId, taskType: task.task_type }); + this._taskToTurtle.set(taskId, turtleId); + + // Broadcast updates + this._broadcast({ type: 'task_assigned', taskId, turtleId }); + this._broadcast({ type: 'task_updated', taskId, status: 'in_progress' }); + + // Set the turtle's state machine + turtle.setState(mapping.state, stateData); + } + + /** + * Handle task completion (turtle went back to idle after executing). + */ + _handleTaskCompletion(taskId, turtleId) { + console.log(`[TaskDispatcher] Task #${taskId} completed by Turtle #${turtleId}`); + + try { + this._db.completeTask(taskId); + } catch (e) { + console.error(`[TaskDispatcher] DB error completing task #${taskId}:`, e.message); + } + + this._activeTasks.delete(turtleId); + this._taskToTurtle.delete(taskId); + + this._broadcast({ type: 'task_completed', taskId }); + } + + /** + * Handle task failure (turtle disconnected, errored, etc.). + * Re-queues the task as pending so another turtle can pick it up. + */ + _handleTaskFailure(taskId, turtleId, reason) { + console.warn(`[TaskDispatcher] Task #${taskId} failed on Turtle #${turtleId}: ${reason}`); + + try { + // Re-queue: set back to pending, clear assignment + this._db.updateTaskStatus(taskId, 'pending', reason); + this._db.assignTask(taskId, null); + } catch (e) { + console.error(`[TaskDispatcher] DB error re-queuing task #${taskId}:`, e.message); + } + + this._activeTasks.delete(turtleId); + this._taskToTurtle.delete(taskId); + + this._broadcast({ type: 'task_updated', taskId, status: 'pending' }); + } + + /** + * Manually cancel a running task (called via API). + */ + cancelTask(taskId) { + const turtleId = this._taskToTurtle.get(taskId); + if (turtleId !== undefined) { + const turtle = this._turtles.get(turtleId); + if (turtle) { + turtle.setState('idle'); + } + this._activeTasks.delete(turtleId); + this._taskToTurtle.delete(taskId); + } + + try { + this._db.updateTaskStatus(taskId, 'cancelled'); + } catch (e) { + console.error(`[TaskDispatcher] DB error cancelling task #${taskId}:`, e.message); + } + + this._broadcast({ type: 'task_updated', taskId, status: 'cancelled' }); + } + + /** + * Check if a specific task is currently being executed. + */ + isTaskActive(taskId) { + return this._taskToTurtle.has(taskId); + } + + // ========== Helpers ========== + + /** + * Get connected, idle turtles not currently executing a dispatched task. + */ + _getAvailableTurtles() { + const available = []; + for (const [id, turtle] of this._turtles) { + if ( + turtle.connected && + turtle.stateName === 'idle' && + !this._activeTasks.has(id) + ) { + available.push(turtle); + } + } + return available; + } + + /** + * Pick the best turtle for a task. + * If task_data has coordinates, pick the closest turtle with a known position. + * Otherwise, pick the turtle with the most fuel. + */ + _pickBestTurtle(candidates, taskData) { + if (candidates.length === 0) return null; + if (candidates.length === 1) return candidates[0]; + + // Try to extract a target position from task data + const target = this._extractTarget(taskData); + + if (target) { + // Sort by Manhattan distance to target + let bestTurtle = candidates[0]; + let bestDist = Infinity; + + for (const t of candidates) { + if (t.position) { + const dist = Math.abs(t.position.x - target.x) + + Math.abs(t.position.y - target.y) + + Math.abs(t.position.z - target.z); + if (dist < bestDist) { + bestDist = dist; + bestTurtle = t; + } + } + } + return bestTurtle; + } + + // No target — pick turtle with highest fuel + return candidates.reduce((best, t) => (t._fuel > best._fuel ? t : best), candidates[0]); + } + + /** + * Try to extract a target {x,y,z} from task_data. + */ + _extractTarget(data) { + if (!data) return null; + + // Direct target + if (data.target && typeof data.target.x === 'number') return data.target; + if (data.destination && typeof data.destination.x === 'number') return data.destination; + + // Bounds — use center + if (data.bounds) { + const b = data.bounds; + if (typeof b.minX === 'number' && typeof b.maxX === 'number') { + return { + x: Math.floor((b.minX + b.maxX) / 2), + y: Math.floor((b.minY + b.maxY) / 2), + z: Math.floor((b.minZ + b.maxZ) / 2), + }; + } + } + + // Coordinate pair (from TaskPanel) + if (typeof data.startX === 'number' && typeof data.startZ === 'number') { + return { x: data.startX, y: data.startY || 64, z: data.startZ }; + } + + return null; + } +}