agent-event-bus: Sync and Async Pub/Sub for Agent Events

By Codcompass Team · 9 min read

Decoupling Agent Observability: An In-Memory Event Bus Architecture

Current Situation Analysis

Modern AI agent loops are no longer simple request-response cycles. They are iterative control flows that invoke models, execute tools, manage state, and enforce constraints. As these loops grow in complexity, developers routinely inject cross-cutting concerns directly into the execution path: cost tracking, latency monitoring, audit logging, budget enforcement, and alerting hooks.

The industry pain point is architectural coupling. When observability code is wired synchronously into the agent loop, it gains unintended control-plane authority. A network timeout to a metrics aggregator, a schema mismatch in a logging payload, or a simple NoneType error in a budget tracker can terminate the entire agent session. The agent stops not because the model failed or the logic broke, but because a secondary observer panicked.
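A minimal sketch of this failure mode, assuming a hypothetical `track_cost` helper and a toy `run_agent` loop (neither name comes from a real library). The tracker is called inline, so a single missing cost value ends the whole run:

```python
from typing import List, Optional


def track_cost(total: float, step_cost: Optional[float]) -> float:
    # Hypothetical budget tracker: it assumes every step reports a cost.
    return total + step_cost  # raises TypeError when step_cost is None


def run_agent(step_costs: List[Optional[float]]) -> int:
    """Toy agent loop with the tracker wired directly into the control flow."""
    total = 0.0
    completed = 0
    for cost in step_costs:
        # ... the model call and tool execution would happen here ...
        total = track_cost(total, cost)  # an observer failure propagates from here
        completed += 1
    return completed
```

Calling `run_agent([0.01, None, 0.02])` raises `TypeError` on the second step, even though nothing in the agent logic itself went wrong.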

This problem is frequently overlooked because observability is traditionally treated as a first-class citizen in the execution path. Engineers assume that if a metric fails, the system should fail fast. In reality, observability is a guest. The agent loop is the host. When the guest trips over a chair, the party should not end.

Production agent stacks show the same fragility, and in-memory event routing addresses it: isolating telemetry from control flow removes the synchronous dependency and so shrinks the failure blast radius. Libraries built around this pattern (such as zero-dependency event buses targeting Python 3.9+) enforce error isolation at the dispatcher level: each handler executes inside its own exception boundary. If a handler raises, the dispatcher logs the fault, continues to the next subscriber, and returns control to the agent loop unharmed. This design turns observability from a potential single point of failure into a resilient, pluggable layer.
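The per-handler exception boundary can be sketched in a few lines. This is an illustration under stated assumptions, not any particular library's dispatcher: the `dispatch` function, the `Handler` alias, and the `"telemetry"` logger name are all hypothetical.

```python
import logging
from typing import Any, Callable, Dict, List

logger = logging.getLogger("telemetry")  # assumed logger name

Handler = Callable[[str, Dict[str, Any]], None]


def dispatch(handlers: List[Handler], event: str, payload: Dict[str, Any]) -> int:
    """Call every handler in order and return how many of them raised."""
    failures = 0
    for handler in handlers:
        try:
            handler(event, payload)
        except Exception:
            # Log the fault with its traceback, then move on to the next subscriber.
            failures += 1
            logger.exception("handler %r failed on %s", handler, event)
    return failures
```

Because the `try` wraps each call rather than the whole loop, a handler that raises does not stop the handlers registered after it, and the caller gets control back normally.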

WOW Moment: Key Findings

The architectural shift from direct injection to event-driven decoupling fundamentally changes how agent systems handle failure, deployment velocity, and testing complexity. The following comparison highlights the operational impact of adopting an in-memory pub/sub routing layer for agent telemetry.

| Approach | Failure Blast Radius | Handler Isolation | Cross-Cutting Coupling | Async Compatibility |
| --- | --- | --- | --- | --- |
| Direct Injection | High (observer crash halts agent) | None (exceptions propagate) | Tight (loop imports every tracker) | Manual threading/asyncio wrapping |
| Event Bus Routing | Low (observer crash logged, agent continues) | Full (try/except per handler) | Loose (loop only emits events) | Native (sync/async coexist on same bus) |

This finding matters because it decouples system availability from monitoring infrastructure. When telemetry handlers are isolated, you can deploy updates to cost trackers, swap logging backends, or attach new audit hooks without touching the agent control flow. The agent loop remains stable regardless of observer health. This enables zero-downtime observability rollouts, simplifies integration testing, and ensures that external service failures (e.g., a down PagerDuty endpoint or a throttled metrics API) never cascade into agent termination.
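To illustrate the loose coupling, here is a rough sketch in which the loop receives a single `emit` callable and knows nothing about who is listening. The names `run_agent`, `make_emitter`, and the event names `step.started` / `step.finished` are assumptions made for this example.

```python
import logging
from typing import Any, Callable, Dict, List

logger = logging.getLogger("telemetry")  # assumed logger name

Emit = Callable[[str, Dict[str, Any]], None]


def run_agent(steps: List[str], emit: Emit) -> List[str]:
    """Toy loop: its only link to observability is the emit callable."""
    results = []
    for index, step in enumerate(steps):
        emit("step.started", {"index": index, "step": step})
        result = step.upper()  # stand-in for a model or tool call
        results.append(result)
        emit("step.finished", {"index": index, "result": result})
    return results


def make_emitter(observers: List[Emit]) -> Emit:
    """Build an emit function that fans out to observers and isolates failures."""
    def emit(event: str, payload: Dict[str, Any]) -> None:
        for observer in observers:
            try:
                observer(event, payload)
            except Exception:
                logger.exception("observer %r failed on %s", observer, event)
    return emit
```

Swapping a logging backend or adding an audit hook then means building a different observer list and passing a new emitter in. `run_agent` itself is not edited.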

Core Solution

Implementing an in-memory event bus for agent telemetry requires a clear separation between emission (control flow) and subscription (observability). The architecture relies on three core mechanisms: dispatcher routing, handler isolation, and async/sync coexistence.
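One way sync and async handlers might share a dispatch path is sketched below. It assumes a hypothetical `emit_async` coroutine that tells the two kinds apart with `inspect.iscoroutinefunction`. A real bus may schedule async work differently, for example as background tasks.

```python
import asyncio
import inspect
import logging
from typing import Any, Callable, Dict, List

logger = logging.getLogger("telemetry")  # assumed logger name

Handler = Callable[[Dict[str, Any]], Any]  # plain function or coroutine function


async def emit_async(handlers: List[Handler], payload: Dict[str, Any]) -> None:
    """Run sync handlers inline and async handlers concurrently, isolating both."""
    pending = []
    for handler in handlers:
        try:
            if inspect.iscoroutinefunction(handler):
                # Calling a coroutine function only creates the coroutine object.
                pending.append(handler(payload))
            else:
                handler(payload)
        except Exception:
            logger.exception("sync handler %r failed", handler)
    # return_exceptions=True collects failures as results instead of raising
    # the first one into the caller.
    results = await asyncio.gather(*pending, return_exceptions=True)
    for result in results:
        if isinstance(result, Exception):
            logger.error("async handler failed: %r", result)
```

An agent loop that is already async would `await emit_async(...)`. A synchronous loop could wrap the call in `asyncio.run`, at the cost of starting an event loop per emission.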

Step 1: Define the Dispatcher Interface

The bus acts as a central registry. It maintains mappings of event names to handler lists, supports wildcard routing, and manages one-time subscriptions.

from typing import Any, Callable, Dict, List, Optional
import asyncio
import logging

logger = logging.getLogger(__name__)

class TelemetryRouter:
    def __init__(self) -> None:
        self._sync_handlers: Dict[str, List[Callable]] = {}
        self._async_handlers: Dict[str, List[Callable]] = {}
        self._once_handlers: Dict[str, List[Callable]] = {}
        self._wildcard_sync: List[Callable] = []
        self._wildcard_
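The listing above breaks off inside the constructor, so the remaining fields and methods are not shown. As a separate, simplified illustration of the registry behaviour described in this step (named events, wildcard routing, one-time subscriptions), here is a sync-only sketch. The class name `MiniRouter`, the methods `on`, `once`, and `emit`, and the `"*"` wildcard key are assumptions for this example, not the API of `TelemetryRouter`.

```python
import logging
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict, List

logger = logging.getLogger("telemetry")  # assumed logger name

Handler = Callable[[str, Dict[str, Any]], None]


class MiniRouter:
    """Sync-only sketch: named events, a "*" wildcard, and one-time handlers."""

    WILDCARD = "*"

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)
        self._once: DefaultDict[str, List[Handler]] = defaultdict(list)

    def on(self, event: str, handler: Handler) -> None:
        """Subscribe to a named event, or to every event via "*"."""
        self._handlers[event].append(handler)

    def once(self, event: str, handler: Handler) -> None:
        """Subscribe for the next emission of this event only."""
        self._once[event].append(handler)

    def emit(self, event: str, payload: Dict[str, Any]) -> None:
        # pop() removes the one-time handlers before they run,
        # so each fires at most once.
        one_time = self._once.pop(event, [])
        targets = (
            self._handlers.get(event, [])
            + self._handlers.get(self.WILDCARD, [])
            + one_time
        )
        for handler in targets:
            try:
                handler(event, payload)
            except Exception:
                logger.exception("handler %r failed on %s", handler, event)
```

In this sketch a subscriber registered with `router.on("*", handler)` sees every emitted event, which suits audit logging, while `router.once("session.ended", handler)` fits one-shot cleanup hooks.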
