llmfleet: pool many agents' turns into one Batch API call and save 50 percent
Dynamic Latency-Aware Batching for Concurrent Agent Systems
Current Situation Analysis
The economics of large language model inference are shifting rapidly. As context windows expand and retrieval-augmented generation (RAG) pipelines become standard, input token consumption has become the primary cost driver for production deployments. Provider APIs now offer explicit pricing tiers to address this: Anthropic's Batch API, for example, applies a flat 50% discount to all input tokens submitted through its asynchronous queue.
Despite the clear financial incentive, most engineering teams leave this discount on the table. The root cause is a fundamental mismatch between the API's design assumptions and real-world agent architectures. The Batch API is engineered for offline, high-throughput workloads. It expects requests to be queued, processed asynchronously, and retrieved via polling after 90 to 120 seconds. When applied to a single interactive agent, this latency profile breaks the user experience. Developers are forced into a binary choice: pay full price for synchronous calls, or route everything through the batch queue and accept minute-scale delays.
This framing ignores the actual topology of modern AI systems. Production deployments rarely run a single agent in isolation. They orchestrate fleets of concurrent coroutines handling grading, summarization, extraction, parallel evaluations, and background enrichment. In a fleet architecture, the unit of batching should not be a single user turn. It should be the aggregated turn volume across all concurrent workers. The missing piece is a transparent routing layer that intercepts calls, evaluates their latency tolerance, and dynamically dispatches them to either a synchronous endpoint or a pooled batch queue. Without this abstraction, teams either overpay for latency they don't need or degrade UX by forcing interactive paths through offline queues.
WOW Moment: Key Findings
The breakthrough emerges when you decouple latency tolerance from routing logic. By attaching a latency_budget_ms parameter to each request, the system can automatically partition traffic without requiring developers to maintain separate code paths. The financial and operational impact is measurable across three dimensions: latency, cost, and throughput.
| Routing Strategy | p95 Latency | Input Token Cost | Throughput Capacity |
|---|---|---|---|
| Synchronous Direct | ~200 ms | 100% (full price) | Limited by rate limits & concurrency caps |
| Naive Batch Queue | ~90–120 s | 50% (discounted) | High, but blocks interactive paths |
| Fleet-Aware Dynamic | ~200 ms (sync) / up to ~30 s queue wait plus batch turnaround (batch) | 50–100% blended, depending on batched share | High, with no added latency on interactive paths |
This finding matters because it transforms batch processing from an offline utility into a real-time cost optimization layer. The fleet-aware approach preserves sub-second responsiveness for user-facing turns while automatically pooling background workloads into discounted batches. It eliminates the need for manual queue management, prevents starvation during low-traffic periods, and ensures that the 50% input discount is captured at scale without architectural compromise.
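To make the blended cost figure concrete, the following is a small worked sketch. It assumes the flat 50% input discount described above; the function name and the sample traffic shares are illustrative and not part of any library.

```python
def blended_input_cost_fraction(batched_share: float, batch_discount: float = 0.5) -> float:
    """Fraction of full-price input cost paid when `batched_share` of input
    tokens go through the batch path and the rest are sent synchronously."""
    if not 0.0 <= batched_share <= 1.0:
        raise ValueError("batched_share must be between 0 and 1")
    sync_share = 1.0 - batched_share
    # Sync tokens pay full price; batched tokens pay (1 - discount).
    return sync_share + batched_share * (1.0 - batch_discount)


for share in (0.0, 0.5, 0.9, 1.0):
    fraction = blended_input_cost_fraction(share)
    print(f"{share:.0%} of input tokens batched -> {fraction:.0%} of full input cost")
```

The blended cost only approaches 50% when nearly all input tokens flow through the batch path, so the sync/batch split is the number worth tracking.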
Core Solution
The architecture centers on a transparent dispatcher that sits between your agent coroutines and the LLM provider SDK. It operates on three principles: explicit latency budgeting, dual-path routing, and windowed batch flushing.
Architecture Decisions & Rationale
Latency Budget as a First-Class Parameter: Instead of hardcoding sync vs. batch paths, each request carries a `latency_budget_ms` threshold. The dispatcher compares this against a configured sync ceiling (`sync_latency_cap_ms`). If the budget is at or below the ceiling, the call bypasses the queue. If it is looser, the call enters the batch pool. This removes routing logic from business code.
Process-Local Queue with Future Resolution: Batches are assembled in-memory within the running process. Each queued request attaches an `asyncio.Future`. When the batch completes, the dispatcher resolves each future with its corresponding response. This avoids external dependencies (Redis, SQS) while maintaining coroutine isolation.
Dual-Threshold Flush Mechanism: The background flusher coroutine evaluates two conditions on every tick:
- Queue size reaches `min_batch_size`
- Oldest queued item has waited longer than `batch_window_ms`
Whichever condition triggers first initiates a flush. This prevents starvation during traffic valleys while maximizing batch utilization during peaks.
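The dual-threshold check can be isolated as a pure function, which makes it easy to unit test without an event loop. This is a minimal sketch: the function name and the explicit `now` argument are assumptions made for testability, and the window parameter is named `batch_window_ms` to match the configuration used in the implementation.

```python
import time
from typing import Optional, Sequence


def should_flush(
    queued_at: Sequence[float],
    min_batch_size: int,
    batch_window_ms: int,
    now: Optional[float] = None,
) -> bool:
    """Return True when either flush condition holds.

    `queued_at` holds time.monotonic() timestamps in arrival order,
    so index 0 belongs to the oldest waiting request.
    """
    if not queued_at:
        return False  # nothing to flush
    if now is None:
        now = time.monotonic()
    size_reached = len(queued_at) >= min_batch_size
    window_expired = (now - queued_at[0]) * 1000 >= batch_window_ms
    return size_reached or window_expired
```

Passing `now` explicitly lets a test simulate a traffic valley (one request, long wait) and a peak (many requests, no wait) deterministically.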
Implementation
The following example demonstrates a complete routing layer with explicit typing, future-based resolution, and a windowed flusher.
```python
import asyncio
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set

from anthropic import AsyncAnthropic


@dataclass
class RoutingConfig:
    sync_latency_cap_ms: int = 5000  # budgets at or below this are served synchronously
    batch_window_ms: int = 30000     # longest a ticket may wait before a flush
    min_batch_size: int = 10         # queue depth that triggers an early flush
    max_batch_size: int = 100        # cap on requests per submitted batch


@dataclass
class RequestTicket:
    payload: Dict[str, Any]
    future: asyncio.Future
    queued_at: float = field(default_factory=time.monotonic)


class AgentFleetRouter:
    def __init__(self, client: AsyncAnthropic, config: RoutingConfig):
        self.client = client
        self.config = config
        self._queue: List[RequestTicket] = []
        self._flush_task: Optional[asyncio.Task] = None
        self._inflight: Set[asyncio.Task] = set()
        self._stats = {"sync": 0, "batched": 0, "batches_flushed": 0}

    async def __aenter__(self):
        self._flush_task = asyncio.create_task(self._background_flusher())
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Stop batches that are still being polled, then stop the flusher.
        for task in list(self._inflight):
            task.cancel()
        if self._flush_task:
            self._flush_task.cancel()
            try:
                await self._flush_task
            except asyncio.CancelledError:
                pass

    async def submit(self, latency_budget_ms: int, **kwargs: Any) -> Any:
        # Tight budget: call the synchronous endpoint directly.
        if latency_budget_ms <= self.config.sync_latency_cap_ms:
            self._stats["sync"] += 1
            return await self.client.messages.create(**kwargs)

        # Loose budget: queue the request and wait for its batch result.
        ticket = RequestTicket(
            payload=kwargs,
            future=asyncio.get_running_loop().create_future(),
        )
        self._queue.append(ticket)
        self._stats["batched"] += 1
        return await ticket.future

    async def _background_flusher(self):
        poll_interval = 1.0
        while True:
            await asyncio.sleep(poll_interval)
            if not self._queue:
                continue
            oldest_wait_s = time.monotonic() - self._queue[0].queued_at
            should_flush = (
                len(self._queue) >= self.config.min_batch_size
                or oldest_wait_s >= self.config.batch_window_ms / 1000
            )
            if should_flush:
                # Run the flush as its own task so that polling one batch does
                # not stall flushes for requests that arrive in the meantime.
                task = asyncio.create_task(self._execute_batch_flush())
                self._inflight.add(task)
                task.add_done_callback(self._inflight.discard)

    async def _execute_batch_flush(self):
        batch = self._queue[:self.config.max_batch_size]
        self._queue = self._queue[self.config.max_batch_size:]
        if not batch:
            return
        self._stats["batches_flushed"] += 1
        # custom_id is the ticket's position in this batch; results are matched on it.
        requests = [
            {"custom_id": str(i), "params": t.payload}
            for i, t in enumerate(batch)
        ]
        try:
            # Message Batches live under client.messages.batches in the anthropic SDK.
            batch_job = await self.client.messages.batches.create(requests=requests)
            completed = await self._poll_batch_until_done(batch_job.id)
            self._resolve_futures(batch, completed)
        except Exception as e:
            # Batch-level failure: reject every ticket that is still waiting.
            for ticket in batch:
                if not ticket.future.done():
                    ticket.future.set_exception(e)

    async def _poll_batch_until_done(self, batch_id: str) -> List[Any]:
        # processing_status stays "in_progress" (or "canceling") until it reaches "ended".
        while True:
            batch = await self.client.messages.batches.retrieve(batch_id)
            if batch.processing_status == "ended":
                stream = await self.client.messages.batches.results(batch_id)
                return [entry async for entry in stream]
            await asyncio.sleep(5)

    def _resolve_futures(self, tickets: List[RequestTicket], results: List[Any]):
        result_map = {r.custom_id: r for r in results}
        for i, ticket in enumerate(tickets):
            if ticket.future.done():
                continue  # e.g. the caller was cancelled while waiting
            entry = result_map.get(str(i))
            if entry is None:
                ticket.future.set_exception(
                    RuntimeError(f"Missing result for custom_id {i}")
                )
            elif entry.result.type == "succeeded":
                ticket.future.set_result(entry.result.message)
            else:
                # Per-request outcomes other than success: errored, canceled, expired.
                ticket.future.set_exception(
                    RuntimeError(f"Batch request {i} ended as {entry.result.type}")
                )
```
Usage Pattern
```python
async def run_fleet_workload():
    client = AsyncAnthropic()
    policy = RoutingConfig(
        sync_latency_cap_ms=5000,
        batch_window_ms=30000,
        min_batch_size=10,
        max_batch_size=100
    )

    async with AgentFleetRouter(client, policy) as router:
        # Interactive path: tight budget forces sync routing
        chat_response = await router.submit(
            latency_budget_ms=2000,
            model="claude-sonnet-4-20250514",
            max_tokens=200,
            messages=[{"role": "user", "content": "Initialize session"}]
        )

        # Background path: loose budget enters batch pool
        essays = [f"Essay {i} content..." for i in range(50)]
        grading_tasks = [
            router.submit(
                latency_budget_ms=600000,
                model="claude-sonnet-4-20250514",
                max_tokens=200,
                messages=[{"role": "user", "content": f"Grade: {essay}"}]
            )
            for essay in essays
        ]
        grades = await asyncio.gather(*grading_tasks)

        print(f"Sync: {router._stats['sync']} | Batched: {router._stats['batched']}")


asyncio.run(run_fleet_workload())
```
The caller never interacts with queue mechanics. It simply declares a latency tolerance and awaits a response. The dispatcher handles partitioning, batching, polling, and future resolution transparently.
Pitfall Guide
1. Queue Starvation During Traffic Valleys
Explanation: If you only flush based on min_batch_size, low-traffic periods will leave requests queued indefinitely. The batch never reaches the threshold, and latency budgets are violated.
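Fix: Always implement a dual-threshold flush. The batch_window_ms timeout bounds how long a request can sit in the queue regardless of traffic volume. Size the window so that queue wait plus batch processing time still fits within the budgets of the requests you route through the batch path.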
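A quick arithmetic check helps when choosing the window. The sketch below assumes a flusher that checks the queue on a fixed tick, so a lone request can wait up to one extra tick after its window expires; the function names are hypothetical, and the batch turnaround value is an input you would measure rather than a guarantee.

```python
def worst_case_queue_wait_ms(batch_window_ms: int, flusher_tick_ms: int) -> int:
    """Longest time a lone request can sit in the queue before a flush starts.

    The window may expire just after a tick, so the flusher only notices
    it on the following tick.
    """
    return batch_window_ms + flusher_tick_ms


def fits_latency_budget(
    latency_budget_ms: int,
    batch_window_ms: int,
    flusher_tick_ms: int,
    batch_turnaround_ms: int,
) -> bool:
    """True if worst-case queue wait plus batch turnaround fits the budget."""
    total = worst_case_queue_wait_ms(batch_window_ms, flusher_tick_ms) + batch_turnaround_ms
    return total <= latency_budget_ms


# Figures used in this article: 30 s window, 1 s tick, 120 s batch turnaround.
print(worst_case_queue_wait_ms(30_000, 1_000))               # 31000
print(fits_latency_budget(600_000, 30_000, 1_000, 120_000))  # True
print(fits_latency_budget(60_000, 30_000, 1_000, 120_000))   # False
```

With these figures a 10-minute budget fits comfortably, while a 60-second budget does not and should stay on the synchronous path.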
2. Tool-Call Critical Path Blocking
Explanation: Agent loops that require immediate tool execution (e.g., database queries, API lookups) cannot tolerate batch delays. Routing these turns through the batch queue breaks the agent's control flow.
Fix: Detect tool-use turns in your orchestration layer and force latency_budget_ms below the sync threshold. Alternatively, expose a force_sync=True flag that bypasses the queue entirely.
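One way to apply this fix is to clamp the budget before the request reaches the router. The following is a sketch under assumptions: it treats any request that declares `tools`, or whose messages carry `tool_use` or `tool_result` content blocks, as being on the critical path. Both helper names are hypothetical, and a real orchestration layer may have a more precise signal.

```python
from typing import Any, Dict


def has_tool_activity(request: Dict[str, Any]) -> bool:
    """Heuristic: the turn is tool-related if it declares tools or if any
    message contains tool_use / tool_result content blocks."""
    if request.get("tools"):
        return True
    for message in request.get("messages", []):
        content = message.get("content")
        if isinstance(content, list):
            for block in content:
                if isinstance(block, dict) and block.get("type") in ("tool_use", "tool_result"):
                    return True
    return False


def effective_budget_ms(
    request: Dict[str, Any],
    requested_budget_ms: int,
    sync_latency_cap_ms: int,
) -> int:
    """Clamp the budget to the sync ceiling for tool-related turns so the
    router sends them down the synchronous path."""
    if has_tool_activity(request):
        return min(requested_budget_ms, sync_latency_cap_ms)
    return requested_budget_ms
```

A caller would then pass `effective_budget_ms(params, budget, cap)` as `latency_budget_ms` instead of the raw budget.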
3. Cross-Process Fragmentation
Explanation: The in-memory queue is process-local. If you scale horizontally across multiple containers or workers, each process maintains its own isolated batch pool. This fragments queue depth and reduces batch utilization.
Fix: Accept process-local batching for single-instance deployments. For multi-process architectures, introduce an external message broker (Redis Streams, SQS, RabbitMQ) to aggregate requests before routing to the batch API.
4. Silent Batch Failures
Explanation: The Batch API returns per-request results, but network timeouts, rate limits, or malformed payloads can cause entire batches to fail or expire. Without explicit error handling, awaiting futures will hang or raise unhandled exceptions.
Fix: Implement per-request error propagation. Catch batch-level failures, reject all associated futures with descriptive exceptions, and expose retry hooks. Never swallow batch errors silently.
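A retry hook can live outside the router as a thin wrapper around any awaitable submission. The sketch below is one possible shape, not the article's implementation: `submit_with_retry` and its parameters are hypothetical, and it retries on any exception, which you would likely narrow to retryable error types in practice.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def submit_with_retry(
    submit: Callable[[], Awaitable[Any]],
    max_attempts: int = 3,
    base_delay_s: float = 1.0,
) -> Any:
    """Await `submit()` and retry with exponential backoff when it raises.

    `submit` must create a fresh awaitable on each call, because a coroutine
    object cannot be awaited twice.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return await submit()
        except Exception:
            if attempt == max_attempts:
                raise  # surface the final failure to the caller
            await asyncio.sleep(base_delay_s * 2 ** (attempt - 1))
```

With the router from this article, a call might look like `await submit_with_retry(lambda: router.submit(latency_budget_ms=600000, **params))`, so a rejected future triggers a fresh submission instead of failing the whole workload.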
5. Context Window Overflow in Batches
Explanation: Batching aggregates payloads. If you queue 100 requests with 50k-token contexts, the combined payload may exceed provider limits or trigger unexpected truncation.
Fix: Enforce per-request token budgets before queuing. Validate context length against the model's maximum, and implement a pre-queue filter that rejects or splits oversized payloads.
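A pre-queue filter can be sketched with a rough size estimate. The version below assumes about four characters per token, which is a crude heuristic and not a real tokenizer; a provider-side token counter would give more accurate numbers. The function names and the 40,000-token default (taken from the configuration template in this article) are illustrative.

```python
from typing import Any, Dict

CHARS_PER_TOKEN = 4  # assumption: crude average, not a real tokenizer


def estimate_input_tokens(request: Dict[str, Any]) -> int:
    """Rough token estimate from the character count of system and message text."""
    chars = 0
    system = request.get("system", "")
    if isinstance(system, str):
        chars += len(system)
    for message in request.get("messages", []):
        content = message.get("content", "")
        if isinstance(content, str):
            chars += len(content)
        elif isinstance(content, list):
            for block in content:
                if isinstance(block, dict) and isinstance(block.get("text"), str):
                    chars += len(block["text"])
    return -(-chars // CHARS_PER_TOKEN)  # ceiling division


def check_before_queue(request: Dict[str, Any], max_input_tokens: int = 40000) -> None:
    """Raise ValueError if the request looks too large to queue."""
    estimated = estimate_input_tokens(request)
    if estimated > max_input_tokens:
        raise ValueError(
            f"Estimated {estimated} input tokens exceeds the limit of {max_input_tokens}"
        )
```

Calling `check_before_queue(params)` just before `router.submit(...)` rejects oversized payloads early, before they occupy a slot in a batch.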
6. Ignoring Output Token Costs
Explanation: This article models the 50% discount as applying to input tokens, with output tokens billed at standard rates. Teams often assume the discount covers the entire request, leading to inaccurate cost projections.
Fix: Track input vs. output token consumption separately, and confirm on the provider's pricing page which token types the batch discount covers. Use the batch discount to offset large system prompts and RAG context, but model output costs independently in your budgeting layer.
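Separate accounting can be as simple as a small ledger that keeps four counters. This is a sketch: `TokenLedger` is a hypothetical helper, the prices passed to `cost_usd` are placeholders you would replace with real per-million-token rates, and the discount parameters default to the input-only assumption used in this article so they can be adjusted to match the provider's actual terms.

```python
from dataclasses import dataclass


@dataclass
class TokenLedger:
    sync_input: int = 0
    sync_output: int = 0
    batch_input: int = 0
    batch_output: int = 0

    def record(self, input_tokens: int, output_tokens: int, batched: bool) -> None:
        """Add one response's token usage to the sync or batch counters."""
        if batched:
            self.batch_input += input_tokens
            self.batch_output += output_tokens
        else:
            self.sync_input += input_tokens
            self.sync_output += output_tokens

    def cost_usd(
        self,
        input_per_mtok: float,
        output_per_mtok: float,
        batch_input_discount: float = 0.5,   # assumption from this article
        batch_output_discount: float = 0.0,  # set from your provider's pricing
    ) -> float:
        """Total cost given per-million-token prices and batch discounts."""
        input_cost = (
            self.sync_input + self.batch_input * (1 - batch_input_discount)
        ) * input_per_mtok
        output_cost = (
            self.sync_output + self.batch_output * (1 - batch_output_discount)
        ) * output_per_mtok
        return (input_cost + output_cost) / 1_000_000
```

Each response's `usage.input_tokens` and `usage.output_tokens` can be fed into `record`, which keeps the discounted and undiscounted portions visible instead of hiding them behind one blended number.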
7. Over-Queuing Interactive Paths
Explanation: Developers sometimes set overly generous latency_budget_ms values for user-facing turns, accidentally routing them through the batch queue. This introduces unpredictable latency spikes.
Fix: Define strict sync ceilings based on UX requirements. Interactive chat should never exceed 2–5 seconds. Use configuration validation to reject budgets that exceed the sync threshold for known interactive endpoints.
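The validation step might look like the sketch below. The endpoint names in `INTERACTIVE_ENDPOINTS` and the `validate_budget` helper are hypothetical; the 5000 ms default mirrors the `sync_latency_cap_ms` value used in this article.

```python
from typing import Set

# Hypothetical endpoint names; replace with the user-facing paths in your system.
INTERACTIVE_ENDPOINTS: Set[str] = {"chat", "autocomplete"}


def validate_budget(
    endpoint: str,
    latency_budget_ms: int,
    sync_latency_cap_ms: int = 5000,
) -> int:
    """Return the budget unchanged, or raise if an interactive endpoint
    asks for a budget that would route it into the batch queue."""
    if latency_budget_ms <= 0:
        raise ValueError("latency_budget_ms must be positive")
    if endpoint in INTERACTIVE_ENDPOINTS and latency_budget_ms > sync_latency_cap_ms:
        raise ValueError(
            f"{endpoint!r} is interactive: budget {latency_budget_ms} ms "
            f"exceeds the sync ceiling of {sync_latency_cap_ms} ms"
        )
    return latency_budget_ms
```

Failing loudly at configuration or call time is usually preferable to discovering a minute-scale delay in a chat path after deployment.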
Production Bundle
Action Checklist
- Define explicit latency budgets per agent path (interactive vs. background)
- Implement dual-threshold flush logic (min size + max window) to prevent starvation
- Validate context length before queuing to avoid batch payload overflow
- Separate input and output token accounting to track actual discount impact
- Add per-request error propagation and retry hooks for batch failures
- Monitor queue depth, flush frequency, and sync/batch split ratios in observability dashboards
- Force synchronous routing for tool-call turns and critical control flow steps
- Test under traffic valleys to verify window timeout prevents indefinite queuing
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Single interactive agent | Direct sync API | Batch polling adds 90–120 s latency with no volume benefit | 100% input cost |
| Multi-agent background workload (grading, extraction) | Fleet-aware batch routing | High concurrency enables efficient pooling and 50% input discount | ~50% input cost |
| Mixed interactive + background in same process | Dynamic latency-budget router | Preserves UX for chat while batching offline turns transparently | Blended 50–100% input cost, depending on batched share |
| Cross-process / distributed agents | External queue + batch router | In-memory queues fragment across workers; broker aggregates volume | ~50% input cost + broker overhead |
| Tool-dependent agent loops | Force sync for tool turns | Batch delays break agent control flow and state consistency | 100% input cost for tool turns |
Configuration Template
```yaml
# fleet_router_config.yaml
routing:
  sync_latency_cap_ms: 5000
  batch_window_ms: 30000
  min_batch_size: 10
  max_batch_size: 100
  poll_interval_s: 1.0
token_budgets:
  max_input_tokens_per_request: 40000
  enforce_pre_queue_validation: true
observability:
  metrics_prefix: "llm.fleet"
  track_sync_batch_split: true
  alert_on_batch_failure_rate: 0.05
```
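Once the file is parsed into a dictionary (for example with a YAML library), the `routing` section can be mapped onto the router's configuration object. The sketch below is an assumption about how that glue code might look: `routing_config_from_mapping` is a hypothetical helper, the `RoutingConfig` dataclass is repeated here only to keep the example self-contained, and keys the dataclass does not define (such as `poll_interval_s`) are ignored.

```python
from dataclasses import dataclass, fields
from typing import Any, Mapping


@dataclass
class RoutingConfig:  # mirrors the dataclass from the implementation section
    sync_latency_cap_ms: int = 5000
    batch_window_ms: int = 30000
    min_batch_size: int = 10
    max_batch_size: int = 100


def routing_config_from_mapping(raw: Mapping[str, Any]) -> RoutingConfig:
    """Build a RoutingConfig from the parsed `routing` section and sanity-check it."""
    section = raw.get("routing") or {}
    known = {f.name for f in fields(RoutingConfig)}
    # Keep only keys the dataclass defines; everything else is ignored here.
    config = RoutingConfig(**{k: int(v) for k, v in section.items() if k in known})
    if config.min_batch_size < 1 or config.min_batch_size > config.max_batch_size:
        raise ValueError("min_batch_size must be between 1 and max_batch_size")
    if config.batch_window_ms <= 0:
        raise ValueError("batch_window_ms must be positive")
    return config


# Dictionary shaped like the parsed template above.
parsed = {
    "routing": {
        "sync_latency_cap_ms": 5000,
        "batch_window_ms": 30000,
        "min_batch_size": 10,
        "max_batch_size": 100,
        "poll_interval_s": 1.0,
    }
}
print(routing_config_from_mapping(parsed))
```

Validating at load time catches contradictory settings, such as a minimum batch size larger than the maximum, before the flusher starts.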
Quick Start Guide
- Install dependencies: Ensure the `anthropic` SDK is installed and your API key is configured in environment variables.
- Initialize the router: Instantiate `AgentFleetRouter` with your `AsyncAnthropic` client and a `RoutingConfig` matching your latency requirements.
- Wrap agent calls: Replace direct `client.messages.create()` calls with `router.submit(latency_budget_ms=..., **kwargs)`. Set tight budgets for interactive paths and loose budgets for background work.
- Monitor and tune: Track `sync` vs `batched` split ratios. Adjust `min_batch_size` and `batch_window_ms` based on traffic patterns. Verify that background turns are flushed within the 30-second window and do not starve during low traffic.