feat: implement TaskDispatcher for automatic task management and dispatching
This commit is contained in:
387
server/TaskDispatcher.js
Normal file
387
server/TaskDispatcher.js
Normal file
@@ -0,0 +1,387 @@
|
||||
/**
|
||||
* TaskDispatcher — Automatic task queue dispatcher for RemoteTurtle
|
||||
*
|
||||
* Periodically polls the task_queue for pending tasks, matches them to
|
||||
* available (idle + connected) turtles, maps task_type to state machine
|
||||
* states, and drives the turtle state machine. Handles turtle disconnection
|
||||
* mid-task (re-queues) and completion callbacks.
|
||||
*
|
||||
* Task type → State machine mapping:
|
||||
* mine_area → mining (requires bounds in task_data)
|
||||
* explore → exploring (requires target/area in task_data)
|
||||
* gather → extracting (requires blockName in task_data)
|
||||
* build → building (requires plan in task_data)
|
||||
* transport → moving (requires target position)
|
||||
* clear_area → mining (same as mine_area)
|
||||
* scan → scanning (requires area/range)
|
||||
* farm → farming (requires area)
|
||||
* autocraft → autocrafting (requires recipe)
|
||||
*/
|
||||
|
||||
// Map task_type values from the TaskPanel UI to turtle state machine state names + data mappers
|
||||
const TASK_TYPE_MAP = {
|
||||
mine_area: { state: 'mining', mapData: d => ({ bounds: d.bounds || d, ...d }) },
|
||||
explore: { state: 'exploring', mapData: d => ({ target: d.target || d, ...d }) },
|
||||
gather: { state: 'extracting', mapData: d => ({ blockName: d.blockName, count: d.count, ...d }) },
|
||||
build: { state: 'building', mapData: d => ({ plan: d.plan, origin: d.origin, ...d }) },
|
||||
transport: { state: 'moving', mapData: d => ({ target: d.target || d.destination || d, ...d }) },
|
||||
clear_area: { state: 'mining', mapData: d => ({ bounds: d.bounds || d, ...d }) },
|
||||
scan: { state: 'scanning', mapData: d => ({ area: d.area, ...d }) },
|
||||
farm: { state: 'farming', mapData: d => ({ area: d.area, ...d }) },
|
||||
autocraft: { state: 'autocrafting', mapData: d => ({ recipe: d.recipe, count: d.count, ...d }) },
|
||||
};
|
||||
|
||||
export class TaskDispatcher {
|
||||
/**
|
||||
* @param {Object} opts
|
||||
* @param {Map<number, import('./Turtle.js').Turtle>} opts.turtles - Live turtle map
|
||||
* @param {Object} opts.db - Database module (server/database.js)
|
||||
* @param {Function} opts.broadcastToClients - WebSocket broadcaster
|
||||
* @param {number} [opts.pollInterval=5000] - ms between dispatch cycles
|
||||
*/
|
||||
constructor({ turtles, db, broadcastToClients, pollInterval = 5000 }) {
|
||||
this._turtles = turtles;
|
||||
this._db = db;
|
||||
this._broadcast = broadcastToClients;
|
||||
this._pollInterval = pollInterval;
|
||||
this._timer = null;
|
||||
this._enabled = true;
|
||||
|
||||
// Track which tasks are actively being executed by which turtles
|
||||
// turtleId -> { taskId, taskType }
|
||||
this._activeTasks = new Map();
|
||||
|
||||
// Reverse lookup: taskId -> turtleId
|
||||
this._taskToTurtle = new Map();
|
||||
}
|
||||
|
||||
/** Start the dispatch loop */
|
||||
start() {
|
||||
if (this._timer) return;
|
||||
console.log(`🚀 TaskDispatcher started (poll every ${this._pollInterval}ms)`);
|
||||
this._timer = setInterval(() => this._tick(), this._pollInterval);
|
||||
// Run immediately on start
|
||||
this._tick();
|
||||
}
|
||||
|
||||
/** Stop the dispatch loop */
|
||||
stop() {
|
||||
if (this._timer) {
|
||||
clearInterval(this._timer);
|
||||
this._timer = null;
|
||||
console.log('🛑 TaskDispatcher stopped');
|
||||
}
|
||||
}
|
||||
|
||||
/** Enable/disable automatic dispatching (tasks still tracked when disabled) */
|
||||
set enabled(val) {
|
||||
this._enabled = !!val;
|
||||
console.log(`TaskDispatcher: auto-dispatch ${this._enabled ? 'enabled' : 'disabled'}`);
|
||||
}
|
||||
|
||||
get enabled() {
|
||||
return this._enabled;
|
||||
}
|
||||
|
||||
/** Get dispatcher status for the API */
|
||||
status() {
|
||||
return {
|
||||
enabled: this._enabled,
|
||||
activeTasks: Array.from(this._activeTasks.entries()).map(([turtleId, info]) => ({
|
||||
turtleId,
|
||||
...info,
|
||||
})),
|
||||
pollInterval: this._pollInterval,
|
||||
};
|
||||
}
|
||||
|
||||
// ========== Internal ==========
|
||||
|
||||
/**
|
||||
* One dispatch cycle:
|
||||
* 1. Reconcile active tasks (detect turtle disconnects, state completions)
|
||||
* 2. If enabled, find pending tasks and assign to idle turtles
|
||||
*/
|
||||
_tick() {
|
||||
try {
|
||||
this._reconcile();
|
||||
if (this._enabled) {
|
||||
this._dispatch();
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('[TaskDispatcher] tick error:', err.message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconcile: check if turtles executing tasks have finished or disconnected.
|
||||
*/
|
||||
_reconcile() {
|
||||
for (const [turtleId, info] of this._activeTasks) {
|
||||
const turtle = this._turtles.get(turtleId);
|
||||
|
||||
// Turtle removed from server
|
||||
if (!turtle) {
|
||||
this._handleTaskFailure(info.taskId, turtleId, 'Turtle no longer exists');
|
||||
continue;
|
||||
}
|
||||
|
||||
// Turtle disconnected mid-task
|
||||
if (!turtle.connected) {
|
||||
this._handleTaskFailure(info.taskId, turtleId, 'Turtle disconnected');
|
||||
continue;
|
||||
}
|
||||
|
||||
// Turtle transitioned to idle → task completed
|
||||
if (turtle.stateName === 'idle') {
|
||||
this._handleTaskCompletion(info.taskId, turtleId);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Turtle errored out
|
||||
if (turtle._error) {
|
||||
this._handleTaskFailure(info.taskId, turtleId, turtle._error);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispatch: find pending (unassigned or assigned-but-not-started) tasks,
|
||||
* match to available turtles, and start them.
|
||||
*/
|
||||
_dispatch() {
|
||||
// Get all pending tasks (sorted by priority DESC, created_at ASC via DB)
|
||||
let pendingTasks;
|
||||
try {
|
||||
pendingTasks = this._db.getAllTasks('pending');
|
||||
} catch (e) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!pendingTasks || pendingTasks.length === 0) return;
|
||||
|
||||
// Find idle, connected turtles not already executing a task
|
||||
const availableTurtles = this._getAvailableTurtles();
|
||||
if (availableTurtles.length === 0) return;
|
||||
|
||||
for (const task of pendingTasks) {
|
||||
if (availableTurtles.length === 0) break;
|
||||
|
||||
const mapping = TASK_TYPE_MAP[task.task_type];
|
||||
if (!mapping) {
|
||||
console.warn(`[TaskDispatcher] Unknown task type: ${task.task_type} (task #${task.id})`);
|
||||
continue;
|
||||
}
|
||||
|
||||
// If task has a specific turtle assignment, respect it
|
||||
let turtle = null;
|
||||
if (task.assigned_turtle_id) {
|
||||
turtle = this._turtles.get(task.assigned_turtle_id);
|
||||
if (!turtle || !turtle.connected || turtle.stateName !== 'idle') {
|
||||
// Assigned turtle not available — skip this task for now
|
||||
continue;
|
||||
}
|
||||
// Remove from available pool
|
||||
const idx = availableTurtles.indexOf(turtle);
|
||||
if (idx !== -1) availableTurtles.splice(idx, 1);
|
||||
} else {
|
||||
// Pick the best available turtle (closest to target if we have coords, else first)
|
||||
turtle = this._pickBestTurtle(availableTurtles, task.task_data);
|
||||
const idx = availableTurtles.indexOf(turtle);
|
||||
if (idx !== -1) availableTurtles.splice(idx, 1);
|
||||
}
|
||||
|
||||
if (!turtle) continue;
|
||||
|
||||
// Dispatch!
|
||||
this._startTask(task, turtle, mapping);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start a task on a turtle.
|
||||
*/
|
||||
_startTask(task, turtle, mapping) {
|
||||
const taskId = task.id;
|
||||
const turtleId = turtle.id;
|
||||
|
||||
console.log(`[TaskDispatcher] Assigning task #${taskId} (${task.task_type}) → Turtle #${turtleId}`);
|
||||
|
||||
// Map task_data to state data
|
||||
const stateData = mapping.mapData(task.task_data || {});
|
||||
|
||||
// Update DB: assign + set in_progress
|
||||
try {
|
||||
this._db.assignTask(taskId, turtleId);
|
||||
this._db.updateTaskStatus(taskId, 'in_progress');
|
||||
} catch (e) {
|
||||
console.error(`[TaskDispatcher] DB error assigning task #${taskId}:`, e.message);
|
||||
return;
|
||||
}
|
||||
|
||||
// Track
|
||||
this._activeTasks.set(turtleId, { taskId, taskType: task.task_type });
|
||||
this._taskToTurtle.set(taskId, turtleId);
|
||||
|
||||
// Broadcast updates
|
||||
this._broadcast({ type: 'task_assigned', taskId, turtleId });
|
||||
this._broadcast({ type: 'task_updated', taskId, status: 'in_progress' });
|
||||
|
||||
// Set the turtle's state machine
|
||||
turtle.setState(mapping.state, stateData);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle task completion (turtle went back to idle after executing).
|
||||
*/
|
||||
_handleTaskCompletion(taskId, turtleId) {
|
||||
console.log(`[TaskDispatcher] Task #${taskId} completed by Turtle #${turtleId}`);
|
||||
|
||||
try {
|
||||
this._db.completeTask(taskId);
|
||||
} catch (e) {
|
||||
console.error(`[TaskDispatcher] DB error completing task #${taskId}:`, e.message);
|
||||
}
|
||||
|
||||
this._activeTasks.delete(turtleId);
|
||||
this._taskToTurtle.delete(taskId);
|
||||
|
||||
this._broadcast({ type: 'task_completed', taskId });
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle task failure (turtle disconnected, errored, etc.).
|
||||
* Re-queues the task as pending so another turtle can pick it up.
|
||||
*/
|
||||
_handleTaskFailure(taskId, turtleId, reason) {
|
||||
console.warn(`[TaskDispatcher] Task #${taskId} failed on Turtle #${turtleId}: ${reason}`);
|
||||
|
||||
try {
|
||||
// Re-queue: set back to pending, clear assignment
|
||||
this._db.updateTaskStatus(taskId, 'pending', reason);
|
||||
this._db.assignTask(taskId, null);
|
||||
} catch (e) {
|
||||
console.error(`[TaskDispatcher] DB error re-queuing task #${taskId}:`, e.message);
|
||||
}
|
||||
|
||||
this._activeTasks.delete(turtleId);
|
||||
this._taskToTurtle.delete(taskId);
|
||||
|
||||
this._broadcast({ type: 'task_updated', taskId, status: 'pending' });
|
||||
}
|
||||
|
||||
/**
|
||||
* Manually cancel a running task (called via API).
|
||||
*/
|
||||
cancelTask(taskId) {
|
||||
const turtleId = this._taskToTurtle.get(taskId);
|
||||
if (turtleId !== undefined) {
|
||||
const turtle = this._turtles.get(turtleId);
|
||||
if (turtle) {
|
||||
turtle.setState('idle');
|
||||
}
|
||||
this._activeTasks.delete(turtleId);
|
||||
this._taskToTurtle.delete(taskId);
|
||||
}
|
||||
|
||||
try {
|
||||
this._db.updateTaskStatus(taskId, 'cancelled');
|
||||
} catch (e) {
|
||||
console.error(`[TaskDispatcher] DB error cancelling task #${taskId}:`, e.message);
|
||||
}
|
||||
|
||||
this._broadcast({ type: 'task_updated', taskId, status: 'cancelled' });
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a specific task is currently being executed.
|
||||
*/
|
||||
isTaskActive(taskId) {
|
||||
return this._taskToTurtle.has(taskId);
|
||||
}
|
||||
|
||||
// ========== Helpers ==========
|
||||
|
||||
/**
|
||||
* Get connected, idle turtles not currently executing a dispatched task.
|
||||
*/
|
||||
_getAvailableTurtles() {
|
||||
const available = [];
|
||||
for (const [id, turtle] of this._turtles) {
|
||||
if (
|
||||
turtle.connected &&
|
||||
turtle.stateName === 'idle' &&
|
||||
!this._activeTasks.has(id)
|
||||
) {
|
||||
available.push(turtle);
|
||||
}
|
||||
}
|
||||
return available;
|
||||
}
|
||||
|
||||
/**
|
||||
* Pick the best turtle for a task.
|
||||
* If task_data has coordinates, pick the closest turtle with a known position.
|
||||
* Otherwise, pick the turtle with the most fuel.
|
||||
*/
|
||||
_pickBestTurtle(candidates, taskData) {
|
||||
if (candidates.length === 0) return null;
|
||||
if (candidates.length === 1) return candidates[0];
|
||||
|
||||
// Try to extract a target position from task data
|
||||
const target = this._extractTarget(taskData);
|
||||
|
||||
if (target) {
|
||||
// Sort by Manhattan distance to target
|
||||
let bestTurtle = candidates[0];
|
||||
let bestDist = Infinity;
|
||||
|
||||
for (const t of candidates) {
|
||||
if (t.position) {
|
||||
const dist = Math.abs(t.position.x - target.x)
|
||||
+ Math.abs(t.position.y - target.y)
|
||||
+ Math.abs(t.position.z - target.z);
|
||||
if (dist < bestDist) {
|
||||
bestDist = dist;
|
||||
bestTurtle = t;
|
||||
}
|
||||
}
|
||||
}
|
||||
return bestTurtle;
|
||||
}
|
||||
|
||||
// No target — pick turtle with highest fuel
|
||||
return candidates.reduce((best, t) => (t._fuel > best._fuel ? t : best), candidates[0]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to extract a target {x,y,z} from task_data.
|
||||
*/
|
||||
_extractTarget(data) {
|
||||
if (!data) return null;
|
||||
|
||||
// Direct target
|
||||
if (data.target && typeof data.target.x === 'number') return data.target;
|
||||
if (data.destination && typeof data.destination.x === 'number') return data.destination;
|
||||
|
||||
// Bounds — use center
|
||||
if (data.bounds) {
|
||||
const b = data.bounds;
|
||||
if (typeof b.minX === 'number' && typeof b.maxX === 'number') {
|
||||
return {
|
||||
x: Math.floor((b.minX + b.maxX) / 2),
|
||||
y: Math.floor((b.minY + b.maxY) / 2),
|
||||
z: Math.floor((b.minZ + b.maxZ) / 2),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// Coordinate pair (from TaskPanel)
|
||||
if (typeof data.startX === 'number' && typeof data.startZ === 'number') {
|
||||
return { x: data.startX, y: data.startY || 64, z: data.startZ };
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user