nment. The stack requires FastAPI for routing, httpx for async HTTP dispatch, pydantic for schema validation, and openai for model orchestration.
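```bash
mkdir llm-metering-service
cd llm-metering-service
python -m venv .venv
source .venv/bin/activate
# pydantic-settings is needed by the config.py template later in this guide
pip install fastapi uvicorn openai httpx python-dotenv pydantic pydantic-settings
```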
Step 2: LLM Orchestration Layer
Replace inline model calls with a structured router. This example uses a dedicated InferenceRouter class to manage model selection, token limits, and response parsing.
```python
import os
import uuid

from dotenv import load_dotenv
from openai import AsyncOpenAI
from pydantic import BaseModel, Field

load_dotenv()  # load OPENAI_API_KEY and the other settings from .env


class CompletionPayload(BaseModel):
    query: str
    target_model: str = Field(default="gpt-4o-mini", description="OpenAI model identifier")
    max_output_tokens: int = Field(default=512, ge=64, le=4096)


class InferenceResult(BaseModel):
    response_text: str
    model_used: str
    input_tokens: int
    output_tokens: int
    request_id: str


class InferenceRouter:
    def __init__(self):
        self.client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    async def execute(self, payload: CompletionPayload) -> InferenceResult:
        # One ID per inference call; Step 4 reuses it as the metering event ID.
        request_id = str(uuid.uuid4())
        response = await self.client.chat.completions.create(
            model=payload.target_model,
            messages=[{"role": "user", "content": payload.query}],
            max_tokens=payload.max_output_tokens,
        )
        return InferenceResult(
            # content is optional in the SDK response; fall back to an empty string.
            response_text=response.choices[0].message.content or "",
            model_used=response.model,
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens,
            request_id=request_id,
        )
```
Step 3: Identity Resolution and Request Routing
Authentication must resolve the caller before inference begins. Use FastAPI dependency injection to extract credentials and attach a tenant identifier to the request context.
```python
from typing import Optional

from fastapi import Depends, FastAPI, Header, HTTPException

# Demo registry with hard-coded tokens.
TENANT_REGISTRY = {
    "tk_prod_alpha99": {"tenant_id": "org_7712", "tier": "enterprise"},
    "tk_dev_beta44": {"tenant_id": "org_8831", "tier": "developer"},
}


def resolve_tenant(x_api_token: Optional[str] = Header(None)) -> dict:
    """Map the x-api-token header to a tenant record, or reject with 401."""
    if not x_api_token:
        raise HTTPException(status_code=401, detail="Authorization token missing")
    tenant = TENANT_REGISTRY.get(x_api_token)
    if not tenant:
        raise HTTPException(status_code=401, detail="Invalid token")
    return tenant


app = FastAPI(title="AI Inference Gateway")
router = InferenceRouter()


@app.post("/v1/completions", response_model=InferenceResult)
async def handle_completion(
    payload: CompletionPayload,
    tenant: dict = Depends(resolve_tenant),
):
    result = await router.execute(payload)
    # Metering dispatch happens here
    return result
```
Step 4: Async Event Emission Pipeline
Billing events must never block the inference response. Implement a fire-and-forget dispatcher that formats usage data into CloudEvents and pushes it to the Kong Konnect Metering & Billing API.
```python
# Assumes os, app, router, and the models from Steps 2 and 3 are in the same module.
import asyncio
from datetime import datetime, timezone

import httpx


class UsageEmitter:
    def __init__(self):
        self.base_url = os.getenv("KONNECT_API_URL")
        self.auth_header = {"Authorization": f"Bearer {os.getenv('KONNECT_TOKEN')}"}
        self.client = httpx.AsyncClient(timeout=5.0)

    async def dispatch(self, tenant_id: str, inference: InferenceResult):
        event_payload = {
            "specversion": "1.0",
            "type": "ai.usage.token",
            "source": "inference-gateway",
            "id": inference.request_id,
            "time": datetime.now(timezone.utc).isoformat(),
            "subject": tenant_id,
            "data": {
                "meter": "token_consumption",
                "subject": tenant_id,
                "value": inference.input_tokens + inference.output_tokens,
                "metadata": {
                    "model": inference.model_used,
                    "input_tokens": inference.input_tokens,
                    "output_tokens": inference.output_tokens,
                },
            },
        }
        try:
            response = await self.client.post(
                f"{self.base_url}/events", json=event_payload, headers=self.auth_header
            )
            # httpx does not raise on 4xx/5xx by default; treat them as failures too.
            response.raise_for_status()
        except Exception as e:
            # Log to observability pipeline; do not raise to inference layer
            print(f"[METERING] Dispatch failed for {inference.request_id}: {e}")


emitter = UsageEmitter()
# The event loop keeps only weak references to tasks; hold them until they finish.
background_tasks = set()


# Replaces the Step 3 handler; keep a single definition of this route.
@app.post("/v1/completions", response_model=InferenceResult)
async def handle_completion(
    payload: CompletionPayload,
    tenant: dict = Depends(resolve_tenant),
):
    result = await router.execute(payload)
    task = asyncio.create_task(emitter.dispatch(tenant["tenant_id"], result))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return result
```
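Building the event inside dispatch makes the payload hard to check without a network call. One option, sketched here, is to move construction into a pure function. The names build_usage_event and missing_attributes are hypothetical helpers, not part of any SDK; the field layout simply mirrors the payload above, and the required-attribute list follows the CloudEvents 1.0 specification.

```python
from datetime import datetime, timezone

# Context attributes that CloudEvents 1.0 marks as required.
REQUIRED_ATTRIBUTES = ("specversion", "id", "source", "type")


def build_usage_event(tenant_id, request_id, model, input_tokens, output_tokens, now=None):
    """Return a CloudEvents-style dict mirroring the payload used in dispatch."""
    timestamp = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    return {
        "specversion": "1.0",
        "type": "ai.usage.token",
        "source": "inference-gateway",
        "id": request_id,
        "time": timestamp.isoformat(),
        "subject": tenant_id,
        "data": {
            "meter": "token_consumption",
            "subject": tenant_id,
            "value": input_tokens + output_tokens,
            "metadata": {
                "model": model,
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
            },
        },
    }


def missing_attributes(event):
    """List required CloudEvents attributes that are absent or empty."""
    return [name for name in REQUIRED_ATTRIBUTES if not event.get(name)]
```

A unit test can then assert on the returned dict directly, and dispatch only has to post it.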
Architecture Decisions and Rationale
- Async Event Dispatch: asyncio.create_task() ensures the HTTP response returns immediately after inference. Billing network latency is absorbed in the background.
- CloudEvents Standard: Kong Konnect's OpenMeter engine natively consumes CloudEvents. Using this CNCF specification keeps the payload portable, so moving to another CloudEvents-aware metering backend should not require restructuring the events.
- Externalized Rate Cards: Pricing logic is deliberately excluded from the application code. Rate cards, tiers, and currency conversions are configured in the Konnect dashboard. This prevents code deployments from triggering billing changes.
- Token Directionality: The metadata object separates input_tokens and output_tokens. Providers charge these at different rates. Storing them separately enables precise cost allocation later.
Pitfall Guide
1. Synchronous Metering Blocking Inference
Explanation: Awaiting the billing API call inside the request handler adds 100-300ms of latency to every LLM response.
Fix: Always dispatch metering events asynchronously. Use asyncio.create_task() or a message queue (Redis Streams, RabbitMQ) to decouple inference from billing.
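As a rough illustration of the queue-based variant, the sketch below uses an in-process asyncio.Queue in place of Redis Streams or RabbitMQ. The worker name, the fake_send stand-in, and the queue size are assumptions made for the example; a real broker adds persistence that this version lacks.

```python
import asyncio


async def metering_worker(queue, send):
    """Drain usage events from the queue and hand each one to send."""
    while True:
        event = await queue.get()
        try:
            await send(event)
        except Exception as exc:
            # A billing failure must not kill the worker or reach the handler.
            print(f"[METERING] send failed: {exc}")
        finally:
            queue.task_done()


async def demo():
    delivered = []

    async def fake_send(event):
        await asyncio.sleep(0.01)  # simulated billing latency
        delivered.append(event)

    queue = asyncio.Queue(maxsize=1000)
    worker = asyncio.create_task(metering_worker(queue, fake_send))

    # The request handler only enqueues; put_nowait never waits on the network
    # (it raises asyncio.QueueFull if the backlog reaches maxsize).
    for request_id in ("req-1", "req-2", "req-3"):
        queue.put_nowait({"id": request_id, "value": 42})

    await queue.join()  # wait for the backlog to drain, e.g. at shutdown
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return [event["id"] for event in delivered]
```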
2. Ignoring Token Directionality
Explanation: Aggregating total tokens into a single counter prevents accurate cost calculation. Input tokens are typically cheaper than output tokens.
Fix: Store input_tokens and output_tokens as distinct fields in the event metadata. Configure separate meters or weighted aggregation rules in the billing platform.
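A short offline calculation shows why the split matters. The rates below are made up for illustration (they are not a real price list), and this arithmetic belongs in an analysis notebook or the billing platform, not in the gateway itself.

```python
from decimal import Decimal

# Hypothetical rates in USD per 1M tokens, for illustration only.
RATES = {"input": Decimal("0.15"), "output": Decimal("0.60")}
PER_MILLION = Decimal(1_000_000)


def directional_cost(input_tokens, output_tokens):
    """Cost when input and output tokens are priced separately."""
    weighted = input_tokens * RATES["input"] + output_tokens * RATES["output"]
    return weighted / PER_MILLION


# Two requests with the same total (1,000 tokens) but opposite shapes.
prompt_heavy = directional_cost(900, 100)
completion_heavy = directional_cost(100, 900)
```

Under these assumed rates the completion-heavy request costs almost three times as much as the prompt-heavy one, a difference a single total-token counter cannot recover.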
3. Hardcoding Pricing Logic in Application Code
Explanation: Embedding rate calculations (if model == "gpt-4o": cost = x) requires code deployments for every pricing change and creates drift between your app and the billing platform.
Fix: Treat the application as a pure event producer. Define all rate cards, tiers, and currency rules in the Konnect Metering & Billing dashboard. The app only reports consumption.
4. Missing Idempotency Keys
Explanation: Network retries or client resubmissions can duplicate events, inflating customer invoices and triggering disputes.
Fix: Generate a unique request_id per inference call, map it to the CloudEvent id field, and reuse that same id on every retry of the same event. The OpenMeter engine uses the id (together with source) for automatic deduplication within the aggregation window.
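The effect of an idempotency key can be modelled in a few lines. The class below is a toy stand-in for server-side deduplication, keyed on source and id; it is an assumption-laden illustration and not how OpenMeter is implemented internally.

```python
class DedupingIngest:
    """Toy ingest endpoint that counts each (source, id) pair at most once."""

    def __init__(self):
        self._seen = set()
        self.total = 0

    def ingest(self, event):
        key = (event["source"], event["id"])
        if key in self._seen:
            return False  # duplicate delivery: ignored, usage not counted again
        self._seen.add(key)
        self.total += event["data"]["value"]
        return True


ingest = DedupingIngest()
event = {"source": "inference-gateway", "id": "req-1", "data": {"value": 150}}

first = ingest.ingest(event)
retry = ingest.ingest(event)  # same id resent after a timeout
```

Because the retry carries the same id, the total stays at 150 instead of doubling. If a new id were generated per attempt, both deliveries would be billed.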
5. Fetching Usage in a Separate Call
Explanation: Querying the LLM provider's usage endpoint separately after the response adds unnecessary API calls and latency.
Fix: Extract token counts directly from the chat completion response object. OpenAI and compatible providers include usage metadata in the same payload as the generated text.
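A defensive version of that extraction might look like the sketch below. The dataclasses are stand-ins that copy only the field names used in Step 2 (usage.prompt_tokens, usage.completion_tokens); extract_usage is a hypothetical helper. It returns None when usage is absent so the caller can decide how to handle an unmetered response instead of silently billing zero.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class Usage:
    prompt_tokens: int
    completion_tokens: int


@dataclass
class FakeCompletion:
    """Stand-in for a chat completion response; not the real SDK type."""
    model: str
    usage: Optional[Usage]


def extract_usage(response) -> Optional[Tuple[int, int]]:
    """Return (input_tokens, output_tokens), or None if usage is missing."""
    usage = getattr(response, "usage", None)
    if usage is None:
        return None
    return usage.prompt_tokens, usage.completion_tokens
```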
6. Timezone and Window Misalignment
Explanation: Aggregating events across mismatched timezones causes billing cycles to split incorrectly, resulting in partial invoices or double-charging.
Fix: Standardize all event timestamps to UTC. Configure billing windows explicitly in the metering platform (e.g., monthly starting on the 1st). Never rely on local server time.
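The sketch below shows how a local timestamp can land in a different billing month once normalized. Both helper names are hypothetical, and the calendar-month window is just the example from the fix above; the authoritative window definition lives in the metering platform.

```python
from datetime import datetime, timedelta, timezone


def _require_aware(ts):
    if ts.tzinfo is None:
        raise ValueError("naive timestamp: attach a timezone before metering")


def to_utc_iso(ts):
    """Normalize an aware datetime to a UTC ISO 8601 string."""
    _require_aware(ts)
    return ts.astimezone(timezone.utc).isoformat()


def monthly_window(ts):
    """Return [start, end) of the UTC calendar month that contains ts."""
    _require_aware(ts)
    utc = ts.astimezone(timezone.utc)
    start = utc.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    if start.month == 12:
        end = start.replace(year=start.year + 1, month=1)
    else:
        end = start.replace(month=start.month + 1)
    return start, end


# 20:30 on 31 January at UTC-5 is already 1 February in UTC.
local = datetime(2024, 1, 31, 20, 30, tzinfo=timezone(timedelta(hours=-5)))
```

Here to_utc_iso(local) falls on 1 February, so the event belongs to the February window even though the server's wall clock still reads January.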
7. Unhandled Dispatch Failures
Explanation: If the billing API returns 5xx errors or times out, silent drops lead to revenue leakage.
Fix: Implement a retry queue with exponential backoff. Log failed dispatches to your observability stack (Datadog, Prometheus). Set up alerts for sustained metering failure rates above 0.5%.
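One way to sketch the backoff part is shown below. send_with_retry is a hypothetical helper; the attempt count, base delay, and cap are illustrative defaults rather than recommended values, and the sleep function is injectable so the behavior can be checked without real waiting.

```python
import asyncio
import random


async def send_with_retry(send, event, attempts=4, base_delay=0.5,
                          max_delay=8.0, sleep=asyncio.sleep):
    """Call send(event) until it succeeds or attempts are used up."""
    for attempt in range(attempts):
        try:
            return await send(event)
        except Exception:
            if attempt == attempts - 1:
                raise  # out of retries: let the caller log and alert
            # Exponential backoff with full jitter: wait a random time in [0, cap].
            cap = min(max_delay, base_delay * 2 ** attempt)
            await sleep(random.uniform(0, cap))


async def demo():
    calls, delays = [], []

    async def flaky_send(event):
        calls.append(event["id"])
        if len(calls) < 3:
            raise ConnectionError("billing API unavailable")
        return "accepted"

    async def record_delay(seconds):
        delays.append(seconds)  # record instead of sleeping

    result = await send_with_retry(flaky_send, {"id": "req-1"}, sleep=record_delay)
    return result, calls, delays
```

Every attempt resends the same event with the same id, so retries stay compatible with id-based deduplication on the billing side.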
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Early-stage AI startup with <10k monthly requests | Kong Konnect Metering & Billing (Cloud) | Zero infrastructure overhead, managed aggregation, instant invoicing | Low fixed cost, scales with usage |
| Regulated enterprise requiring data residency | Self-hosted OpenMeter | Full control over event storage, compliance with GDPR/HIPAA data boundaries | Higher engineering cost, predictable infrastructure spend |
| Existing Stripe Billing integration | Stripe Usage Records API | Leverages existing payment rails, avoids platform migration | Moderate integration effort, familiar billing UX |
| Multi-model routing with dynamic pricing | Kong Konnect + External Rate Config | Decouples pricing from code, supports real-time rate updates | Minimal dev overhead, maximizes margin accuracy |
Configuration Template
```bash
# .env
OPENAI_API_KEY=sk-proj-your-key
KONNECT_API_URL=https://us.api.konghq.com/v3/openmeter
KONNECT_TOKEN=kpat_your-personal-access-token
LOG_LEVEL=INFO
METERING_TIMEOUT_MS=5000
```
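```python
# config.py (requires the pydantic-settings package)
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict


class AppSettings(BaseSettings):
    # Load values from .env; ignore keys such as LOG_LEVEL that have no field here.
    model_config = SettingsConfigDict(env_file=".env", extra="ignore")

    # validation_alias maps each field to its environment variable name.
    openai_key: str = Field(default="", validation_alias="OPENAI_API_KEY")
    konnect_url: str = Field(default="", validation_alias="KONNECT_API_URL")
    konnect_token: str = Field(default="", validation_alias="KONNECT_TOKEN")
    metering_timeout: int = Field(default=5000, validation_alias="METERING_TIMEOUT_MS")


settings = AppSettings()
```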
Quick Start Guide
- Initialize Environment: Run python -m venv .venv && source .venv/bin/activate, then install dependencies via pip install fastapi uvicorn openai httpx python-dotenv pydantic pydantic-settings.
- Configure Credentials: Populate .env with your OpenAI key and Kong Konnect PAT. Ensure the PAT has metering:write permissions.
- Launch Gateway: Execute uvicorn main:app --reload --port 8000. The server will start with async inference and background metering dispatch.
- Validate Flow: Send a test request: curl -X POST http://localhost:8000/v1/completions -H "Content-Type: application/json" -H "x-api-token: tk_prod_alpha99" -d '{"query": "Explain quantum entanglement"}'. Verify that the response returns as soon as inference completes, then check the Kong Konnect dashboard for the ingested CloudEvent.