```python
import asyncio
import httpx
from typing import Dict, Any


class ExternalDataFetcher:
    def __init__(self, base_url: str, timeout: float = 5.0):
        self.base_url = base_url
        self.timeout = httpx.Timeout(timeout)

    async def fetch_metrics(self, endpoint: str) -> Dict[str, Any]:
        # The client is opened and closed per call; the response is parsed before it closes.
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(f"{self.base_url}/{endpoint}")
            response.raise_for_status()  # raise on 4xx/5xx responses
            return response.json()
```
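**Architecture Rationale**: We use `httpx` instead of `requests` because synchronous HTTP clients block the event loop. `httpx.AsyncClient` manages connection pooling and non-blocking socket I/O. The `async with` context manager ensures the client closes cleanly, preventing file descriptor leaks in long-running services. Because this example opens a new client on every call, connections are reused only within that call; a long-lived shared client is needed to reuse connections across requests.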
### Step 2: Compose Concurrent Operations
Sequential awaits defeat the purpose of async. When multiple independent I/O operations exist, they should run concurrently using `asyncio.gather()`.
```python
class AnalyticsAggregator:
    def __init__(self, fetcher: ExternalDataFetcher):
        self.fetcher = fetcher

    async def compile_dashboard(self) -> Dict[str, Any]:
        # Build the coroutines first; nothing runs until gather() schedules them.
        tasks = [
            self.fetcher.fetch_metrics("user_activity"),
            self.fetcher.fetch_metrics("revenue_stream"),
            self.fetcher.fetch_metrics("system_health"),
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # gather() preserves input order, so labels can be zipped with results.
        compiled = {}
        for endpoint, result in zip(["activity", "revenue", "health"], results):
            if isinstance(result, Exception):
                compiled[endpoint] = {"status": "error", "detail": str(result)}
            else:
                compiled[endpoint] = {"status": "ok", "data": result}
        return compiled
```
**Architecture Rationale**: `asyncio.gather()` schedules all coroutines concurrently and waits for them to complete. With `return_exceptions=True`, a failed request is returned as an exception object in its slot of the result list instead of being raised, so the results of the other requests are not lost. This is critical for dashboard aggregation, where partial data is preferable to total failure.
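The difference between sequential awaits and `asyncio.gather()` can be illustrated with a minimal sketch. It uses only the standard library; `fake_fetch` is a hypothetical stand-in that sleeps instead of calling a real endpoint, and the delays are arbitrary.

```python
import asyncio
import time


async def fake_fetch(name: str, delay: float) -> str:
    """Hypothetical stand-in for a network call; sleeps instead of doing real I/O."""
    await asyncio.sleep(delay)
    return name


async def run_sequential() -> float:
    """Await three calls one after another and return the elapsed seconds."""
    start = time.perf_counter()
    for name in ("user_activity", "revenue_stream", "system_health"):
        await fake_fetch(name, 0.1)  # each call waits for the previous one
    return time.perf_counter() - start


async def run_concurrent() -> float:
    """Run the same three calls through gather() and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(
        fake_fetch("user_activity", 0.1),
        fake_fetch("revenue_stream", 0.1),
        fake_fetch("system_health", 0.1),
    )
    return time.perf_counter() - start


async def compare() -> tuple[float, float]:
    return await run_sequential(), await run_concurrent()


if __name__ == "__main__":
    sequential, concurrent = asyncio.run(compare())
    print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

Under these assumptions the sequential version should take roughly the sum of the delays, while the concurrent version should take roughly the longest single delay.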
### Step 3: Integrate Type Hints and Decorators
Type hints improve static analysis and IDE support, especially in async codebases where coroutine objects can be accidentally returned instead of awaited. Decorators must be async-aware to avoid wrapping coroutines incorrectly.
```python
import asyncio
import time
from functools import wraps
from typing import Callable, Awaitable, Any


def measure_latency(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    @wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        start = time.perf_counter()
        try:
            result = await func(*args, **kwargs)
            return result
        finally:
            # Runs on success and on failure, so errors are timed too.
            elapsed = time.perf_counter() - start
            print(f"[{func.__name__}] completed in {elapsed:.3f}s")
    return wrapper


class PaymentProcessor:
    @measure_latency
    async def validate_transaction(self, tx_id: str) -> bool:
        await asyncio.sleep(0.8)  # Simulated external validation
        return True
```
**Architecture Rationale**: The decorator uses `@wraps` to preserve the wrapped function's metadata and declares an `async def` wrapper so that it can `await` the call. A synchronous wrapper that calls the async function without awaiting it only creates the coroutine object, so it would time the creation rather than the execution. `time.perf_counter()` provides monotonic timing unaffected by system clock updates, which is essential for accurate latency measurement in production.
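To make the difference concrete, here is a small sketch comparing a synchronous wrapper with an async one. The names `sync_timer`, `async_timer`, and the module-level `timings` dict are hypothetical and exist only for this illustration.

```python
import asyncio
import time
from functools import wraps
from typing import Any, Awaitable, Callable

# Hypothetical store for the measured durations, keyed by decorator kind.
timings: dict[str, float] = {}


def sync_timer(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    """Sync wrapper: measures only how long it takes to create the coroutine."""
    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Awaitable[Any]:
        start = time.perf_counter()
        coro = func(*args, **kwargs)  # creates the coroutine; the body has not run yet
        timings["sync"] = time.perf_counter() - start
        return coro
    return wrapper


def async_timer(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    """Async wrapper: awaits the call, so the measurement covers the real work."""
    @wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        start = time.perf_counter()
        try:
            return await func(*args, **kwargs)
        finally:
            timings["async"] = time.perf_counter() - start
    return wrapper


@sync_timer
async def slow_a() -> str:
    await asyncio.sleep(0.1)
    return "a"


@async_timer
async def slow_b() -> str:
    await asyncio.sleep(0.1)
    return "b"


async def demo() -> dict[str, float]:
    await slow_a()
    await slow_b()
    return timings
```

In this sketch the value recorded by `sync_timer` should be close to zero, while `async_timer` should record roughly the simulated 0.1 s delay.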
### Step 4: Bootstrap the Event Loop
The entry point must initialize the loop and manage lifecycle events. In modern Python (3.7+), `asyncio.run()` handles loop creation, execution, and cleanup automatically.
```python
async def main() -> None:
    fetcher = ExternalDataFetcher(base_url="https://api.internal.metrics")
    aggregator = AnalyticsAggregator(fetcher)
    dashboard = await aggregator.compile_dashboard()
    print(dashboard)


if __name__ == "__main__":
    asyncio.run(main())
```
**Architecture Rationale**: `asyncio.run()` is the standard entry point for scripts and CLI tools. In framework contexts (FastAPI, Quart), the loop is managed by the server, and you only define async route handlers. Calling `asyncio.run()` inside an already-running loop raises a `RuntimeError`, so you need to know which of these contexts your code runs in.
## Pitfall Guide

### 1. Mixing Synchronous I/O Libraries in Async Code

**Explanation**: Calling `requests`, `psycopg2`, or `pymongo` from an async function blocks the entire event loop. The loop cannot yield control while the synchronous library waits for network or disk I/O.

**Fix**: Replace with async-native alternatives (`httpx`, `asyncpg`, `motor`). If a sync library is unavoidable, offload it to a thread pool using `asyncio.to_thread()` or `loop.run_in_executor()`.
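A minimal sketch of the thread-offloading fix, assuming Python 3.9+ for `asyncio.to_thread()`. `blocking_lookup` is a hypothetical stand-in for a sync-only client call, and `heartbeat` exists only to show that the loop keeps running while the blocking call is in flight.

```python
import asyncio
import time


def blocking_lookup(delay: float) -> str:
    """Hypothetical stand-in for a sync-only library call."""
    time.sleep(delay)  # blocks the thread it runs on
    return "done"


async def heartbeat(ticks: list[float], interval: float, count: int) -> None:
    """Record a timestamp on every tick to show the loop is still responsive."""
    for _ in range(count):
        await asyncio.sleep(interval)
        ticks.append(time.perf_counter())


async def demo() -> tuple[str, int]:
    ticks: list[float] = []
    beat = asyncio.create_task(heartbeat(ticks, 0.02, 5))
    # The blocking call runs in a worker thread, so the loop stays free.
    result = await asyncio.to_thread(blocking_lookup, 0.2)
    ticks_during_call = len(ticks)
    await beat
    return result, ticks_during_call


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If `blocking_lookup` were called directly inside `demo()`, no heartbeat tick could be recorded until it returned.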
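### 2. Forgetting the `await` Keyword

**Explanation**: Calling an async function without `await` returns a coroutine object instead of executing it. The code appears to run but skips the operation, leading to missing data or unhandled states. Python reports this only as a `RuntimeWarning` ("coroutine ... was never awaited"), which is easy to miss in logs.

**Fix**: Use a type checker that flags unawaited coroutines, such as pyright (`reportUnusedCoroutine`), alongside async-aware linting such as flake8-async. During development, add runtime assertions on values that should already be resolved: `assert not inspect.isawaitable(result)`.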
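The following sketch shows what a missing `await` actually produces. `load_profile` is a hypothetical coroutine used only for illustration.

```python
import asyncio
import inspect


async def load_profile(user_id: int) -> dict[str, int]:
    """Hypothetical coroutine standing in for a database or API lookup."""
    await asyncio.sleep(0)
    return {"user_id": user_id}


async def demo() -> tuple[bool, dict[str, int]]:
    forgotten = load_profile(1)  # missing await: this is a coroutine object, not a dict
    was_coroutine = inspect.iscoroutine(forgotten)
    forgotten.close()  # discard it explicitly to avoid the "never awaited" warning

    awaited = await load_profile(1)  # with await, the body runs and returns the dict
    return was_coroutine, awaited


if __name__ == "__main__":
    print(asyncio.run(demo()))
```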
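### 3. CPU-Heavy Work Inside Coroutines

**Explanation**: Async does not parallelize CPU-bound tasks. Long-running computations (image processing, heavy math, JSON parsing of massive payloads) block the event loop, starving other coroutines.

**Fix**: Offload CPU work to a process pool (`loop.run_in_executor()` with a `ProcessPoolExecutor`, or `multiprocessing`). `asyncio.to_thread()` keeps the loop responsive, but threads running pure-Python code still contend for the GIL, so it only helps when the work releases the GIL. Keep async functions focused on I/O scheduling and lightweight data transformation.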
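One way to sketch the executor approach is shown below. `stretch_hash` and `hash_all` are hypothetical names, and the executor is passed in by the caller so the same coroutine can be used with either a thread pool or a process pool.

```python
import asyncio
import hashlib
from concurrent.futures import Executor


def stretch_hash(payload: bytes, rounds: int) -> str:
    """CPU-bound work: repeated hashing with no I/O to await."""
    digest = payload
    for _ in range(rounds):
        digest = hashlib.sha256(digest).digest()
    return digest.hex()


async def hash_all(executor: Executor, payloads: list[bytes], rounds: int) -> list[str]:
    """Run stretch_hash for every payload in the executor without blocking the loop."""
    loop = asyncio.get_running_loop()
    futures = [
        loop.run_in_executor(executor, stretch_hash, payload, rounds)
        for payload in payloads
    ]
    # gather() returns the results in the same order as the payloads.
    return await asyncio.gather(*futures)
```

For real parallelism, pass a `concurrent.futures.ProcessPoolExecutor`, created under an `if __name__ == "__main__":` guard so that worker processes can import the module safely. The function must be defined at module level so it can be pickled.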
### 4. Nested Event Loop Execution

**Explanation**: Calling `asyncio.run()` from inside a running event loop raises `RuntimeError: asyncio.run() cannot be called from a running event loop`, and calling `run_until_complete()` on the running loop raises `RuntimeError: This event loop is already running`. This commonly occurs when testing async code or integrating with legacy sync wrappers.

**Fix**: Use `nest_asyncio` sparingly, and only in interactive environments (Jupyter, REPL). In production, rely on framework-managed loops, or use `asyncio.get_running_loop()` or `asyncio.create_task()` to schedule work on the existing loop instead of creating a new one.
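The failure and the fix can be reproduced with a short sketch. The function names are hypothetical; the only behavior relied on is that `asyncio.run()` refuses to start while a loop is already running in the same thread.

```python
import asyncio


async def inner() -> str:
    return "inner result"


async def call_run_from_coroutine() -> str:
    """Try to start a second loop from inside a running one and report the error."""
    coro = inner()
    try:
        asyncio.run(coro)  # a loop is already running in this thread
    except RuntimeError as exc:
        coro.close()  # discard the coroutine that was never started
        return str(exc)
    return "no error"


async def schedule_on_running_loop() -> str:
    """Schedule the same work on the loop that is already running."""
    task = asyncio.create_task(inner())
    return await task


if __name__ == "__main__":
    print(asyncio.run(call_run_from_coroutine()))
    print(asyncio.run(schedule_on_running_loop()))
```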
### 5. Unhandled Exception Propagation in `gather()`

**Explanation**: By default, `asyncio.gather()` propagates the first exception to the caller immediately. The remaining awaitables are not cancelled and keep running in the background, but their results are no longer collected. This can turn one failed call into the loss of a whole batch where partial success is acceptable.

**Fix**: Pass `return_exceptions=True` and inspect results manually. Alternatively, wrap each coroutine in its own task with custom error handling, or use `asyncio.wait()` with `return_when=asyncio.FIRST_EXCEPTION` or `asyncio.ALL_COMPLETED`.
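Both modes can be compared in a small sketch. `ok` and `boom` are hypothetical coroutines: one succeeds after a short delay, the other fails immediately.

```python
import asyncio


async def ok(name: str, delay: float, finished: list[str]) -> str:
    await asyncio.sleep(delay)
    finished.append(name)
    return name


async def boom() -> str:
    raise ValueError("upstream failed")


async def default_mode() -> tuple[str, list[str]]:
    """Default gather(): the first exception is raised to the caller."""
    finished: list[str] = []
    slow = asyncio.create_task(ok("slow", 0.05, finished))
    try:
        await asyncio.gather(boom(), slow)
        outcome = "no error"
    except ValueError as exc:
        outcome = f"raised: {exc}"
    await slow  # the sibling task was not cancelled and still completes
    return outcome, finished


async def collect_mode() -> list[object]:
    """return_exceptions=True: failures come back as values in the result list."""
    finished: list[str] = []
    return await asyncio.gather(
        boom(), ok("slow", 0.01, finished), return_exceptions=True
    )


if __name__ == "__main__":
    print(asyncio.run(default_mode()))
    print(asyncio.run(collect_mode()))
```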
### 6. Missing Timeouts on External Calls

**Explanation**: Async coroutines waiting on unresponsive endpoints hang indefinitely. Without explicit timeouts, connection pools exhaust, and the service becomes unresponsive.

**Fix**: Configure timeouts at the client level (`httpx.Timeout`, `asyncpg.connect(..., timeout=...)`). Wrap critical paths with `asyncio.wait_for(coro, timeout=...)` to enforce hard limits.
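A minimal sketch of a hard deadline with `asyncio.wait_for()`. `slow_endpoint` is a hypothetical stand-in for an external call, and the fallback string is an arbitrary choice for this example.

```python
import asyncio


async def slow_endpoint(delay: float) -> str:
    """Hypothetical stand-in for an external call that may respond slowly."""
    await asyncio.sleep(delay)
    return "payload"


async def fetch_with_deadline(delay: float, timeout: float) -> str:
    """Return the payload, or a fallback value if the deadline passes first."""
    try:
        return await asyncio.wait_for(slow_endpoint(delay), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for() cancels the inner coroutine before raising.
        return "timed out"


if __name__ == "__main__":
    print(asyncio.run(fetch_with_deadline(delay=0.0, timeout=1.0)))
    print(asyncio.run(fetch_with_deadline(delay=5.0, timeout=0.05)))
```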
### 7. Blocking Decorators and Middleware

**Explanation**: A synchronous wrapper around an async function runs on the event loop thread at call time, before the coroutine is awaited. If the decorator performs I/O or heavy computation, it blocks the loop.

**Fix**: Ensure all decorators, middleware, and context managers are async-compatible. Use `@wraps` and `async def` wrappers. Validate with `inspect.iscoroutinefunction()` during development.
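One possible pattern, sketched below, is a decorator that checks `inspect.iscoroutinefunction()` and picks a matching wrapper. The `audited` decorator and the `calls` list are hypothetical names for this illustration.

```python
import asyncio
import inspect
from functools import wraps
from typing import Any, Callable

# Hypothetical audit trail recording which decorated functions were called.
calls: list[str] = []


def audited(func: Callable[..., Any]) -> Callable[..., Any]:
    """Return an async wrapper for coroutine functions and a sync wrapper otherwise."""
    if inspect.iscoroutinefunction(func):
        @wraps(func)
        async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
            calls.append(func.__name__)
            return await func(*args, **kwargs)
        return async_wrapper

    @wraps(func)
    def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
        calls.append(func.__name__)
        return func(*args, **kwargs)
    return sync_wrapper


@audited
async def fetch_user() -> str:
    await asyncio.sleep(0)
    return "user"


@audited
def parse_user(raw: str) -> str:
    return raw.upper()


async def demo() -> str:
    return parse_user(await fetch_user())
```

After decoration, `inspect.iscoroutinefunction(fetch_user)` is still true, which makes it a cheap development-time check that a decorator has not silently turned an async function into a sync one.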
## Production Bundle

### Action Checklist

### Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| High-concurrency API calls, DB queries, webhooks | Async/Await with event loop | Maximizes throughput per worker, reduces idle CPU time | Lowers compute costs by 40-70% vs sync workers |
| Image processing, ML inference, data transformation | Multiprocessing or dedicated GPU workers | Bypasses GIL, utilizes multiple cores efficiently | Higher infrastructure cost, but necessary for CPU/GPU bounds |
| Legacy sync libraries with no async alternative | asyncio.to_thread() or run_in_executor() | Prevents event loop blocking while maintaining async architecture | Minimal cost, slight overhead from thread context switching |
| Real-time WebSocket or streaming data | Async with backpressure handling | Maintains connection state, processes streams without blocking | Requires careful memory management, moderate infra cost |
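
For the streaming row, backpressure can be sketched with a bounded `asyncio.Queue`: the producer is suspended whenever the consumer falls behind. This is an illustrative sketch only, with hypothetical `producer` and `consumer` coroutines and an arbitrary queue size, not a full WebSocket handler.

```python
import asyncio


async def producer(queue: asyncio.Queue, items: int, max_seen: list[int]) -> None:
    for i in range(items):
        await queue.put(i)  # suspends here while the queue is full
        max_seen[0] = max(max_seen[0], queue.qsize())
    await queue.put(None)  # sentinel: no more items


async def consumer(queue: asyncio.Queue, received: list[int]) -> None:
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(0.001)  # simulate slow processing
        received.append(item)


async def demo(items: int = 20, maxsize: int = 3) -> tuple[list[int], int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    received: list[int] = []
    max_seen = [0]  # largest queue size observed by the producer
    await asyncio.gather(
        producer(queue, items, max_seen),
        consumer(queue, received),
    )
    return received, max_seen[0]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The queue never holds more than `maxsize` items, so memory use stays bounded regardless of how fast the producer could run.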
### Configuration Template

```toml
# pyproject.toml
[project]
name = "async-backend-service"
version = "1.0.0"
requires-python = ">=3.10"
dependencies = [
    "httpx>=0.27.0",
    "asyncpg>=0.29.0",
    "redis>=5.0.0",  # redis.asyncio supersedes the unmaintained aioredis package
    "pydantic>=2.6.0",
    "structlog>=24.1.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0.0",
    "pytest-asyncio>=0.23.0",
    "ruff>=0.3.0",
    "pyright>=1.1.350",
]

[tool.ruff]
target-version = "py310"

[tool.ruff.lint]
select = ["E", "F", "W", "ASYNC"]

[tool.pyright]
pythonVersion = "3.10"
typeCheckingMode = "strict"
```
```python
# main.py
import asyncio
from typing import Optional

import structlog
from asyncpg import Pool, create_pool

structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer(),
    ]
)
logger = structlog.get_logger()

# Module-level handle so shutdown() can reach the pool created at startup.
db_pool: Optional[Pool] = None


async def init_db_pool() -> None:
    global db_pool
    db_pool = await create_pool(
        dsn="postgresql://user:pass@localhost/db",
        min_size=5,
        max_size=20,
        command_timeout=60.0,
    )
    logger.info("Database pool initialized")


async def shutdown() -> None:
    if db_pool is not None:
        await db_pool.close()
    logger.info("Database pool closed")


async def main() -> None:
    await init_db_pool()
    try:
        # Application logic here
        await asyncio.Event().wait()  # Keep loop alive for demo
    finally:
        await shutdown()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass  # Ctrl-C: exit quietly; cleanup lives in main()'s finally block
```
### Quick Start Guide

- **Initialize Project**: Run `uv init async-service && cd async-service` and add dependencies: `uv add httpx asyncpg structlog pytest pytest-asyncio`.
- **Create Async Entry Point**: Write a `main.py` with `async def main()` and `asyncio.run(main())`. Configure structured logging and connection pools.
- **Define I/O Coroutines**: Implement async functions using `httpx.AsyncClient` or `asyncpg`. Wrap external calls with explicit timeouts and error handling.
- **Compose & Test**: Use `asyncio.gather()` for concurrent operations. Write tests with `@pytest.mark.asyncio` and verify non-blocking behavior under load.
- **Deploy**: Containerize with a `Dockerfile` based on `python:3.11-slim`. Run with `uvicorn`, or `gunicorn` with an async worker class, for production routing. Monitor loop latency and connection pool metrics.