/**
 * TaskDispatcher — Automatic task queue dispatcher for RemoteTurtle
 *
 * Periodically polls the task_queue for pending tasks, matches them to
 * available (idle + connected) turtles, maps task_type to state machine
 * states, and drives the turtle state machine. Handles turtle disconnection
 * mid-task (re-queues) and completion callbacks.
 *
 * Task type → State machine mapping:
 *   mine_area  → mining        (requires bounds in task_data)
 *   explore    → exploring     (requires target/area in task_data)
 *   gather     → extracting    (requires blockName in task_data)
 *   build      → building      (requires plan in task_data)
 *   transport  → moving        (requires target position)
 *   clear_area → mining        (same as mine_area)
 *   scan       → scanning      (requires area/range)
 *   farm       → farming       (requires area)
 *   autocraft  → autocrafting  (requires recipe)
 */

// Map task_type values from the TaskPanel UI to turtle state machine state
// names + data mappers. Each mapper receives the task's task_data object and
// returns the data object handed to turtle.setState(). Because `...d` is spread
// last, any key present in task_data wins over the explicitly listed default.
const TASK_TYPE_MAP = {
  mine_area: { state: 'mining', mapData: d => ({ bounds: d.bounds || d, ...d }) },
  explore: { state: 'exploring', mapData: d => ({ target: d.target || d, ...d }) },
  gather: { state: 'extracting', mapData: d => ({ blockName: d.blockName, count: d.count, ...d }) },
  build: { state: 'building', mapData: d => ({ plan: d.plan, origin: d.origin, ...d }) },
  transport: { state: 'moving', mapData: d => ({ target: d.target || d.destination || d, ...d }) },
  clear_area: { state: 'mining', mapData: d => ({ bounds: d.bounds || d, ...d }) },
  scan: { state: 'scanning', mapData: d => ({ area: d.area, ...d }) },
  farm: { state: 'farming', mapData: d => ({ area: d.area, ...d }) },
  autocraft: { state: 'autocrafting', mapData: d => ({ recipe: d.recipe, count: d.count, ...d }) },
};

/**
 * Look up the state mapping for a task type.
 * Only own keys of TASK_TYPE_MAP count, so values such as "constructor" or
 * "toString" (inherited from Object.prototype) are reported as unknown instead
 * of yielding a bogus mapping.
 *
 * @param {*} taskType - task_type value read from the task row
 * @returns {{state: string, mapData: Function}|null} mapping, or null if unknown
 */
function lookupTaskMapping(taskType) {
  return Object.prototype.hasOwnProperty.call(TASK_TYPE_MAP, taskType)
    ? TASK_TYPE_MAP[taskType]
    : null;
}

export class TaskDispatcher {
  /**
   * @param {Object} opts
   * @param {Map} opts.turtles - Live turtle map
   * @param {Object} opts.db - Database module (server/database.js)
   * @param {Function} opts.broadcastToClients - WebSocket broadcaster
   * @param {number} [opts.pollInterval=5000] - ms between dispatch cycles
   */
  constructor({ turtles, db, broadcastToClients, pollInterval = 5000 }) {
    this._turtles = turtles;
    this._db = db;
    this._broadcast = broadcastToClients;
    this._pollInterval = pollInterval;
    this._timer = null;
    this._enabled = true;

    // Track which tasks are actively being executed by which turtles
    // turtleId -> { taskId, taskType }
    this._activeTasks = new Map();

    // Reverse lookup: taskId -> turtleId
    this._taskToTurtle = new Map();
  }

  /** Start the dispatch loop. Does nothing if the loop is already running. */
  start() {
    if (this._timer) return;
    console.log(`🚀 TaskDispatcher started (poll every ${this._pollInterval}ms)`);
    this._timer = setInterval(() => this._tick(), this._pollInterval);
    // Run immediately on start
    this._tick();
  }

  /** Stop the dispatch loop. Does nothing if the loop is not running. */
  stop() {
    if (this._timer) {
      clearInterval(this._timer);
      this._timer = null;
      console.log('🛑 TaskDispatcher stopped');
    }
  }

  /**
   * Enable/disable automatic dispatching (tasks still tracked when disabled).
   * The assigned value is coerced to a boolean.
   */
  set enabled(val) {
    this._enabled = !!val;
    console.log(`TaskDispatcher: auto-dispatch ${this._enabled ? 'enabled' : 'disabled'}`);
  }

  get enabled() {
    return this._enabled;
  }

  /**
   * Get dispatcher status for the API.
   *
   * @returns {{enabled: boolean, activeTasks: Array<Object>, pollInterval: number}}
   *   activeTasks holds one `{ turtleId, taskId, taskType }` entry per tracked task.
   */
  status() {
    return {
      enabled: this._enabled,
      activeTasks: Array.from(this._activeTasks.entries()).map(([turtleId, info]) => ({
        turtleId,
        ...info,
      })),
      pollInterval: this._pollInterval,
    };
  }

  // ========== Internal ==========

  /**
   * One dispatch cycle:
   * 1. Reconcile active tasks (detect turtle disconnects, state completions)
   * 2. If enabled, find pending tasks and assign to idle turtles
   *
   * Any exception is logged and swallowed so the interval keeps running.
   */
  _tick() {
    try {
      this._reconcile();
      if (this._enabled) {
        this._dispatch();
      }
    } catch (err) {
      console.error('[TaskDispatcher] tick error:', err.message);
    }
  }

  /**
   * Reconcile: check if turtles executing tasks have finished or disconnected.
   *
   * The handlers delete the current entry from `_activeTasks` while it is being
   * iterated; removing the entry currently being visited is safe for a Map.
   */
  _reconcile() {
    for (const [turtleId, info] of this._activeTasks) {
      const turtle = this._turtles.get(turtleId);

      // Turtle removed from server
      if (!turtle) {
        this._handleTaskFailure(info.taskId, turtleId, 'Turtle no longer exists');
        continue;
      }

      // Turtle disconnected mid-task
      if (!turtle.connected) {
        this._handleTaskFailure(info.taskId, turtleId, 'Turtle disconnected');
        continue;
      }

      // Turtle transitioned to idle → task completed
      // NOTE(review): this is checked before `_error`, so a turtle that is idle
      // AND has `_error` set is recorded as completed — confirm the state
      // machine never returns to idle after an error.
      if (turtle.stateName === 'idle') {
        this._handleTaskCompletion(info.taskId, turtleId);
        continue;
      }

      // Turtle errored out
      if (turtle._error) {
        this._handleTaskFailure(info.taskId, turtleId, turtle._error);
        continue;
      }
    }
  }

  /**
   * Dispatch: find pending (unassigned or assigned-but-not-started) tasks,
   * match to available turtles, and start them.
   *
   * A turtle is only ever used if it is in this cycle's available pool
   * (connected, idle, not tracked in `_activeTasks`, not already handed a task
   * earlier in this same cycle). This also applies to tasks pinned to a
   * specific turtle, so one turtle can never receive two tasks at once.
   */
  _dispatch() {
    // Get all pending tasks (sorted by priority DESC, created_at ASC via DB)
    let pendingTasks;
    try {
      pendingTasks = this._db.getAllTasks('pending');
    } catch (e) {
      // Skip this cycle, but leave a trace instead of failing silently.
      console.error('[TaskDispatcher] DB error loading pending tasks:', e.message);
      return;
    }

    if (!pendingTasks || pendingTasks.length === 0) return;

    // Find idle, connected turtles not already executing a task
    const availableTurtles = this._getAvailableTurtles();
    if (availableTurtles.length === 0) return;

    for (const task of pendingTasks) {
      if (availableTurtles.length === 0) break;

      const mapping = lookupTaskMapping(task.task_type);
      if (!mapping) {
        console.warn(`[TaskDispatcher] Unknown task type: ${task.task_type} (task #${task.id})`);
        continue;
      }

      // Explicit emptiness check rather than truthiness, so an assignment to
      // turtle id 0 is still honoured.
      const assignedId = task.assigned_turtle_id;
      const isPinned = assignedId !== null && assignedId !== undefined && assignedId !== '';

      let turtle = null;
      if (isPinned) {
        // If task has a specific turtle assignment, respect it
        turtle = this._turtles.get(assignedId);
        if (!turtle || availableTurtles.indexOf(turtle) === -1) {
          // Assigned turtle not available — skip this task for now
          continue;
        }
      } else {
        // Pick the best available turtle (closest to target if we have coords, else most fuel)
        turtle = this._pickBestTurtle(availableTurtles, task.task_data);
      }

      if (!turtle) continue;

      // Remove from available pool
      const idx = availableTurtles.indexOf(turtle);
      if (idx !== -1) availableTurtles.splice(idx, 1);

      // Dispatch!
      this._startTask(task, turtle, mapping);
    }
  }

  /**
   * Start a task on a turtle.
   *
   * Order of operations: persist assignment + in_progress status, record the
   * task in the tracking maps, broadcast, then switch the turtle's state.
   * If the DB update fails nothing else happens. If `setState` throws, the task
   * is re-queued through `_handleTaskFailure`; otherwise the next reconcile
   * would see the still-idle turtle and record the task as completed.
   *
   * @param {Object} task - Task row (reads id, task_type, task_data)
   * @param {Object} turtle - Turtle to run the task (reads id, calls setState)
   * @param {{state: string, mapData: Function}} mapping - Entry of TASK_TYPE_MAP
   */
  _startTask(task, turtle, mapping) {
    const taskId = task.id;
    const turtleId = turtle.id;

    console.log(`[TaskDispatcher] Assigning task #${taskId} (${task.task_type}) → Turtle #${turtleId}`);

    // Map task_data to state data
    const stateData = mapping.mapData(task.task_data || {});

    // Update DB: assign + set in_progress
    try {
      this._db.assignTask(taskId, turtleId);
      this._db.updateTaskStatus(taskId, 'in_progress');
    } catch (e) {
      console.error(`[TaskDispatcher] DB error assigning task #${taskId}:`, e.message);
      return;
    }

    // Track
    this._activeTasks.set(turtleId, { taskId, taskType: task.task_type });
    this._taskToTurtle.set(taskId, turtleId);

    // Broadcast updates
    this._broadcast({ type: 'task_assigned', taskId, turtleId });
    this._broadcast({ type: 'task_updated', taskId, status: 'in_progress' });

    // Set the turtle's state machine
    try {
      turtle.setState(mapping.state, stateData);
    } catch (e) {
      this._handleTaskFailure(taskId, turtleId, `Failed to start task: ${e.message}`);
    }
  }

  /**
   * Handle task completion (turtle went back to idle after executing).
   * Tracking is cleared and the completion broadcast even if the DB call fails.
   *
   * @param {*} taskId
   * @param {*} turtleId
   */
  _handleTaskCompletion(taskId, turtleId) {
    console.log(`[TaskDispatcher] Task #${taskId} completed by Turtle #${turtleId}`);

    try {
      this._db.completeTask(taskId);
    } catch (e) {
      console.error(`[TaskDispatcher] DB error completing task #${taskId}:`, e.message);
    }

    this._activeTasks.delete(turtleId);
    this._taskToTurtle.delete(taskId);

    this._broadcast({ type: 'task_completed', taskId });
  }

  /**
   * Handle task failure (turtle disconnected, errored, etc.).
   * Re-queues the task as pending so another turtle can pick it up.
   * Tracking is cleared and the update broadcast even if the DB call fails.
   *
   * @param {*} taskId
   * @param {*} turtleId
   * @param {*} reason - Passed as third argument to db.updateTaskStatus
   */
  _handleTaskFailure(taskId, turtleId, reason) {
    console.warn(`[TaskDispatcher] Task #${taskId} failed on Turtle #${turtleId}: ${reason}`);

    try {
      // Re-queue: set back to pending, clear assignment
      this._db.updateTaskStatus(taskId, 'pending', reason);
      this._db.assignTask(taskId, null);
    } catch (e) {
      console.error(`[TaskDispatcher] DB error re-queuing task #${taskId}:`, e.message);
    }

    this._activeTasks.delete(turtleId);
    this._taskToTurtle.delete(taskId);

    this._broadcast({ type: 'task_updated', taskId, status: 'pending' });
  }

  /**
   * Manually cancel a running task (called via API).
   * If the task is being executed, its turtle (when still present) is put back
   * to idle and the tracking entries are removed. The task is marked cancelled
   * in the DB and the change broadcast whether or not it was running.
   *
   * @param {*} taskId
   */
  cancelTask(taskId) {
    const turtleId = this._taskToTurtle.get(taskId);
    if (turtleId !== undefined) {
      const turtle = this._turtles.get(turtleId);
      if (turtle) {
        turtle.setState('idle');
      }
      this._activeTasks.delete(turtleId);
      this._taskToTurtle.delete(taskId);
    }

    try {
      this._db.updateTaskStatus(taskId, 'cancelled');
    } catch (e) {
      console.error(`[TaskDispatcher] DB error cancelling task #${taskId}:`, e.message);
    }

    this._broadcast({ type: 'task_updated', taskId, status: 'cancelled' });
  }

  /**
   * Check if a specific task is currently being executed.
   *
   * @param {*} taskId
   * @returns {boolean} true if the dispatcher is tracking this task
   */
  isTaskActive(taskId) {
    return this._taskToTurtle.has(taskId);
  }

  // ========== Helpers ==========

  /**
   * Get connected, idle turtles not currently executing a dispatched task.
   *
   * @returns {Array<Object>} turtles in the iteration order of the turtle map
   */
  _getAvailableTurtles() {
    const available = [];
    for (const [id, turtle] of this._turtles) {
      if (
        turtle.connected &&
        turtle.stateName === 'idle' &&
        !this._activeTasks.has(id)
      ) {
        available.push(turtle);
      }
    }
    return available;
  }

  /**
   * Pick the best turtle for a task.
   * If task_data has coordinates, pick the closest turtle with a known position.
   * Otherwise — including when no candidate has a usable position — pick the
   * turtle with the most fuel (the first candidate when fuel is not comparable).
   *
   * @param {Array<Object>} candidates - Available turtles
   * @param {Object} taskData - The task's task_data
   * @returns {Object|null} chosen turtle, or null when there are no candidates
   */
  _pickBestTurtle(candidates, taskData) {
    if (candidates.length === 0) return null;
    if (candidates.length === 1) return candidates[0];

    // Try to extract a target position from task data
    const target = this._extractTarget(taskData);

    if (target) {
      // Find the smallest Manhattan distance to target
      let bestTurtle = null;
      let bestDist = Infinity;

      for (const t of candidates) {
        if (!t.position) continue;
        const dist =
          Math.abs(t.position.x - target.x) +
          Math.abs(t.position.y - target.y) +
          Math.abs(t.position.z - target.z);
        // A NaN distance (missing coordinate) never compares smaller, so such
        // candidates are ignored here.
        if (dist < bestDist) {
          bestDist = dist;
          bestTurtle = t;
        }
      }

      if (bestTurtle) return bestTurtle;
    }

    // No usable target/position — pick turtle with highest fuel
    return candidates.reduce((best, t) => (t._fuel > best._fuel ? t : best), candidates[0]);
  }

  /**
   * Try to extract a target {x,y,z} from task_data.
   * Checked in order: `target`, `destination`, centre of `bounds`, then the
   * `startX`/`startY`/`startZ` coordinates (y defaults to 64 only when startY
   * is not a number, so a startY of 0 is kept).
   *
   * @param {Object} data - The task's task_data
   * @returns {{x: number, y: number, z: number}|null} target, or null if none found
   */
  _extractTarget(data) {
    if (!data) return null;

    // Direct target
    if (data.target && typeof data.target.x === 'number') return data.target;
    if (data.destination && typeof data.destination.x === 'number') return data.destination;

    // Bounds — use center
    if (data.bounds) {
      const b = data.bounds;
      if (typeof b.minX === 'number' && typeof b.maxX === 'number') {
        return {
          x: Math.floor((b.minX + b.maxX) / 2),
          y: Math.floor((b.minY + b.maxY) / 2),
          z: Math.floor((b.minZ + b.maxZ) / 2),
        };
      }
    }

    // Coordinate pair (from TaskPanel)
    if (typeof data.startX === 'number' && typeof data.startZ === 'number') {
      const y = typeof data.startY === 'number' ? data.startY : 64;
      return { x: data.startX, y, z: data.startZ };
    }

    return null;
  }
}