async: List[Callable] = []
def subscribe(self, event_name: str, handler: Callable) -> None:
    """Register ``handler`` to be called whenever ``event_name`` is emitted.

    Coroutine functions go into the async registry, which ``emit_async``
    awaits. Plain callables go into the sync registry, which ``emit`` calls
    inline (and which ``emit_async`` therefore also reaches).
    """
    registry = (
        self._async_handlers
        if asyncio.iscoroutinefunction(handler)
        else self._sync_handlers
    )
    bucket = registry.setdefault(event_name, [])
    bucket.append(handler)
def subscribe_once(self, event_name: str, handler: Callable) -> None:
    """Register ``handler`` for the next emission of ``event_name`` only.

    ``emit`` calls every one-shot handler for the event and then discards
    that event's one-shot registrations.
    """
    # NOTE(review): unlike subscribe(), coroutine functions are not routed
    # separately here, so an async handler registered this way is invoked on
    # the sync path and its coroutine is never awaited.
    if event_name not in self._once_handlers:
        self._once_handlers[event_name] = []
    self._once_handlers[event_name].append(handler)
def subscribe_all(self, handler: Callable) -> None:
    """Register ``handler`` as a wildcard listener for every emitted event.

    The handler is called with the payload only, just like per-event handlers.
    """
    is_async = asyncio.iscoroutinefunction(handler)
    listeners = self._wildcard_async if is_async else self._wildcard_sync
    listeners.append(handler)
### Step 2: Implement Error-Isolated Emission
The dispatcher must never allow a handler exception to propagate to the caller. Each handler executes in a protected scope.
```python
def emit(self, event_name: str, payload: Any) -> None:
    """Synchronously dispatch ``payload`` to every sync listener of ``event_name``.

    Order: per-event sync handlers, then wildcard sync handlers, then
    one-shot handlers. Each call goes through ``_safe_execute``, so a failing
    handler is logged and never propagates to the caller.

    Handler lists are snapshotted before iteration. A handler that subscribes
    (or re-subscribes) during dispatch therefore takes effect on the next
    emission instead of being appended to the list currently being walked,
    which would re-run it or loop forever.
    """
    for handler in tuple(self._sync_handlers.get(event_name, ())):
        self._safe_execute(handler, event_name, payload)
    for handler in tuple(self._wildcard_sync):
        self._safe_execute(handler, event_name, payload)
    # Detach the one-shot batch *before* invoking it. A handler that re-arms
    # itself via subscribe_once lands in a fresh list that survives until the
    # next emit, instead of being deleted along with this batch.
    for handler in self._once_handlers.pop(event_name, ()):
        self._safe_execute(handler, event_name, payload)
def _safe_execute(self, handler: Callable, event_name: str, payload: Any) -> None:
try:
handler(payload)
except Exception as exc:
logger.exception(
"Observer %s failed on event %s: %s",
handler.__name__, event_name, exc
)
### Step 3: Add Async Dispatch Support
Async handlers coexist with sync handlers. `emit_async` first runs the sync handlers inline (by calling `emit`), then awaits all async handlers concurrently with `asyncio.gather`.
async def emit_async(self, event_name: str, payload: Any) -> None:
    """Run sync listeners inline, then await every async listener.

    The sync path (per-event, wildcard and one-shot handlers) runs first via
    ``emit``. Per-event async handlers and wildcard async handlers are then
    wrapped in ``_safe_execute_async`` and awaited together with
    ``asyncio.gather``. This coroutine returns only after all of them finish.
    """
    self.emit(event_name, payload)

    pending = [
        self._safe_execute_async(fn, event_name, payload)
        for fn in self._async_handlers.get(event_name, [])
    ]
    pending += [
        self._safe_execute_async(fn, event_name, payload)
        for fn in self._wildcard_async
    ]
    if not pending:
        return
    await asyncio.gather(*pending, return_exceptions=True)
async def _safe_execute_async(self, handler: Callable, event_name: str, payload: Any) -> None:
try:
await handler(payload)
except Exception as exc:
logger.exception(
"Async observer %s failed on event %s: %s",
handler.__name__, event_name, exc
)
### Step 4: Wire Into the Agent Loop
The agent loop only knows about emission. It never imports trackers, loggers, or alerting modules.
import time

router = TelemetryRouter()


# subscribe / subscribe_once / subscribe_all take the handler as an argument
# and return None. They are not decorator factories, so register explicitly.
def track_latency(payload: dict) -> None:
    metrics_client.gauge("agent.model_latency", payload.get("duration_ms", 0))


def trigger_escalation(payload: dict) -> None:
    alerting_service.send_critical(payload.get("current_cost", 0))


# Wildcard listeners are called as handler(payload), like every other
# handler. The event name is not passed to them.
def capture_everything(payload: dict) -> None:
    audit_logger.info("[AUDIT] %s", payload)


router.subscribe("model_response_received", track_latency)
router.subscribe_once("budget_threshold_hit", trigger_escalation)
router.subscribe_all(capture_everything)


async def run_agent_cycle(user_input: str) -> str:
    """Run one model call, publish its telemetry, and return the response."""
    started = time.perf_counter()
    response = await llm_client.generate(user_input)
    # Measure the real call latency instead of reporting a fixed number.
    duration_ms = (time.perf_counter() - started) * 1000
    await router.emit_async("model_response_received", {
        "response": response,
        "duration_ms": duration_ms,
        "tokens_used": 380,  # placeholder: take this from the client's usage data
    })
    return response
Architecture Decisions & Rationale
- In-Memory Design: The bus operates within a single process boundary. This eliminates network latency, serialization overhead, and broker dependencies. Agent loops require sub-millisecond telemetry routing; external message queues introduce unacceptable jitter.
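- Fire-and-Continue Semantics: Handler return values are ignored and handler exceptions are swallowed, so the emitter never waits for an acknowledgment or a verdict. Note that `emit` still runs sync handlers inline and `emit_async` awaits every async handler via `asyncio.gather`, so a slow observer does delay the caller; keep handlers fast or hand long-running work off to a background task. If your workflow requires synchronous validation before proceeding, use explicit callbacks or result promises instead.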
- No Schema Enforcement: The dispatcher accepts arbitrary payloads. Validation is deferred to the call site or individual handlers. This prioritizes runtime performance and flexibility over strict contract enforcement.
- Exception Containment: Each handler is wrapped in a try/except block. This mirrors browser DOM event systems and Python's standard logging module. Observers are isolated guests; the dispatcher is the host.
Pitfall Guide
1. Treating the Bus as a Distributed Message Queue
Explanation: Developers sometimes assume the event bus can route messages across microservices or persist events for later consumption. The dispatcher is strictly in-memory and process-bound.
Fix: Use Redis Streams, Kafka, or SQS for cross-process routing. Reserve the in-memory bus for single-process agent telemetry and control flow decoupling.
2. Expecting Synchronous Acknowledgment
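Explanation: The bus operates on fire-and-continue semantics. If a handler needs to approve a budget threshold or validate a tool output before the agent proceeds, the bus gives it no way to do so: handler return values are discarded and handler exceptions are logged and swallowed, so a handler can neither approve nor veto the step.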
Fix: Implement explicit request-response patterns using asyncio.Future or callback promises for critical path validation. Keep the bus strictly for side effects.
3. Blocking I/O in Handlers Called From the Async Loop
Explanation: Registering a synchronous handler that performs blocking network calls (e.g., requests.get()) inside an async agent loop will stall the event loop and degrade throughput.
Fix: Use aiohttp, httpx, or asyncio.to_thread() inside async handlers for I/O-bound observers. Keep sync handlers short and free of I/O.
4. Assuming Event Replay or Persistence
Explanation: The dispatcher does not store emitted events. Listeners registered after an event fires will never receive it. There is no built-in replay mechanism.
Fix: Pair the bus with a durable append-only log or database writer if historical replay is required. Implement a dedicated PersistenceHandler that writes to disk or a stream.
5. Schema Drift Between Emitter and Listener
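Explanation: Because the bus accepts arbitrary dictionaries, payload structures can diverge over time. A listener reading payload["cost"] will raise KeyError on every event if the emitter sends payload["total_cost"] instead. The dispatcher logs and swallows that error, so the agent keeps running while the listener silently stops doing its job.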
Fix: Define shared payload contracts using Pydantic models or TypedDicts at the emission boundary. Validate shapes before calling emit().
6. Unbounded Listener Growth
Explanation: Long-running agent processes that dynamically register handlers without cleanup will accumulate memory pressure. Wildcard listeners exacerbate this if not managed.
Fix: Implement explicit unsubscribe() methods or use weak references for temporary observers. Audit listener registration in integration tests.
7. Handler Priority Conflicts
Explanation: The dispatcher executes handlers in registration order. If a cost tracker must run before a budget enforcer, registration sequence becomes a hidden dependency.
Fix: Introduce a priority integer on registration. Sort handlers by priority before iteration. Document execution order requirements in team runbooks.
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Single-process agent with lightweight telemetry | In-memory event bus | Zero network overhead, sub-millisecond routing, simple deployment | Near-zero infrastructure cost |
| Multi-service architecture requiring cross-process routing | Kafka / Redis Streams | Durable storage, partitioned ordering, horizontal scaling | Higher infrastructure & operational overhead |
| Critical path validation requiring synchronous approval | Explicit callback / Future pattern | Guarantees acknowledgment before proceeding | Minimal, but increases code coupling |
| High-volume audit logging with replay requirements | Event bus + append-only writer | Decouples emission from persistence, enables replay | Moderate storage cost, high reliability |
Configuration Template
# telemetry_config.py
from typing import Any, Dict, List, Callable
import asyncio
import logging
class ProductionTelemetryRouter:
    """In-process event router with priorities, one-shot and wildcard listeners.

    Handlers are called as ``handler(payload)``. Sync handlers run inline
    from ``emit``. Coroutine-function handlers are awaited by ``emit_async``,
    which runs the sync handlers first. Every handler invocation is guarded so
    that its exceptions are logged and never raised to the emitter.
    """

    def __init__(self, max_handler_queue: int = 1000) -> None:
        # Every registry holds (priority, handler) pairs sorted by ascending
        # priority: lower numbers run first, and ties keep registration order
        # because list.sort is stable.
        self._handlers: Dict[str, List[tuple]] = {}
        self._async_handlers: Dict[str, List[tuple]] = {}
        self._once_handlers: Dict[str, List[tuple]] = {}
        # Async one-shot handlers live apart from sync ones so that
        # emit_async can await them instead of calling them synchronously.
        self._once_async_handlers: Dict[str, List[tuple]] = {}
        # Wildcards use the same (priority, handler) shape so that all lists
        # flow through the same dispatch helpers.
        self._wildcard_sync: List[tuple] = []
        self._wildcard_async: List[tuple] = []
        # NOTE(review): stored but not enforced anywhere in this class.
        self._max_queue = max_handler_queue
        self._logger = logging.getLogger("telemetry.router")

    def register(self, event: str, handler: Callable, priority: int = 0, once: bool = False) -> None:
        """Subscribe ``handler`` to ``event``.

        Args:
            event: Event name to listen for.
            handler: Sync callable or coroutine function taking the payload.
            priority: Ordering key; lower values run earlier.
            once: If true, the handler is dropped after its first dispatch.
        """
        is_async = asyncio.iscoroutinefunction(handler)
        if once:
            target = self._once_async_handlers if is_async else self._once_handlers
        else:
            target = self._async_handlers if is_async else self._handlers
        bucket = target.setdefault(event, [])
        bucket.append((priority, handler))
        bucket.sort(key=lambda entry: entry[0])

    def register_wildcard(self, handler: Callable, priority: int = 0) -> None:
        """Subscribe ``handler`` to every event, ordered by ``priority``."""
        target = self._wildcard_async if asyncio.iscoroutinefunction(handler) else self._wildcard_sync
        # Store a (priority, handler) pair: _dispatch_sync unpacks pairs, and
        # a bare callable would raise TypeError outside any try block.
        target.append((priority, handler))
        target.sort(key=lambda entry: entry[0])

    def emit(self, event: str, payload: Any) -> None:
        """Run sync per-event, wildcard, then one-shot handlers for ``event``."""
        self._dispatch_sync(self._handlers.get(event, []), event, payload)
        self._dispatch_sync(self._wildcard_sync, event, payload)
        self._dispatch_once(event, payload)

    async def emit_async(self, event: str, payload: Any) -> None:
        """Run the sync path, then await all async handlers for ``event``."""
        self.emit(event, payload)
        entries = [
            *self._async_handlers.get(event, []),
            *self._wildcard_async,
            # One-shot async handlers are detached before being awaited.
            *self._once_async_handlers.pop(event, []),
        ]
        tasks = [self._safe_async(handler, event, payload) for _, handler in entries]
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

    def _dispatch_sync(self, handlers: List, event: str, payload: Any) -> None:
        """Call each (priority, handler) pair's handler, logging failures."""
        # Iterate a snapshot: a handler may register new handlers mid-dispatch.
        for _, handler in tuple(handlers):
            try:
                handler(payload)
            except Exception as e:
                self._logger.exception("Handler %s failed on %s: %s", self._handler_name(handler), event, e)

    def _dispatch_once(self, event: str, payload: Any) -> None:
        """Dispatch and discard the sync one-shot handlers for ``event``."""
        # Pop first so that a handler re-registering with once=True is kept
        # for the next emit instead of being deleted along with this batch.
        self._dispatch_sync(self._once_handlers.pop(event, []), event, payload)

    async def _safe_async(self, handler: Callable, event: str, payload: Any) -> None:
        """Await ``handler(payload)``, logging instead of raising on failure."""
        try:
            await handler(payload)
        except Exception as e:
            self._logger.exception("Async handler %s failed on %s: %s", self._handler_name(handler), event, e)

    @staticmethod
    def _handler_name(handler: Callable) -> str:
        """Best-effort label for logs; partials and callable objects lack __name__."""
        return getattr(handler, "__name__", repr(handler))
Quick Start Guide
- Initialize the router: Instantiate `ProductionTelemetryRouter()` at your application entry point. Pass it to your agent loop via dependency injection or a context manager.
- Register observers: Register handlers with `router.register(event, handler)` (or `router.register_wildcard(handler)` to observe every event) for events like `model_response`, `tool_execution`, and `budget_check`. Use `register(..., once=True)` for single-shot alerts.
- Emit from the loop: Replace direct tracker calls with `await router.emit_async("event_name", payload)`. Keep the agent logic focused on control flow.
- Validate isolation: Force a handler exception in your test suite. Assert that the agent loop completes successfully and the dispatcher logs the fault without crashing.
- Monitor latency: Add a timing wrapper around `emit()` and `emit_async()` calls. Alert if telemetry routing exceeds 5ms, which indicates handler bloat or I/O blocking.