eue is ephemeral, tied to the lifecycle of the agent run. Durable queues introduce latency and complexity that are unjustified for transient sub-tasks.
TypeScript Implementation
The following implementation demonstrates a production-grade priority scheduler in TypeScript. It includes a binary heap, tie-breaking, and a lazy removal mechanism to support task cancellation.
// Priority levels: Lower numeric value indicates higher urgency.
export enum Priority {
CRITICAL = 0,
HIGH = 1,
MEDIUM = 2,
LOW = 3,
BACKGROUND = 4,
}
export interface AgentTask<TPayload = unknown> {
id: string;
payload: TPayload;
priority: Priority;
enqueuedAt: number; // Monotonic timestamp for tie-breaking
}
export class PriorityScheduler<TPayload = unknown> {
private heap: Array<{ task: AgentTask<TPayload>; sequence: number }> = [];
private sequenceCounter: number = 0;
private removedIds: Set<string> = new Set();
/**
* Adds a task to the queue.
* Throws if the task ID already exists.
*/
push(task: AgentTask<TPayload>): void {
if (this.removedIds.has(task.id)) {
this.removedIds.delete(task.id);
}
if (this.heap.some((entry) => entry.task.id === task.id)) {
throw new Error(`Duplicate task ID: ${task.id}`);
}
const entry = {
task,
sequence: this.sequenceCounter++,
};
this.heap.push(entry);
this.bubbleUp(this.heap.length - 1);
}
/**
* Removes and returns the highest-priority task.
* Returns undefined if the queue is empty.
*/
pop(): AgentTask<TPayload> | undefined {
if (this.heap.length === 0) return undefined;
// Lazy removal: skip tasks marked as removed
while (this.heap.length > 0) {
const root = this.heap[0];
if (this.removedIds.has(root.task.id)) {
this.removedIds.delete(root.task.id);
this.swap(0, this.heap.length - 1);
this.heap.pop();
this.sinkDown(0);
continue;
}
break;
}
if (this.heap.length === 0) return undefined;
const root = this.heap[0];
const last = this.heap.pop()!;
if (this.heap.length > 0) {
this.heap[0] = last;
this.sinkDown(0);
}
return root.task;
}
/**
* Marks a task for removal. The task is skipped during pop.
*/
cancel(taskId: string): void {
this.removedIds.add(taskId);
}
/**
* Updates the priority of an existing task.
* Implemented as cancel + push to maintain heap invariant.
*/
reprioritize(taskId: string, newPriority: Priority): void {
const index = this.heap.findIndex((e) => e.task.id === taskId);
if (index === -1) {
throw new Error(`Task not found: ${taskId}`);
}
const existing = this.heap[index].task;
this.cancel(taskId);
this.push({ ...existing, priority: newPriority });
}
peek(): AgentTask<TPayload> | undefined {
for (const entry of this.heap) {
if (!this.removedIds.has(entry.task.id)) {
return entry.task;
}
}
return undefined;
}
get size(): number {
return this.heap.length - this.removedIds.size;
}
// Heap operations
private bubbleUp(index: number): void {
while (index > 0) {
const parentIndex = Math.floor((index - 1) / 2);
if (this.compare(index, parentIndex) < 0) {
this.swap(index, parentIndex);
index = parentIndex;
} else {
break;
}
}
}
private sinkDown(index: number): void {
const length = this.heap.length;
while (true) {
const left = 2 * index + 1;
const right = 2 * index + 2;
let smallest = index;
if (left < length && this.compare(left, smallest) < 0) {
smallest = left;
}
if (right < length && this.compare(right, smallest) < 0) {
smallest = right;
}
if (smallest !== index) {
this.swap(index, smallest);
index = smallest;
} else {
break;
}
}
}
private compare(i: number, j: number): number {
const a = this.heap[i];
const b = this.heap[j];
if (a.task.priority !== b.task.priority) {
return a.task.priority - b.task.priority;
}
return a.sequence - b.sequence;
}
private swap(i: number, j: number): void {
[this.heap[i], this.heap[j]] = [this.heap[j], this.heap[i]];
}
}
Usage Pattern
The scheduler integrates into the agent loop by classifying tasks during decomposition and dynamically enqueuing follow-up work.
import { PriorityScheduler, Priority, AgentTask } from './scheduler';
const scheduler = new PriorityScheduler();
function runAgentLoop(userRequest: string): void {
// 1. Decompose request and classify priorities
const subtasks = decomposeRequest(userRequest);
for (const sub of subtasks) {
scheduler.push({
id: generateId(),
payload: sub.data,
priority: classifyUrgency(sub.type),
enqueuedAt: performance.now(),
});
}
// 2. Process loop
while (scheduler.size > 0) {
const task = scheduler.pop();
if (!task) break;
const result = executeTask(task);
// 3. Dynamic enqueuing
// High-priority results may spawn immediate follow-ups
if (result.requiresImmediateAction) {
scheduler.push({
id: generateId(),
payload: result.followupData,
priority: Priority.HIGH,
enqueuedAt: performance.now(),
});
}
}
}
Pitfall Guide
-
Priority Starvation
- Explanation: Low-priority tasks may never execute if the queue is continuously fed with high-priority work.
- Fix: Implement an aging mechanism. Increase the priority of tasks that have been in the queue longer than a threshold. For example, boost
LOW to MEDIUM after 60 seconds.
-
Priority Inflation
- Explanation: Developers tend to mark everything as
CRITICAL or HIGH to ensure execution, rendering the priority system ineffective.
- Fix: Enforce strict governance.
CRITICAL should be reserved for user-blocking operations. Use code reviews or automated checks to validate priority assignments.
-
Ignoring Task Dependencies
- Explanation: A high-priority task may depend on the output of a low-priority task. Executing the high-priority task first causes failure.
- Fix: Integrate a dependency graph. The scheduler should only expose tasks whose dependencies are satisfied. Use a library like
agent-tool-graph to resolve prerequisites before pushing to the queue.
-
Lazy Removal Memory Leaks
- Explanation: Marking tasks as removed without cleaning the heap causes memory growth and performance degradation over time.
- Fix: Implement periodic compaction. When the ratio of removed tasks to total heap size exceeds a threshold (e.g., 50%), rebuild the heap excluding removed entries.
-
Non-Deterministic Tie-Breaking
- Explanation: Without a secondary sort key, tasks with equal priority may execute in arbitrary order, leading to flaky behavior.
- Fix: Always include a monotonic sequence counter or timestamp as the secondary comparison key.
-
Overhead for Trivial Workloads
- Explanation: For agents with fewer than five tasks, the overhead of heap operations may exceed the benefit.
- Fix: Add a threshold check. If the task count is below a minimum (e.g., 3), use a simple array sort or linear scan.
-
In-Memory Volatility
- Explanation: Process crashes result in total loss of pending tasks.
- Fix: For long-running agents, implement checkpointing. Serialize the queue state to disk or a durable store at regular intervals. Use
agent-resume patterns to recover state.
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|
| Linear Pipeline | Simple Array / Loop | Tasks have uniform urgency; overhead of queue is unjustified. | Low |
| Mixed Urgency | Priority Queue | Reduces time-to-value; handles heterogeneous sub-tasks efficiently. | Medium |
| Complex Dependencies | DAG Scheduler | Ensures correctness when tasks depend on prior outputs. | High |
| Distributed Execution | External Message Queue | Requires persistence, scaling, and multi-node coordination. | High |
| Rate-Limited APIs | Priority Queue + Delay | Manages urgency while respecting API rate limits via scheduled delays. | Medium |
Configuration Template
interface SchedulerConfig {
// Enable aging to boost priority of long-waiting tasks
aging: {
enabled: boolean;
thresholdMs: number; // Time before priority boost
boostLevel: number; // Priority increment
};
// Compaction settings for lazy removal
compaction: {
enabled: boolean;
removalRatioThreshold: number; // e.g., 0.5
};
// Observability
metrics: {
enabled: boolean;
reporter: (metrics: SchedulerMetrics) => void;
};
}
const defaultConfig: SchedulerConfig = {
aging: {
enabled: true,
thresholdMs: 60_000,
boostLevel: 1,
},
compaction: {
enabled: true,
removalRatioThreshold: 0.5,
},
metrics: {
enabled: true,
reporter: console.log,
},
};
Quick Start Guide
- Define Priorities: Create an enum or constant map for your priority levels. Ensure lower values represent higher urgency.
- Instantiate Scheduler: Create a
PriorityScheduler instance with your configuration.
- Classify Tasks: During task decomposition, assign a priority based on the task type and user intent.
- Execute Loop: Implement a
while loop that calls pop() and executes the returned task. Handle dynamic enqueuing of follow-up tasks within the loop.
- Monitor: Track queue metrics to detect starvation or bottlenecks. Adjust aging thresholds based on observed latency patterns.