    pt_count: int = 0
    max_attempts: int = 3
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
```
### 2. Database Adapter & Idempotency Guard
Idempotency is a database contract, not an application check. A unique constraint on the request fingerprint prevents duplicate execution, while `SELECT ... FOR UPDATE SKIP LOCKED` enables safe concurrent worker claiming.
```python
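import uuid

from sqlalchemy import Column, DateTime, Enum as SAEnum, Integer, String, UniqueConstraint, select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import DeclarativeBase

# ExecutionStatus is the status enum defined alongside the domain models in section 1.


class Base(DeclarativeBase):
    pass


class JobEntity(Base):
    __tablename__ = "job_executions"

    # Generated in Python so callers such as submit_job do not have to supply an id.
    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    fingerprint = Column(String, nullable=False)
    payload = Column(String, nullable=False)
    status = Column(SAEnum(ExecutionStatus), nullable=False, default=ExecutionStatus.PENDING)
    attempt_count = Column(Integer, nullable=False, default=0)
    max_attempts = Column(Integer, nullable=False, default=3)
    # timezone=True maps to TIMESTAMP WITH TIME ZONE, matching the aware UTC datetimes the service writes.
    created_at = Column(DateTime(timezone=True), nullable=False)
    updated_at = Column(DateTime(timezone=True), nullable=False)

    __table_args__ = (
        # The idempotency contract: a second insert with the same fingerprint raises IntegrityError.
        UniqueConstraint("fingerprint", name="uq_job_fingerprint"),
    )


async def claim_next_job(session: AsyncSession) -> JobEntity | None:
    # FOR UPDATE SKIP LOCKED: lock the oldest PENDING row and skip rows other workers already hold.
    # The lock lasts until the caller's transaction commits or rolls back.
    query = (
        select(JobEntity)
        .where(JobEntity.status == ExecutionStatus.PENDING)
        .order_by(JobEntity.created_at)
        .limit(1)
        .with_for_update(skip_locked=True)
    )
    result = await session.execute(query)
    return result.scalars().first()
```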
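The section does not specify how the request fingerprint is computed. One common approach, sketched here under stated assumptions, is a SHA-256 digest of a canonical JSON encoding of the request's identifying fields. The `request_fingerprint` helper and the choice of method, path, and payload as inputs are hypothetical, not part of the original design.

```python
import hashlib
import json
from typing import Any


def request_fingerprint(method: str, path: str, payload: dict[str, Any]) -> str:
    """Hypothetical helper: derive a stable value for the unique `fingerprint` column."""
    # sort_keys plus compact separators give one canonical encoding per logical request,
    # so {"a": 1, "b": 2} and {"b": 2, "a": 1} produce the same digest.
    canonical = json.dumps(
        {"method": method.upper(), "path": path, "payload": payload},
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


fp1 = request_fingerprint("post", "/jobs", {"a": 1, "b": 2})
fp2 = request_fingerprint("POST", "/jobs", {"b": 2, "a": 1})
print(fp1 == fp2, len(fp1))  # True 64
```

If clients send an explicit idempotency key (for example in a request header), that value can be stored in the `fingerprint` column instead; the unique constraint behaves the same either way.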
### 3. Application Service & Dependency Injection
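FastAPI's `Depends()` mechanism decouples resource acquisition from handler logic: a handler declares that it needs a session or an orchestrator, and FastAPI builds one per request. Tests can swap those providers through `app.dependency_overrides`, and a generator dependency such as `get_db_session` closes its session when the request finishes, which makes the resource lifecycle explicit.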
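```python
import json
from collections.abc import AsyncIterator
from datetime import datetime, timezone

from fastapi import Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

# JobEntity and ExecutionStatus come from section 2, JobRecord from section 1, and
# async_session_factory from the application setup. JobRecord.model_validate(entity)
# assumes JobRecord sets model_config = ConfigDict(from_attributes=True).

VALID_TRANSITIONS = {
    ExecutionStatus.PENDING: {ExecutionStatus.PROCESSING},
    ExecutionStatus.PROCESSING: {ExecutionStatus.COMPLETED, ExecutionStatus.FAILED},
    ExecutionStatus.FAILED: {ExecutionStatus.PENDING, ExecutionStatus.DEAD_LETTER},
}


async def get_db_session() -> AsyncIterator[AsyncSession]:
    # One session per request; leaving the context manager closes it.
    async with async_session_factory() as session:
        yield session


class JobOrchestrator:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def submit_job(self, fingerprint: str, payload: dict) -> JobRecord:
        # Canonical JSON (sorted keys) so equal payloads compare equal on replay.
        serialized = json.dumps(payload, sort_keys=True)
        now = datetime.now(timezone.utc)
        try:
            entity = JobEntity(fingerprint=fingerprint, payload=serialized, created_at=now, updated_at=now)
            self.db.add(entity)
            await self.db.commit()
            await self.db.refresh(entity)
            return JobRecord.model_validate(entity)
        except IntegrityError:
            # A concurrent or earlier request owns this fingerprint: re-read the winning row.
            await self.db.rollback()
            existing = await self.db.execute(select(JobEntity).where(JobEntity.fingerprint == fingerprint))
            record = existing.scalars().first()
            if record is None:
                raise  # the violation came from a different constraint
            if record.payload != serialized:
                raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Fingerprint conflict")
            return JobRecord.model_validate(record)

    async def transition_state(self, job_id: str, new_status: ExecutionStatus) -> None:
        # Lock the row so two concurrent transitions cannot both pass the check.
        job = await self.db.get(JobEntity, job_id, with_for_update=True)
        if job is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
        if new_status not in VALID_TRANSITIONS.get(job.status, set()):
            raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Invalid state transition")
        job.status = new_status
        job.updated_at = datetime.now(timezone.utc)
        await self.db.commit()


def get_orchestrator(db: AsyncSession = Depends(get_db_session)) -> JobOrchestrator:
    # Handlers declare `orchestrator: JobOrchestrator = Depends(get_orchestrator)`.
    return JobOrchestrator(db)
```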
### 4. Worker Loop & Cooperative Scheduling
Python's async model is cooperative. Coroutines yield control only at await boundaries. CPU-bound operations must be offloaded to prevent event loop starvation.
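```python
import asyncio
from datetime import datetime, timezone

from opentelemetry import trace
from sqlalchemy import update

# claim_next_job, JobEntity and ExecutionStatus are defined in section 2.
tracer = trace.get_tracer("job.worker")


async def process_worker_loop(session_factory, worker_id: str) -> None:
    while True:
        async with session_factory() as session:
            job = await claim_next_job(session)
            if job is None:
                await session.rollback()  # end the claim transaction before idling
                await asyncio.sleep(1)
                continue
            # Copy values before commit: expired ORM attributes cannot be lazy-loaded
            # implicitly under asyncio. `attempts` counts the attempt starting now.
            job_id, attempts, max_attempts = job.id, job.attempt_count + 1, job.max_attempts
            # Keep span names low-cardinality; the job id goes into an attribute.
            with tracer.start_as_current_span("job.process") as span:
                span.set_attribute("job.id", job_id)
                span.set_attribute("worker.id", worker_id)
                await session.execute(
                    update(JobEntity)
                    .where(JobEntity.id == job_id)
                    .values(status=ExecutionStatus.PROCESSING, attempt_count=attempts,
                            updated_at=datetime.now(timezone.utc))
                )
                await session.commit()  # releases the row lock; the job is no longer PENDING
                try:
                    await asyncio.sleep(0.2)  # simulate I/O-bound work
                    new_status = ExecutionStatus.COMPLETED
                except Exception as exc:
                    span.record_exception(exc)
                    await session.rollback()
                    # claim_next_job only picks PENDING rows, so FAILED jobs must be moved
                    # back to PENDING (an allowed transition) before they are retried.
                    new_status = (
                        ExecutionStatus.DEAD_LETTER if attempts >= max_attempts else ExecutionStatus.FAILED
                    )
                await session.execute(
                    update(JobEntity)
                    .where(JobEntity.id == job_id)
                    .values(status=new_status, updated_at=datetime.now(timezone.utc))
                )
                await session.commit()
        await asyncio.sleep(0.1)
```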
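The claim that coroutines yield only at `await` boundaries can be observed directly. In this minimal sketch (the `step_worker` and `run` names are illustrative only), two tasks append to a shared log. Without an `await` inside the loop, each task runs to completion before the other starts. With `await asyncio.sleep(0)`, control returns to the event loop after every step.

```python
import asyncio


async def step_worker(name: str, steps: int, log: list[str], yield_each_step: bool) -> None:
    for i in range(steps):
        log.append(f"{name}{i}")  # stands in for one unit of work
        if yield_each_step:
            # sleep(0) suspends this coroutine so other ready tasks can run
            await asyncio.sleep(0)


async def run(yield_each_step: bool) -> list[str]:
    log: list[str] = []
    await asyncio.gather(
        step_worker("a", 3, log, yield_each_step),
        step_worker("b", 3, log, yield_each_step),
    )
    return log


print(asyncio.run(run(False)))  # ['a0', 'a1', 'a2', 'b0', 'b1', 'b2']
print(asyncio.run(run(True)))   # ['a0', 'b0', 'a1', 'b1', 'a2', 'b2']
```

In the worker loop, every `await session.execute(...)` and `await asyncio.sleep(...)` is such a yield point. A long synchronous computation between them is not.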
Architecture Rationale:
- Hexagonal Boundaries: Models, persistence, and orchestration are isolated. HTTP handlers only validate input and delegate to `JobOrchestrator`.
- Database-First Idempotency: Unique constraints resolve concurrent inserts atomically. Application-level existence checks are redundant and prone to time-of-check/time-of-use (TOCTOU) races.
- Explicit State Guards: The transition matrix rejects illegal status changes before they reach the database. The database enum type adds a secondary layer, but it only rejects unknown status values; it cannot tell a legal transition from an illegal one.
- Cooperative Yielding: `asyncio.sleep()` and awaited I/O yield control to the event loop. CPU-heavy tasks need `loop.run_in_executor()` to avoid blocking it.
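Because the transition matrix is plain data, the explicit-state-guard rule can be unit tested with no database or HTTP layer. This standalone sketch copies the statuses and edges from `transition_state`. The enum's string values and the `is_valid_transition` helper are assumptions for illustration.

```python
from enum import Enum


class ExecutionStatus(str, Enum):
    # String values are assumed; only the member names come from the article.
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    DEAD_LETTER = "dead_letter"


# Same edges as the transition table used by transition_state.
VALID_TRANSITIONS: dict[ExecutionStatus, frozenset[ExecutionStatus]] = {
    ExecutionStatus.PENDING: frozenset({ExecutionStatus.PROCESSING}),
    ExecutionStatus.PROCESSING: frozenset({ExecutionStatus.COMPLETED, ExecutionStatus.FAILED}),
    ExecutionStatus.FAILED: frozenset({ExecutionStatus.PENDING, ExecutionStatus.DEAD_LETTER}),
}


def is_valid_transition(current: ExecutionStatus, new: ExecutionStatus) -> bool:
    # Terminal states (COMPLETED, DEAD_LETTER) have no entry, so nothing leaves them.
    return new in VALID_TRANSITIONS.get(current, frozenset())


print(is_valid_transition(ExecutionStatus.PENDING, ExecutionStatus.PROCESSING))  # True
print(is_valid_transition(ExecutionStatus.COMPLETED, ExecutionStatus.PENDING))   # False
```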
## Pitfall Guide
### 1. Blocking the Event Loop with Synchronous Code
Explanation: Python's asyncio relies on explicit yielding. Calling synchronous database drivers, heavy JSON parsing, or CPU-intensive algorithms inside an async handler stalls the entire event loop, causing cascading timeouts.
Fix: Wrap blocking operations in asyncio.to_thread() or loop.run_in_executor(). Prefer async-native libraries (asyncpg, httpx, SQLAlchemy 2.0 async) for all I/O paths.
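The difference becomes visible when a heartbeat coroutine runs alongside a blocking call. In this sketch `time.sleep` stands in for a synchronous driver call. The `heartbeat` and `measure` helpers and the timings are illustrative assumptions. Run inline, the call freezes the loop. Passed to `asyncio.to_thread()` (Python 3.9+), it runs on a worker thread and the heartbeat keeps ticking.

```python
import asyncio
import time


async def heartbeat(stop: asyncio.Event, interval: float = 0.01) -> int:
    """Count how many times the event loop managed to run this coroutine."""
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(interval)
        ticks += 1
    return ticks


async def measure(offload: bool, blocking_seconds: float = 0.2) -> int:
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(stop))
    await asyncio.sleep(0)  # let the heartbeat start
    if offload:
        # Runs on a thread from the default executor; the loop stays free.
        await asyncio.to_thread(time.sleep, blocking_seconds)
    else:
        # Synchronous call inside a coroutine: nothing else runs until it returns.
        time.sleep(blocking_seconds)
    stop.set()
    return await hb


blocked = asyncio.run(measure(offload=False))
offloaded = asyncio.run(measure(offload=True))
print(blocked, offloaded)  # blocked is 1; offloaded is much larger (exact value depends on timing)
```

Threads suit blocking I/O calls. Because of the GIL, pure-Python CPU-bound work gains less from a thread, and a `ProcessPoolExecutor` passed to `loop.run_in_executor()` is the usual alternative.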
### 2. Underprovisioning Connection Pools
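Explanation: SQLAlchemy's default pool (`pool_size=5`, `max_overflow=10`) assumes modest concurrency. Under load, requests queue while waiting for database connections, which inflates p99 latency. This is a resource-topology issue, not a language limitation.
Fix: Align `pool_size` and `max_overflow` with expected concurrency. In production, deploy PgBouncer in transaction mode to multiplex connections efficiently and reduce Postgres per-connection process overhead. Transaction pooling does not preserve server-side session state such as prepared statements, so asyncpg's statement caching must be configured with that in mind.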
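One way to turn "expected concurrency" into a number is Little's law: the average number of connections in use equals the request rate multiplied by the average time each request holds a connection. The helper below is a rough sizing sketch under that assumption. The `estimate_pool_size` name and the 25% headroom default are illustrative choices, not recommendations from the article.

```python
import math


def estimate_pool_size(requests_per_second: float, hold_seconds: float, headroom: float = 1.25) -> int:
    """Little's law: average connections in use = arrival rate x time each request holds one."""
    in_use = requests_per_second * hold_seconds
    # Headroom absorbs bursts; round up because a fractional connection is still a whole one.
    return math.ceil(in_use * headroom)


# 100 requests/s, each holding a connection for 0.5 s on average
print(estimate_pool_size(100, 0.5))  # 63
```

The result is a per-process figure. Split it between `pool_size` for steady load and `max_overflow` for bursts, and multiply by the number of application processes to see what Postgres (or PgBouncer) must accept.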
### 3. Treating Type Hints as Optional Documentation
Explanation: Python type hints are ignored at runtime by default. Without strict static analysis, type mismatches, missing attributes, and incorrect payloads surface only in production.
Fix: Enforce mypy --strict in CI. Pair with ruff for linting and Pydantic for runtime validation. Treat type errors as build failures.
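Standard-library code is enough to show that hints are not enforced at runtime. In this sketch the `JobRequest` dataclasses are hypothetical. The `__post_init__` check is only a hand-written stand-in for the field validation Pydantic performs automatically, and `mypy --strict` would flag both bad calls before they run.

```python
from dataclasses import dataclass


@dataclass
class JobRequest:
    fingerprint: str
    max_attempts: int


# Annotations are not enforced at runtime: this constructs without any error.
loose = JobRequest(fingerprint="abc", max_attempts="three")  # type: ignore[arg-type]
print(repr(loose.max_attempts))  # 'three'


@dataclass
class CheckedJobRequest:
    fingerprint: str
    max_attempts: int

    def __post_init__(self) -> None:
        # Manual runtime check, standing in for the validation Pydantic performs.
        # bool is excluded explicitly because it is a subclass of int.
        if isinstance(self.max_attempts, bool) or not isinstance(self.max_attempts, int):
            raise TypeError(f"max_attempts must be int, got {type(self.max_attempts).__name__}")


try:
    CheckedJobRequest(fingerprint="abc", max_attempts="three")  # type: ignore[arg-type]
except TypeError as exc:
    print(exc)  # max_attempts must be int, got str
```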
### 4. Implementing Idempotency in Application Logic
Explanation: Checking for existing records before insertion creates a race condition. Two concurrent requests can both pass the existence check and insert duplicates.
Fix: Rely on database unique constraints. Catch IntegrityError on conflict, then re-read the winning row. The database is the single source of truth for concurrency control.
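The insert-then-recover pattern can be tried end to end with the standard library's `sqlite3` module standing in for Postgres. SQLite enforces unique constraints the same way but has no `SKIP LOCKED`. The `jobs` table and the `submit` helper are hypothetical. The flow mirrors `JobOrchestrator.submit_job`: attempt the insert, and on `IntegrityError` re-read the row that won.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE jobs (id INTEGER PRIMARY KEY, fingerprint TEXT NOT NULL UNIQUE, payload TEXT NOT NULL)"
)


def submit(fingerprint: str, payload: dict) -> tuple[int, bool]:
    """Return (job id, created). Raise ValueError if the fingerprint is reused with a new payload."""
    serialized = json.dumps(payload, sort_keys=True)
    try:
        with conn:  # commits on success, rolls back on exception
            cur = conn.execute(
                "INSERT INTO jobs (fingerprint, payload) VALUES (?, ?)", (fingerprint, serialized)
            )
        return cur.lastrowid, True
    except sqlite3.IntegrityError:
        # The constraint rejected the duplicate; the stored row is the single source of truth.
        job_id, stored = conn.execute(
            "SELECT id, payload FROM jobs WHERE fingerprint = ?", (fingerprint,)
        ).fetchone()
        if stored != serialized:
            raise ValueError("fingerprint reused with a different payload")
        return job_id, False


print(submit("fp-1", {"n": 1}))  # (1, True)
print(submit("fp-1", {"n": 1}))  # (1, False)
```

No existence check precedes the insert, so no window exists between a check and the insert in which a concurrent request could slip through.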
### 5. Neglecting Structured Telemetry Early
Explanation: Adding logging and metrics after feature completion leads to inconsistent formats, missing context, and difficult debugging. Python tutorials frequently omit observability, creating production blind spots.
Fix: Initialize the OpenTelemetry SDK and a structured logger (structlog) at application startup. Propagate trace IDs through HTTP headers and database spans. Treat telemetry as a core dependency, not an add-on.
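The core idea, binding a trace ID once per request so that every log line carries it, can be sketched with the standard library. This is neither structlog nor the OpenTelemetry SDK: the `trace_id_var` context variable, the `JsonFormatter` class, and the logger name are hypothetical stand-ins. structlog's `structlog.contextvars` module serves the same purpose.

```python
import contextvars
import io
import json
import logging

# Hypothetical per-request context; each asyncio task sees its own value.
trace_id_var: contextvars.ContextVar[str] = contextvars.ContextVar("trace_id", default="-")


class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "event": record.getMessage(),
            "trace_id": trace_id_var.get(),  # read when the record is emitted
        })


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
log = logging.getLogger("job.service")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.propagate = False

# In a real service the ID would come from the incoming traceparent header.
token = trace_id_var.set("4bf92f3577b34da6a3ce929d0e0e4736")
log.info("job submitted")
trace_id_var.reset(token)
log.info("outside request")

print(stream.getvalue())
```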
### 6. Manual Migration Drift
Explanation: Hand-writing SQL migration scripts leads to schema drift, especially in fast-moving teams. Manual diffs are error-prone and slow down deployment cycles.
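Fix: Use Alembic with autogeneration enabled. It diffs the ORM models against the live schema and writes a migration script. Review the generated script before applying it, because autogenerate cannot detect everything (a renamed table or column, for example, appears as a drop plus an add). Schema changes stay versioned and reviewable without hand-writing every diff.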
### 7. Forcing Compiled-Language Patterns onto an Interpreted Runtime
Explanation: Replicating compiled-language habits in Python, such as Go's explicit error returns, interface-heavy dependency injection, or manual resource cleanup, creates verbose, unidiomatic code that fights the ecosystem.
Fix: Embrace Python idioms: Depends() for injection, context managers for resource lifecycle, exceptions for control flow, and Pydantic for validation. Write Python, not Go with different syntax.
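The context-manager idiom can be shown in a small example. This sketch wraps acquisition and release of a resource in `contextlib.asynccontextmanager`, so cleanup runs even when the body raises, without Go-style deferred calls or error-return checks. The `FakeConnection` class, the `opened` list, and the `connection()` and `run_job()` helpers are hypothetical stand-ins for a real driver connection and job.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


class FakeConnection:
    """Hypothetical resource that records whether it was released."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


opened: list[FakeConnection] = []


@asynccontextmanager
async def connection() -> AsyncIterator[FakeConnection]:
    conn = FakeConnection()
    opened.append(conn)
    try:
        yield conn
    finally:
        await conn.close()  # runs on normal exit and when the body raises


async def run_job(fail: bool) -> str:
    async with connection():
        if fail:
            raise RuntimeError("job failed")  # signal failure with an exception, not a return code
        return "ok"


print(asyncio.run(run_job(False)))  # ok
try:
    asyncio.run(run_job(True))
except RuntimeError as exc:
    print(exc)  # job failed
print([c.closed for c in opened])  # [True, True]
```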
## Production Bundle
### Action Checklist
### Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| High-concurrency I/O bound service | Async FastAPI + asyncpg + PgBouncer | Maximizes throughput without thread overhead; proxy handles connection pooling efficiently | Low infrastructure cost; moderate dev complexity |
| CPU-intensive data processing | Sync worker pool + Celery/RQ + message broker | Prevents event loop blocking; horizontal scaling via queue workers | Higher infrastructure cost; requires broker maintenance |
| Strict compliance/audit requirements | Database-enforced state machines + Alembic + OpenTelemetry | Immutable audit trail; schema versioning; traceable execution paths | Moderate dev cost; high operational reliability |
| Rapid prototyping/MVP | Pydantic + FastAPI + SQLite/Postgres + manual migrations | Fast iteration; minimal boilerplate; easy to refactor later | Low initial cost; requires refactoring before production scale |
### Configuration Template
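```python
# config.py
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict


class ServiceConfig(BaseSettings):
    # Each field is read from an environment variable of the same name (case-insensitive) or from .env.
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")

    database_url: str = Field(..., description="Async Postgres DSN, e.g. postgresql+asyncpg://user:pass@host/db")
    pool_size: int = Field(20, ge=5, le=100)
    max_overflow: int = Field(10, ge=0)
    worker_concurrency: int = Field(4, ge=1)
    otel_service_name: str = Field("task-queue-service")
    log_level: str = Field("INFO")
```
```python
# main.py
import asyncio
import logging
from contextlib import asynccontextmanager

import structlog
from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

from config import ServiceConfig
from workers.executor import process_worker_loop

config = ServiceConfig()
structlog.configure(
    processors=[structlog.processors.add_log_level, structlog.processors.JSONRenderer()],
    # make_filtering_bound_logger expects a numeric level such as logging.INFO
    wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, config.log_level.upper())),
)

# Register the provider globally so trace.get_tracer() in the worker module uses it.
# ConsoleSpanExporter is for local runs; swap in an OTLP exporter for production.
provider = TracerProvider(resource=Resource.create({"service.name": config.otel_service_name}))
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

# Bind the session factory to the engine built from config so the pool settings apply.
engine = create_async_engine(config.database_url, pool_size=config.pool_size, max_overflow=config.max_overflow)
async_session_factory = async_sessionmaker(engine, expire_on_commit=False)


@asynccontextmanager
async def lifespan(app: FastAPI):
    log = structlog.get_logger()
    # Keep task references so workers are not garbage-collected and can be cancelled.
    workers = [
        asyncio.create_task(process_worker_loop(async_session_factory, worker_id=f"worker-{i}"))
        for i in range(config.worker_concurrency)
    ]
    log.info("Service started", pool_size=config.pool_size, workers=len(workers))
    yield
    log.info("Service shutting down")
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    await engine.dispose()


app = FastAPI(title="Idempotent Task Queue", version="1.0.0", lifespan=lifespan)
FastAPIInstrumentor.instrument_app(app, tracer_provider=provider)
```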
### Quick Start Guide
- Initialize Environment: Create a virtual environment and install `fastapi`, `uvicorn`, `sqlalchemy[asyncio]`, `asyncpg`, `pydantic-settings`, `alembic`, `opentelemetry-api`, `opentelemetry-sdk`, `opentelemetry-instrumentation-fastapi`, and `structlog`.
- Generate Alembic Config: Run `alembic init -t async migrations` to get an `env.py` that runs migrations through an async engine, set `sqlalchemy.url` in `alembic.ini` to your async database URL, and point `target_metadata` in `env.py` at `Base.metadata` so autogeneration can see your models.
- Define Models & Migrate: Create your SQLAlchemy declarative models, run `alembic revision --autogenerate -m "initial"`, review the generated migration, then apply it with `alembic upgrade head`.
- Launch Service: Start the application with `uvicorn main:app --host 0.0.0.0 --port 8000 --workers 1`. Verify health endpoints, submit a test payload with a unique fingerprint header, and monitor structured logs and OpenTelemetry traces.