
# The Central Nervous System: Scaling the Agentic Radar to 24/7 with FastAPI and Webhooks

By Codcompass Team · 7 min read

## Scaling AI Agents for Production: A Multi-Vector Event Ingestion Pattern with FastAPI

### Current Situation Analysis

AI agents designed for supply chain risk management, such as calculating the financial impact of component obsolescence, often begin as isolated Python scripts. While functional for prototyping, this approach fails under production constraints. Part Discontinuation Notices (PDNs) arrive continuously across global time zones, requiring a system that is centralized, resilient, and capable of handling heterogeneous data sources without degradation.

The primary engineering challenge lies in the mismatch between ingestion vectors and inference latency. Modern supply chain data arrives via two distinct channels:

  1. Structured SaaS APIs: Platforms like SiliconExpert and Accuris deliver standardized JSON payloads detailing market transitions.
  2. Legacy Communications: Tier 2 manufacturers and component vendors frequently issue EOL notices via plain-text emails or PDF attachments.

A common misconception is that polling legacy channels via IMAP is sufficient. IMAP polling consumes significant compute resources, introduces unpredictable latency, and complicates error handling. Furthermore, integrating Large Language Models (LLMs) directly into the request-response cycle creates a critical bottleneck. Multi-agent orchestration frameworks (e.g., CrewAI) typically require 5 to 15 seconds to parse inputs, query relational graphs, compute P&L impact, and format responses. Webhook providers often enforce strict timeout limits (e.g., 10 seconds). Holding an HTTP connection open during inference guarantees timeout errors, redundant retries, and eventual service degradation.

### WOW Moment: Key Findings

Transitioning to an Event-Driven Architecture (EDA) with asynchronous decoupling resolves the latency mismatch and normalizes ingestion vectors. The following comparison highlights the operational superiority of the async webhook pattern over traditional approaches.

| Ingestion Strategy | Ingestion Latency | LLM Blocking Risk | Scalability | Operational Cost |
| :--- | :--- | :--- | :--- | :--- |
| IMAP Polling | High (Minutes) | N/A | Low | High (Network/Compute overhead) |
| Webhook + Sync LLM | Low | Critical (>10s Timeout) | Low | Medium (Connection exhaustion) |
| Webhook + Async Task | Near Zero | None | High | Low (Resource isolation) |

Why this matters: By normalizing all inputs to a unified JSON schema and decoupling ingestion from inference using background tasks, the system achieves near-zero ingestion latency. The ingestion layer can scale horizontally to handle traffic spikes, while the compute-intensive LLM layer operates independently. This pattern eliminates timeout risks and ensures no data loss due to network constraints.

### Core Solution

The production architecture relies on three pillars: input normalization, asynchronous routing, and decoupled execution.

#### 1. Input Normalization via Inbound Parse Gateways

To eliminate IMAP polling, leverage Inbound Parse services provided by transactional email providers (e.g., SendGrid, Mailgun). These services intercept incoming emails, extract headers and body text, and forward a standardized JSON payload to a webhook endpoint. This transforms legacy email vectors into the same HTTPS POST format used by SaaS APIs.
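As a sketch of this normalization step, the helper below maps an Inbound Parse payload onto the unified event shape. The field names (`from`, `subject`, `text`) follow SendGrid's Inbound Parse conventions; Mailgun uses `sender` and `body-plain`, so adjust for your provider. `normalize_inbound_email` is an illustrative helper, not part of the article's codebase:

```python
import uuid

def normalize_inbound_email(form_fields: dict) -> dict:
    """Map an Inbound Parse form payload onto the unified event shape.

    Assumes SendGrid-style field names ("from", "subject", "text");
    other providers (e.g. Mailgun) use different keys.
    """
    return {
        "trace_id": str(uuid.uuid4()),
        "source": "email_gateway",
        "raw_data": {
            "sender": form_fields.get("from", ""),
            "subject": form_fields.get("subject", ""),
            "body": form_fields.get("text", ""),
        },
    }
```

With this in place, a legacy EOL email arrives at the same endpoint, in the same shape, as a SaaS webhook.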

#### 2. Unified Event Schema

Define a single Pydantic model that accepts payloads from both SaaS and email sources. This simplifies routing logic and ensures consistent validation.
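The sketch below shows the validation behavior such a model buys you; it mirrors the `IngestionEvent` model used in the implementation section, and the example payload values are illustrative:

```python
import uuid
from typing import Any, Literal
from pydantic import BaseModel, Field, ValidationError

class IngestionEvent(BaseModel):
    trace_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    source: Literal["saas_platform", "email_gateway"]
    raw_data: dict[str, Any]

# Both vectors parse into the same model, so downstream routing is uniform
saas_event = IngestionEvent(source="saas_platform", raw_data={"mpn": "X123"})
email_event = IngestionEvent(source="email_gateway", raw_data={"subject": "EOL Notice"})

# Unknown sources are rejected at the boundary (a 422 when used in FastAPI)
try:
    IngestionEvent(source="ftp_drop", raw_data={})
    rejected = False
except ValidationError:
    rejected = True
```

The `Literal` type on `source` turns routing mistakes into validation errors at ingestion time rather than silent misrouting downstream.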

#### 3. Asynchronous Execution with FastAPI

Use FastAPI's BackgroundTasks to enqueue inference jobs immediately. The endpoint returns an HTTP 202 Accepted response, closing the connection while a worker processes the LLM workflow in the background.

### Implementation

The following service is implemented in Python using FastAPI. Note the use of a unified schema, immediate acknowledgment, and structured error handling within the background task.

```python
import asyncio
import logging
import uuid
from typing import Any, Literal

from fastapi import FastAPI, BackgroundTasks, HTTPException, Request
from pydantic import BaseModel, Field

app = FastAPI(title="Obsolescence Radar Ingestion")
logger = logging.getLogger(__name__)

# Unified Event Schema
class IngestionEvent(BaseModel):
    trace_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    source: Literal["saas_platform", "email_gateway"]
    raw_data: dict[str, Any]

# Mock Inference Engine
async def execute_obsolescence_audit(event_data: dict) -> dict:
    """
    Simulates CrewAI multi-agent workflow:
    - Parse MPN/Manufacturer
    - Query Supabase relational graph
    - Calculate P&L impact
    - Format mitigation report
    """
    # Simulate 5-15s latency
    await asyncio.sleep(8)
    return {
        "status": "completed",
        "risk_score": 0.85,
        "mitigation": "Qualify alternative supplier immediately."
    }

# Mock Notification Service
async def dispatch_risk_alert(report: dict) -> None:
    """Sends formatted alert to MS Teams or Slack."""
    logger.info(f"Dispatching alert: {report.get('mitigation')}")

# Background Task Definition
async def process_audit_task(event: IngestionEvent) -> None:
    """
    Decoupled worker. Handles inference and notification.
    Errors are caught to prevent silent failures.
    """
    try:
        logger.info(f"Starting audit for trace_id={event.trace_id}")
        result = await execute_obsolescence_audit(event.raw_data)
        await dispatch_risk_alert(result)
        logger.info(f"Audit completed for trace_id={event.trace_id}")
    except Exception as e:
        logger.error(f"Audit failed for trace_id={event.trace_id}: {e}")
        # In production, push to Dead Letter Queue here
```

#### Unified Ingestion Endpoint

```python
@app.post("/v1/events/ingest", status_code=202)
async def ingest_event(
    event: IngestionEvent,
    background_tasks: BackgroundTasks,
    request: Request
):
    # Security: Validate origin or API key in production
    # if not validate_auth(request): raise HTTPException(401)

    # Enqueue immediately; do not await inference
    background_tasks.add_task(process_audit_task, event)

    return {
        "status": "accepted",
        "trace_id": event.trace_id,
        "message": "Event queued for asynchronous processing"
    }
```

#### Architecture Rationale
*   **Unified Endpoint:** A single `/v1/events/ingest` route reduces surface area and simplifies webhook configuration for both SaaS platforms and email gateways.
*   **HTTP 202 Accepted:** Explicitly signals that the request has been accepted for processing but is not yet complete. This is the standard for async operations.
*   **Traceability:** Every event carries a `trace_id`. This enables correlation between ingestion, inference, and notification logs, which is critical for debugging production issues.
*   **Error Containment:** The background task wraps inference in a `try/except` block. Without this, an unhandled exception in the LLM workflow could crash the worker process or result in silent data loss.

### Pitfall Guide

| Pitfall | Explanation | Fix |
| :--- | :--- | :--- |
| **Synchronous LLM Calls** | Calling the inference engine directly in the endpoint handler blocks the event loop. With 5-15s latency, this triggers webhook timeouts and connection exhaustion. | Always use `BackgroundTasks` or a message broker (Celery/RabbitMQ) to decouple ingestion from inference. |
| **IMAP Polling Overhead** | Polling email servers via IMAP requires maintaining persistent connections, handling rate limits, and parsing raw MIME structures. This adds latency and operational complexity. | Use Inbound Parse webhooks to normalize emails to JSON. This offloads parsing to the provider and reduces latency to near-zero. |
| **Silent Background Failures** | If a background task raises an exception, FastAPI may log it, but the caller never knows. Errors can go unnoticed, causing data gaps. | Implement structured logging with `trace_id`. Add retry logic and Dead Letter Queues (DLQ) for failed tasks. Monitor task completion rates. |
| **Idempotency Violations** | Webhook providers retry failed deliveries. Without idempotency, the same PDN may be processed multiple times, generating duplicate alerts and skewing metrics. | Check the `trace_id` or a unique event key against a processed set before executing inference. Return `202` immediately even for duplicates. |
| **Event Loop Blocking** | Using synchronous libraries (e.g., `requests`, `pymysql`) inside an async FastAPI endpoint or task blocks the entire event loop, degrading throughput for all requests. | Use async-compatible libraries (`httpx`, `asyncpg`) or run sync code in an executor (`asyncio.to_thread`). |
| **Schema Drift** | SaaS platforms may update their JSON structure. If the ingestion schema is rigid, valid events may be rejected with `422 Unprocessable Entity`. | Use flexible validation with `extra="allow"` in Pydantic models during ingestion, or implement versioned endpoints. Validate critical fields explicitly. |
| **Resource Starvation** | Background tasks share the same process memory. A surge in events can exhaust memory or CPU, causing the ingestion endpoint to become unresponsive. | Monitor worker resource usage. For high volume, offload tasks to a dedicated worker pool using Celery or a cloud queue service. |
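The idempotency fix from the table above can be sketched as a minimal in-process guard. A production deployment would back this with Redis `SET NX` or a database unique constraint so state survives restarts and is shared across workers; `claim_event` is an illustrative helper:

```python
# Minimal in-process idempotency guard; state is lost on restart, so this
# is a sketch of the pattern, not a production implementation.
_processed_ids: set[str] = set()

def claim_event(trace_id: str) -> bool:
    """Return True the first time a trace_id is seen, False on retries."""
    if trace_id in _processed_ids:
        return False
    _processed_ids.add(trace_id)
    return True

# Inside the background task, before running inference:
#     if not claim_event(event.trace_id):
#         return  # duplicate webhook delivery; already acknowledged with 202
```

Note that duplicates are still acknowledged with `202` so the webhook provider stops retrying; only the inference step is skipped.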

### Production Bundle

#### Action Checklist
- [ ] **Normalize Inputs:** Configure SendGrid/Mailgun Inbound Parse to forward emails to the ingestion webhook. Verify JSON structure matches `IngestionEvent`.
- [ ] **Implement Idempotency:** Add a check in `process_audit_task` to verify `trace_id` hasn't been processed. Use Redis or a database unique constraint.
- [ ] **Secure Endpoints:** Add authentication middleware (API keys or JWT) to the `/v1/events/ingest` endpoint to prevent unauthorized submissions.
- [ ] **Add Structured Logging:** Ensure all logs include `trace_id`, `source`, and `status`. Integrate with a log aggregation tool (e.g., Datadog, ELK).
- [ ] **Configure Timeouts:** Set webhook provider retry policies appropriately. Ensure the background task has a timeout mechanism to prevent runaway processes.
- [ ] **Monitor Queue Depth:** Track the number of pending background tasks. Alert if the queue grows beyond a threshold, indicating inference bottlenecks.
- [ ] **Test Failure Modes:** Simulate LLM timeouts and network errors. Verify that errors are logged, retries occur, and the ingestion endpoint remains responsive.
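The "Configure Timeouts" item can be sketched with `asyncio.wait_for`. The 60-second cap and the `run_with_timeout` helper are illustrative choices, picked to sit well above the 5-15s inference window:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

# Assumption: illustrative cap, well above the expected 5-15s inference latency
AUDIT_TIMEOUT_S = 60.0

async def run_with_timeout(coro, trace_id: str, timeout_s: float = AUDIT_TIMEOUT_S):
    """Bound a background task so a hung LLM call cannot run forever."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout_s)
    except asyncio.TimeoutError:
        logger.error(f"Audit timed out for trace_id={trace_id}")
        # In production, push the event to a Dead Letter Queue here
        return None
```

Wrapping the audit coroutine this way keeps a single runaway inference call from occupying a worker slot indefinitely.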

#### Decision Matrix

| Scenario | Recommended Approach | Why | Cost Impact |
| :--- | :--- | :--- | :--- |
| **Low Volume (<100 events/day)** | FastAPI `BackgroundTasks` | Simple implementation, no external dependencies. | Low (Compute only) |
| **High Volume (>1k events/day)** | Celery + Redis/RabbitMQ | Persistent queues, horizontal scaling, advanced retry logic. | Medium (Infra + Ops) |
| **Legacy Email Sources** | Inbound Parse Webhook | Normalizes format, eliminates polling, reduces latency. | Low (SaaS fee) |
| **Strict Compliance/Audit** | Event Sourcing + DLQ | Full audit trail, guaranteed delivery, replay capability. | High (Storage + Complexity) |

#### Configuration Template

A production-ready `main.py` template with environment configuration and structured logging.

```python
# main.py
import os
import uuid
import logging
from fastapi import FastAPI, BackgroundTasks
from pydantic_settings import BaseSettings
from pydantic import BaseModel, Field

class Settings(BaseSettings):
    app_name: str = "Obsolescence Radar"
    log_level: str = "INFO"
    slack_webhook_url: str = ""
    
    class Config:
        env_file = ".env"

settings = Settings()

# Structured Logging
logging.basicConfig(
    level=getattr(logging, settings.log_level),
    format="%(asctime)s | %(levelname)s | %(message)s",
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

app = FastAPI(title=settings.app_name)

class IngestionEvent(BaseModel):
    trace_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    source: str
    raw_data: dict

async def run_audit(event: IngestionEvent):
    logger.info(f"Processing {event.trace_id} from {event.source}")
    # Inference logic here
    logger.info(f"Completed {event.trace_id}")

@app.post("/v1/events/ingest", status_code=202)
async def ingest(event: IngestionEvent, background_tasks: BackgroundTasks):
    background_tasks.add_task(run_audit, event)
    return {"status": "accepted", "trace_id": event.trace_id}
```

#### Quick Start Guide

  1. Define the Schema: Create the IngestionEvent model with required fields (trace_id, source, raw_data). Ensure it validates inputs from both SaaS and email gateways.
  2. Setup Inbound Parse: In your email provider (SendGrid/Mailgun), configure an Inbound Parse webhook pointing to your FastAPI /v1/events/ingest URL. Test with a sample email to verify JSON delivery.
  3. Deploy the Service: Containerize the FastAPI application using Docker. Deploy to a cloud provider (AWS, GCP, Azure) or a PaaS (Render, Railway). Ensure environment variables are configured.
  4. Connect SaaS Sources: Configure SiliconExpert, Accuris, or other SaaS platforms to send webhooks to the same /v1/events/ingest endpoint. Verify payload compatibility.
  5. Verify Async Processing: Trigger a test event. Confirm the endpoint returns 202 Accepted immediately. Check logs to ensure the background task executes and completes without blocking the ingestion layer.