388 lines
12 KiB
JavaScript
388 lines
12 KiB
JavaScript
/**
|
|
* TaskDispatcher — Automatic task queue dispatcher for RemoteTurtle
|
|
*
|
|
* Periodically polls the task_queue for pending tasks, matches them to
|
|
* available (idle + connected) turtles, maps task_type to state machine
|
|
* states, and drives the turtle state machine. Handles turtle disconnection
|
|
* mid-task (re-queues) and completion callbacks.
|
|
*
|
|
* Task type → State machine mapping:
|
|
* mine_area → mining (requires bounds in task_data)
|
|
* explore → exploring (requires target/area in task_data)
|
|
* gather → extracting (requires blockName in task_data)
|
|
* build → building (requires plan in task_data)
|
|
* transport → moving (requires target position)
|
|
* clear_area → mining (same as mine_area)
|
|
* scan → scanning (requires area/range)
|
|
* farm → farming (requires area)
|
|
* autocraft → autocrafting (requires recipe)
|
|
*/
|
|
|
|
// Map task_type values from the TaskPanel UI to turtle state machine state names + data mappers
|
|
const TASK_TYPE_MAP = {
|
|
mine_area: { state: 'mining', mapData: d => ({ bounds: d.bounds || d, ...d }) },
|
|
explore: { state: 'exploring', mapData: d => ({ target: d.target || d, ...d }) },
|
|
gather: { state: 'extracting', mapData: d => ({ blockName: d.blockName, count: d.count, ...d }) },
|
|
build: { state: 'building', mapData: d => ({ plan: d.plan, origin: d.origin, ...d }) },
|
|
transport: { state: 'moving', mapData: d => ({ target: d.target || d.destination || d, ...d }) },
|
|
clear_area: { state: 'mining', mapData: d => ({ bounds: d.bounds || d, ...d }) },
|
|
scan: { state: 'scanning', mapData: d => ({ area: d.area, ...d }) },
|
|
farm: { state: 'farming', mapData: d => ({ area: d.area, ...d }) },
|
|
autocraft: { state: 'autocrafting', mapData: d => ({ recipe: d.recipe, count: d.count, ...d }) },
|
|
};
|
|
|
|
export class TaskDispatcher {
|
|
/**
|
|
* @param {Object} opts
|
|
* @param {Map<number, import('./Turtle.js').Turtle>} opts.turtles - Live turtle map
|
|
* @param {Object} opts.db - Database module (server/database.js)
|
|
* @param {Function} opts.broadcastToClients - WebSocket broadcaster
|
|
* @param {number} [opts.pollInterval=5000] - ms between dispatch cycles
|
|
*/
|
|
constructor({ turtles, db, broadcastToClients, pollInterval = 5000 }) {
|
|
this._turtles = turtles;
|
|
this._db = db;
|
|
this._broadcast = broadcastToClients;
|
|
this._pollInterval = pollInterval;
|
|
this._timer = null;
|
|
this._enabled = true;
|
|
|
|
// Track which tasks are actively being executed by which turtles
|
|
// turtleId -> { taskId, taskType }
|
|
this._activeTasks = new Map();
|
|
|
|
// Reverse lookup: taskId -> turtleId
|
|
this._taskToTurtle = new Map();
|
|
}
|
|
|
|
/** Start the dispatch loop */
|
|
start() {
|
|
if (this._timer) return;
|
|
console.log(`🚀 TaskDispatcher started (poll every ${this._pollInterval}ms)`);
|
|
this._timer = setInterval(() => this._tick(), this._pollInterval);
|
|
// Run immediately on start
|
|
this._tick();
|
|
}
|
|
|
|
/** Stop the dispatch loop */
|
|
stop() {
|
|
if (this._timer) {
|
|
clearInterval(this._timer);
|
|
this._timer = null;
|
|
console.log('🛑 TaskDispatcher stopped');
|
|
}
|
|
}
|
|
|
|
/** Enable/disable automatic dispatching (tasks still tracked when disabled) */
|
|
set enabled(val) {
|
|
this._enabled = !!val;
|
|
console.log(`TaskDispatcher: auto-dispatch ${this._enabled ? 'enabled' : 'disabled'}`);
|
|
}
|
|
|
|
get enabled() {
|
|
return this._enabled;
|
|
}
|
|
|
|
/** Get dispatcher status for the API */
|
|
status() {
|
|
return {
|
|
enabled: this._enabled,
|
|
activeTasks: Array.from(this._activeTasks.entries()).map(([turtleId, info]) => ({
|
|
turtleId,
|
|
...info,
|
|
})),
|
|
pollInterval: this._pollInterval,
|
|
};
|
|
}
|
|
|
|
// ========== Internal ==========
|
|
|
|
/**
|
|
* One dispatch cycle:
|
|
* 1. Reconcile active tasks (detect turtle disconnects, state completions)
|
|
* 2. If enabled, find pending tasks and assign to idle turtles
|
|
*/
|
|
_tick() {
|
|
try {
|
|
this._reconcile();
|
|
if (this._enabled) {
|
|
this._dispatch();
|
|
}
|
|
} catch (err) {
|
|
console.error('[TaskDispatcher] tick error:', err.message);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Reconcile: check if turtles executing tasks have finished or disconnected.
|
|
*/
|
|
_reconcile() {
|
|
for (const [turtleId, info] of this._activeTasks) {
|
|
const turtle = this._turtles.get(turtleId);
|
|
|
|
// Turtle removed from server
|
|
if (!turtle) {
|
|
this._handleTaskFailure(info.taskId, turtleId, 'Turtle no longer exists');
|
|
continue;
|
|
}
|
|
|
|
// Turtle disconnected mid-task
|
|
if (!turtle.connected) {
|
|
this._handleTaskFailure(info.taskId, turtleId, 'Turtle disconnected');
|
|
continue;
|
|
}
|
|
|
|
// Turtle transitioned to idle → task completed
|
|
if (turtle.stateName === 'idle') {
|
|
this._handleTaskCompletion(info.taskId, turtleId);
|
|
continue;
|
|
}
|
|
|
|
// Turtle errored out
|
|
if (turtle._error) {
|
|
this._handleTaskFailure(info.taskId, turtleId, turtle._error);
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Dispatch: find pending (unassigned or assigned-but-not-started) tasks,
|
|
* match to available turtles, and start them.
|
|
*/
|
|
_dispatch() {
|
|
// Get all pending tasks (sorted by priority DESC, created_at ASC via DB)
|
|
let pendingTasks;
|
|
try {
|
|
pendingTasks = this._db.getAllTasks('pending');
|
|
} catch (e) {
|
|
return;
|
|
}
|
|
|
|
if (!pendingTasks || pendingTasks.length === 0) return;
|
|
|
|
// Find idle, connected turtles not already executing a task
|
|
const availableTurtles = this._getAvailableTurtles();
|
|
if (availableTurtles.length === 0) return;
|
|
|
|
for (const task of pendingTasks) {
|
|
if (availableTurtles.length === 0) break;
|
|
|
|
const mapping = TASK_TYPE_MAP[task.task_type];
|
|
if (!mapping) {
|
|
console.warn(`[TaskDispatcher] Unknown task type: ${task.task_type} (task #${task.id})`);
|
|
continue;
|
|
}
|
|
|
|
// If task has a specific turtle assignment, respect it
|
|
let turtle = null;
|
|
if (task.assigned_turtle_id) {
|
|
turtle = this._turtles.get(task.assigned_turtle_id);
|
|
if (!turtle || !turtle.connected || turtle.stateName !== 'idle') {
|
|
// Assigned turtle not available — skip this task for now
|
|
continue;
|
|
}
|
|
// Remove from available pool
|
|
const idx = availableTurtles.indexOf(turtle);
|
|
if (idx !== -1) availableTurtles.splice(idx, 1);
|
|
} else {
|
|
// Pick the best available turtle (closest to target if we have coords, else first)
|
|
turtle = this._pickBestTurtle(availableTurtles, task.task_data);
|
|
const idx = availableTurtles.indexOf(turtle);
|
|
if (idx !== -1) availableTurtles.splice(idx, 1);
|
|
}
|
|
|
|
if (!turtle) continue;
|
|
|
|
// Dispatch!
|
|
this._startTask(task, turtle, mapping);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Start a task on a turtle.
|
|
*/
|
|
_startTask(task, turtle, mapping) {
|
|
const taskId = task.id;
|
|
const turtleId = turtle.id;
|
|
|
|
console.log(`[TaskDispatcher] Assigning task #${taskId} (${task.task_type}) → Turtle #${turtleId}`);
|
|
|
|
// Map task_data to state data
|
|
const stateData = mapping.mapData(task.task_data || {});
|
|
|
|
// Update DB: assign + set in_progress
|
|
try {
|
|
this._db.assignTask(taskId, turtleId);
|
|
this._db.updateTaskStatus(taskId, 'in_progress');
|
|
} catch (e) {
|
|
console.error(`[TaskDispatcher] DB error assigning task #${taskId}:`, e.message);
|
|
return;
|
|
}
|
|
|
|
// Track
|
|
this._activeTasks.set(turtleId, { taskId, taskType: task.task_type });
|
|
this._taskToTurtle.set(taskId, turtleId);
|
|
|
|
// Broadcast updates
|
|
this._broadcast({ type: 'task_assigned', taskId, turtleId });
|
|
this._broadcast({ type: 'task_updated', taskId, status: 'in_progress' });
|
|
|
|
// Set the turtle's state machine
|
|
turtle.setState(mapping.state, stateData);
|
|
}
|
|
|
|
/**
|
|
* Handle task completion (turtle went back to idle after executing).
|
|
*/
|
|
_handleTaskCompletion(taskId, turtleId) {
|
|
console.log(`[TaskDispatcher] Task #${taskId} completed by Turtle #${turtleId}`);
|
|
|
|
try {
|
|
this._db.completeTask(taskId);
|
|
} catch (e) {
|
|
console.error(`[TaskDispatcher] DB error completing task #${taskId}:`, e.message);
|
|
}
|
|
|
|
this._activeTasks.delete(turtleId);
|
|
this._taskToTurtle.delete(taskId);
|
|
|
|
this._broadcast({ type: 'task_completed', taskId });
|
|
}
|
|
|
|
/**
|
|
* Handle task failure (turtle disconnected, errored, etc.).
|
|
* Re-queues the task as pending so another turtle can pick it up.
|
|
*/
|
|
_handleTaskFailure(taskId, turtleId, reason) {
|
|
console.warn(`[TaskDispatcher] Task #${taskId} failed on Turtle #${turtleId}: ${reason}`);
|
|
|
|
try {
|
|
// Re-queue: set back to pending, clear assignment
|
|
this._db.updateTaskStatus(taskId, 'pending', reason);
|
|
this._db.assignTask(taskId, null);
|
|
} catch (e) {
|
|
console.error(`[TaskDispatcher] DB error re-queuing task #${taskId}:`, e.message);
|
|
}
|
|
|
|
this._activeTasks.delete(turtleId);
|
|
this._taskToTurtle.delete(taskId);
|
|
|
|
this._broadcast({ type: 'task_updated', taskId, status: 'pending' });
|
|
}
|
|
|
|
/**
|
|
* Manually cancel a running task (called via API).
|
|
*/
|
|
cancelTask(taskId) {
|
|
const turtleId = this._taskToTurtle.get(taskId);
|
|
if (turtleId !== undefined) {
|
|
const turtle = this._turtles.get(turtleId);
|
|
if (turtle) {
|
|
turtle.setState('idle');
|
|
}
|
|
this._activeTasks.delete(turtleId);
|
|
this._taskToTurtle.delete(taskId);
|
|
}
|
|
|
|
try {
|
|
this._db.updateTaskStatus(taskId, 'cancelled');
|
|
} catch (e) {
|
|
console.error(`[TaskDispatcher] DB error cancelling task #${taskId}:`, e.message);
|
|
}
|
|
|
|
this._broadcast({ type: 'task_updated', taskId, status: 'cancelled' });
|
|
}
|
|
|
|
/**
|
|
* Check if a specific task is currently being executed.
|
|
*/
|
|
isTaskActive(taskId) {
|
|
return this._taskToTurtle.has(taskId);
|
|
}
|
|
|
|
// ========== Helpers ==========
|
|
|
|
/**
|
|
* Get connected, idle turtles not currently executing a dispatched task.
|
|
*/
|
|
_getAvailableTurtles() {
|
|
const available = [];
|
|
for (const [id, turtle] of this._turtles) {
|
|
if (
|
|
turtle.connected &&
|
|
turtle.stateName === 'idle' &&
|
|
!this._activeTasks.has(id)
|
|
) {
|
|
available.push(turtle);
|
|
}
|
|
}
|
|
return available;
|
|
}
|
|
|
|
/**
|
|
* Pick the best turtle for a task.
|
|
* If task_data has coordinates, pick the closest turtle with a known position.
|
|
* Otherwise, pick the turtle with the most fuel.
|
|
*/
|
|
_pickBestTurtle(candidates, taskData) {
|
|
if (candidates.length === 0) return null;
|
|
if (candidates.length === 1) return candidates[0];
|
|
|
|
// Try to extract a target position from task data
|
|
const target = this._extractTarget(taskData);
|
|
|
|
if (target) {
|
|
// Sort by Manhattan distance to target
|
|
let bestTurtle = candidates[0];
|
|
let bestDist = Infinity;
|
|
|
|
for (const t of candidates) {
|
|
if (t.position) {
|
|
const dist = Math.abs(t.position.x - target.x)
|
|
+ Math.abs(t.position.y - target.y)
|
|
+ Math.abs(t.position.z - target.z);
|
|
if (dist < bestDist) {
|
|
bestDist = dist;
|
|
bestTurtle = t;
|
|
}
|
|
}
|
|
}
|
|
return bestTurtle;
|
|
}
|
|
|
|
// No target — pick turtle with highest fuel
|
|
return candidates.reduce((best, t) => (t._fuel > best._fuel ? t : best), candidates[0]);
|
|
}
|
|
|
|
/**
|
|
* Try to extract a target {x,y,z} from task_data.
|
|
*/
|
|
_extractTarget(data) {
|
|
if (!data) return null;
|
|
|
|
// Direct target
|
|
if (data.target && typeof data.target.x === 'number') return data.target;
|
|
if (data.destination && typeof data.destination.x === 'number') return data.destination;
|
|
|
|
// Bounds — use center
|
|
if (data.bounds) {
|
|
const b = data.bounds;
|
|
if (typeof b.minX === 'number' && typeof b.maxX === 'number') {
|
|
return {
|
|
x: Math.floor((b.minX + b.maxX) / 2),
|
|
y: Math.floor((b.minY + b.maxY) / 2),
|
|
z: Math.floor((b.minZ + b.maxZ) / 2),
|
|
};
|
|
}
|
|
}
|
|
|
|
// Coordinate pair (from TaskPanel)
|
|
if (typeof data.startX === 'number' && typeof data.startZ === 'number') {
|
|
return { x: data.startX, y: data.startY || 64, z: data.startZ };
|
|
}
|
|
|
|
return null;
|
|
}
|
|
}
|