How I built an idempotent async task queue with Celery, Redis, and FastAPI
Eliminating Duplicate Execution in Distributed Celery Workloads: A Deterministic Locking Strategy
Current Situation Analysis
Distributed task queues like Celery provide "at-least-once" delivery semantics when tasks are acknowledged late. This is a fundamental architectural constraint, not a bug. When a worker acknowledges a task, the broker removes it. If the worker crashes after acknowledgment but before completion, the task is lost. If the worker crashes before acknowledgment, the broker re-queues the task. In production environments, network partitions, OOM kills, and broker timeouts cause tasks to be re-queued frequently.
Most development teams assume that a task executes exactly once. This assumption leads to catastrophic side effects in critical workflows. A payment processing task running twice results in double charges. A notification task running twice causes user spam. A data aggregation task running twice corrupts metrics. The industry pain point is the gap between the broker's delivery guarantee and the application's requirement for idempotency.
This problem is often misunderstood because local development environments rarely simulate worker crashes or network latency. Developers build tasks that work perfectly under ideal conditions but fail silently or destructively under failure modes. Data from production incident reports indicates that duplicate execution is a leading cause of data integrity violations in async systems, yet fewer than 30% of task implementations include robust idempotency controls.
WOW Moment: Key Findings
The following comparison demonstrates why naive approaches fail and why a deterministic locking strategy is the only viable path for critical workloads.
| Approach | Duplicate Risk | Retry Safety | Implementation Complexity | Production Viability |
|---|---|---|---|---|
| Naive Check | High | Unsafe | Low | Fails under concurrent load due to race conditions. |
| Task ID Locking | Medium | Unsafe | Low | Breaks on re-submission; Celery retries and broker redeliveries reuse the task ID, but every new publish of the same operation gets a fresh one. |
| Payload Hash + Atomic Lock | Negligible | Safe | Medium | Robust; handles retries, crashes, and concurrent workers. |
Why this matters: The Payload Hash + Atomic Lock strategy decouples idempotency from Celery's internal mechanics. By deriving the lock key from the business payload rather than the task metadata, you ensure that retries and re-submissions of the same logical operation are recognized as duplicates. The atomic lock prevents race conditions where two workers simultaneously decide to execute the same task. The result is effectively exactly-once execution at the application layer, even though the broker only guarantees at-least-once delivery.
Core Solution
The solution requires two components: deterministic fingerprinting and distributed mutex acquisition.
1. Deterministic Fingerprinting
Every task must generate a unique key based on its input data. This key must be stable across retries. If the same payload is submitted multiple times, the fingerprint must be identical.
Implementation Details:
- Use a cryptographic hash (SHA-256) of the payload.
- Serialize the payload to JSON with sorted keys to ensure deterministic ordering.
- Include the task name in the key to prevent collisions between different task types with similar payloads.
- Avoid including metadata like timestamps or request IDs in the hash, as these change per invocation.
```python
import hashlib
import json
from typing import Any, Dict


def compute_execution_fingerprint(task_name: str, payload: Dict[str, Any]) -> str:
    """
    Generates a stable fingerprint from the task name and payload.
    The payload is canonicalized to ensure deterministic hashing.
    """
    canonical_payload = json.dumps(payload, sort_keys=True, separators=(',', ':'))
    payload_digest = hashlib.sha256(canonical_payload.encode('utf-8')).hexdigest()
    return f"celery:exec:{task_name}:{payload_digest}"
```
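As a quick check of the stability property, the sketch below re-declares the function and compares fingerprints for payloads that differ only in key order. The task names `payments.charge` and `payments.refund` and the payload values are made up for illustration.

```python
import hashlib
import json
from typing import Any, Dict


def compute_execution_fingerprint(task_name: str, payload: Dict[str, Any]) -> str:
    # Same canonicalization as above: sorted keys, compact separators.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"celery:exec:{task_name}:{digest}"


# Hypothetical payloads: same data, different insertion order.
first = {"transaction_id": "tx-1001", "amount": "25.00", "currency": "USD"}
second = {"currency": "USD", "amount": "25.00", "transaction_id": "tx-1001"}

same_operation = (
    compute_execution_fingerprint("payments.charge", first)
    == compute_execution_fingerprint("payments.charge", second)
)
different_task = (
    compute_execution_fingerprint("payments.charge", first)
    == compute_execution_fingerprint("payments.refund", first)
)
print(same_operation)  # True: key order does not affect the fingerprint
print(different_task)  # False: the task name is part of the key
```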
2. Atomic Locking with Redis
Redis provides atomic operations that are essential for distributed locking. The SET command with NX (set if not exists) and EX (expiry) flags allows a worker to acquire a lock atomically. Only one worker can successfully acquire the lock; others must detect the existing lock and exit.
Architecture Decisions:
- Lock Expiry: The lock TTL must exceed the p99 task duration. If the lock expires before the task finishes, a second worker may acquire the lock and execute the task concurrently. Set TTL to at least 2× the p99 duration.
- Completion State: Upon successful execution, the lock key should be updated to a "completed" state with a longer TTL. This prevents re-execution if the same payload arrives later from a different source.
- Failure Handling: If the task fails, the lock must be released immediately to allow retries. Deleting the key achieves this.
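The three decisions above amount to a small state machine on a single key. The sketch below walks through it with an in-memory stand-in for Redis. `InMemoryStore` is a hypothetical class written for this example: it only mimics `SET` with `NX`/`EX`, `GET`, and `DELETE`, and it takes an injectable clock so expiry can be shown without sleeping. It illustrates the lifecycle and is not a substitute for a real Redis client.

```python
from typing import Callable, Dict, Optional, Tuple


class InMemoryStore:
    """Hypothetical stand-in that mimics Redis SET NX/EX, GET, and DELETE."""

    def __init__(self, clock: Callable[[], float]):
        self._clock = clock
        self._data: Dict[str, Tuple[str, float]] = {}  # key -> (value, expires_at)

    def _purge(self, key: str) -> None:
        # Drop the key if its expiry time has passed.
        entry = self._data.get(key)
        if entry is not None and entry[1] <= self._clock():
            del self._data[key]

    def set(self, key: str, value: str, nx: bool = False, ex: Optional[int] = None) -> bool:
        self._purge(key)
        if nx and key in self._data:
            return False  # key already held: NX refuses to overwrite
        expires_at = float("inf") if ex is None else self._clock() + ex
        self._data[key] = (value, expires_at)
        return True

    def get(self, key: str) -> Optional[str]:
        self._purge(key)
        entry = self._data.get(key)
        return entry[0] if entry else None

    def delete(self, key: str) -> None:
        self._data.pop(key, None)


now = [0.0]  # mutable fake clock, in seconds
store = InMemoryStore(clock=lambda: now[0])
key = "lock:celery:exec:demo:abc123"  # made-up fingerprint

first_acquire = store.set(key, "processing", nx=True, ex=600)   # worker A wins
second_acquire = store.set(key, "processing", nx=True, ex=600)  # worker B is refused

store.delete(key)                                               # A failed: release
retry_acquire = store.set(key, "processing", nx=True, ex=600)   # retry can proceed

store.set(key, "completed", ex=86400)                           # success: keep marker
now[0] += 3600                                                  # one hour later
late_duplicate = store.set(key, "processing", nx=True, ex=600)  # still refused

print(first_acquire, second_acquire, retry_acquire, late_duplicate, store.get(key))
```

Once the completion TTL elapses the key disappears, and the same payload would be accepted again, which is why the retention window should cover the period in which duplicates can realistically arrive.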
```python
from typing import Optional

import redis
from celery import Task

# compute_execution_fingerprint is the function defined in the previous section.


class IdempotentTask(Task):
    """
    Base task class that enforces idempotent execution using
    payload-based fingerprinting and Redis atomic locks.
    """
    abstract = True

    # Configuration overrides
    acks_late = True
    reject_on_worker_lost = True

    # Lock parameters
    LOCK_TTL_SECONDS = 600  # Must be > p99 task duration
    COMPLETION_TTL_SECONDS = 86400  # 24 hours retention

    redis_client: Optional[redis.Redis] = None

    def __init__(self):
        super().__init__()
        if self.redis_client is None:
            self.redis_client = redis.Redis(
                host='redis-cluster',
                port=6379,
                decode_responses=True
            )

    def _acquire_lock(self, fingerprint: str) -> bool:
        """
        Atomically acquires the execution lock.
        Returns True if lock acquired, False if already held or completed.
        """
        lock_key = f"lock:{fingerprint}"
        # NX ensures atomicity; EX prevents deadlocks
        acquired = self.redis_client.set(
            lock_key,
            "processing",
            nx=True,
            ex=self.LOCK_TTL_SECONDS
        )
        return bool(acquired)

    def _check_completion(self, fingerprint: str) -> bool:
        """
        Checks if the task has already completed successfully.
        """
        lock_key = f"lock:{fingerprint}"
        status = self.redis_client.get(lock_key)
        return status == "completed"

    def _finalize_success(self, fingerprint: str) -> None:
        """
        Marks the task as completed with extended retention.
        """
        lock_key = f"lock:{fingerprint}"
        self.redis_client.set(
            lock_key,
            "completed",
            ex=self.COMPLETION_TTL_SECONDS
        )

    def _release_lock(self, fingerprint: str) -> None:
        """
        Releases the lock on failure to permit retry.
        """
        lock_key = f"lock:{fingerprint}"
        self.redis_client.delete(lock_key)

    def __call__(self, *args, **kwargs):
        # Generate the fingerprint from the call arguments (payload).
        # Positional args are included so that calls made without keyword
        # arguments do not all collapse onto the same key.
        fingerprint = compute_execution_fingerprint(
            self.name, {"args": list(args), "kwargs": kwargs}
        )

        # Check for prior completion to skip redundant work
        if self._check_completion(fingerprint):
            return {"status": "skipped", "reason": "already_completed"}

        # Attempt atomic lock acquisition
        if not self._acquire_lock(fingerprint):
            # Lock exists; another worker is processing or completed.
            # A lock left behind by a crashed worker also lands here
            # until its TTL expires.
            return {"status": "skipped", "reason": "concurrent_execution"}

        try:
            # Execute the actual task logic
            result = self.run(*args, **kwargs)
            # Mark as completed
            self._finalize_success(fingerprint)
            return result
        except Exception:
            # Release lock on failure (including self.retry()) to allow retry
            self._release_lock(fingerprint)
            raise
```
3. Task Configuration and Retry Logic
The task class must be configured to support late acknowledgments and worker loss handling. Retry logic should distinguish between transient errors (network timeouts) and permanent errors (invalid data).
```python
from celery import Celery

# IdempotentTask is the base class defined in the previous section.
# payment_gateway, dead_letter_queue, TransientGatewayError, and
# PermanentValidationError are application-specific placeholders.

app = Celery('worker')

# Global Celery configuration
app.conf.update(
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
)


@app.task(
    bind=True,
    base=IdempotentTask,
    max_retries=3,
    default_retry_delay=60,
    queue='critical'
)
def process_transaction(self, transaction_id: str, amount: float, currency: str):
    """
    Example task processing a financial transaction.
    Idempotency is guaranteed by the payload fingerprint.
    """
    payload = {
        "transaction_id": transaction_id,
        "amount": amount,
        "currency": currency
    }
    try:
        # Business logic here
        gateway_response = payment_gateway.charge(payload)
        return {"status": "success", "ref": gateway_response.ref}
    except TransientGatewayError as e:
        # Retry with exponential backoff (1s, 2s, 4s)
        raise self.retry(
            exc=e,
            countdown=2 ** self.request.retries
        )
    except PermanentValidationError:
        # Do not retry; route to DLQ
        dead_letter_queue.push(payload, error="invalid_transaction")
        raise
```
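The `countdown=2 ** self.request.retries` expression yields delays of 1, 2, and 4 seconds across the three retries. When many tasks fail at the same moment, a cap and some random jitter can help spread the retries out. The helper below is one possible sketch; `backoff_countdown`, `base`, `cap`, and the choice of full jitter are assumptions for illustration and are not part of the original task.

```python
import random
from typing import Optional


def backoff_countdown(
    retries: int,
    base: float = 1.0,
    cap: float = 300.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Return a retry delay in seconds: capped exponential with full jitter."""
    rng = rng or random.Random()
    ceiling = min(cap, base * (2 ** retries))  # 1, 2, 4, ... up to cap
    return rng.uniform(0, ceiling)


# Deterministic demo with a seeded generator.
rng = random.Random(42)
delays = [backoff_countdown(attempt, rng=rng) for attempt in range(4)]
ceilings = [min(300.0, 2 ** attempt) for attempt in range(4)]
within_bounds = all(0 <= d <= c for d, c in zip(delays, ceilings))
print(within_bounds)  # True
```

Inside the task this could replace the fixed expression, for example `countdown=backoff_countdown(self.request.retries)`. Celery's `autoretry_for`, `retry_backoff`, and `retry_jitter` task options cover a similar need declaratively.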
Pitfall Guide
Keying on Task ID
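- Explanation: Celery assigns a new task ID every time a task is published. Retries via `self.retry()` and broker redeliveries reuse that ID, but a client that re-submits the same operation produces a fresh one. If you use the task ID as the lock key, re-submissions create new keys, allowing duplicate execution.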
- Fix: Always derive the lock key from the business payload, not Celery metadata.
Lock TTL Shorter Than Task Duration
- Explanation: If the lock expires before the task finishes, a second worker may acquire the lock and execute the task concurrently, causing duplicates.
- Fix: Set `LOCK_TTL_SECONDS` to at least 2× the p99 task duration. Monitor task latencies and adjust TTL dynamically if needed.
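One way to derive the TTL from observed latencies is sketched below. The function name `lock_ttl_from_durations`, the nearest-rank percentile method, and the 60-second floor are assumptions for illustration, and the sample durations are made up.

```python
import math
from typing import Sequence


def lock_ttl_from_durations(
    durations_s: Sequence[float],
    safety_factor: float = 2.0,
    floor_s: int = 60,
) -> int:
    """Return a lock TTL in whole seconds: safety_factor x p99, never below floor_s."""
    if not durations_s:
        raise ValueError("need at least one duration sample")
    ordered = sorted(durations_s)
    # Nearest-rank p99 using integer arithmetic: ceil(0.99 * n).
    rank = (99 * len(ordered) + 99) // 100
    p99 = ordered[rank - 1]
    return max(floor_s, math.ceil(p99 * safety_factor))


# Made-up samples: 98 fast runs and two slow outliers.
samples = [12.0] * 98 + [240.0, 310.0]
print(lock_ttl_from_durations(samples))  # 480
```

The floor keeps very fast tasks from ending up with a TTL so short that ordinary scheduling jitter could expire the lock.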
Non-Deterministic Payload Serialization
- Explanation: JSON serialization order can vary between Python versions or libraries. Floating-point representation may differ. This causes the same logical payload to produce different fingerprints.
- Fix: Use `sort_keys=True` and consistent separators. Normalize floating-point numbers to fixed precision before hashing.
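A minimal sketch of the normalization step, assuming monetary amounts that should be compared at two decimal places. `normalize_payload`, `digest`, and the `places` parameter are hypothetical names; which fields to normalize and at what precision depends on the domain. Integers are left untouched here, so `10` and `10.0` would still hash differently.

```python
import hashlib
import json
from decimal import Decimal, ROUND_HALF_EVEN
from typing import Any


def normalize_payload(value: Any, places: int = 2) -> Any:
    """Recursively replace floats and Decimals with fixed-precision strings."""
    if isinstance(value, (float, Decimal)):
        quantum = Decimal(1).scaleb(-places)  # Decimal("0.01") for places=2
        # Going through str() uses the float's short repr, not its binary expansion.
        return str(Decimal(str(value)).quantize(quantum, rounding=ROUND_HALF_EVEN))
    if isinstance(value, dict):
        return {k: normalize_payload(v, places) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [normalize_payload(v, places) for v in value]
    return value


def digest(payload: dict) -> str:
    canonical = json.dumps(normalize_payload(payload), sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


a = {"amount": 0.1 + 0.2, "currency": "USD"}  # 0.30000000000000004
b = {"amount": 0.3, "currency": "USD"}
print(normalize_payload(a))    # {'amount': '0.30', 'currency': 'USD'}
print(digest(a) == digest(b))  # True
```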
Ignoring `acks_late`
- Explanation: Default Celery behavior acknowledges tasks upon receipt. If a worker crashes after receipt but before completion, the task is lost and never retried.
- Fix: Set `task_acks_late=True` globally or `acks_late=True` per task so that acknowledgment occurs only after the task has finished running.
Deleting Lock on Success
- Explanation: If you delete the lock key after success, a duplicate payload arriving later will acquire the lock and re-execute the task.
- Fix: Update the lock key to a "completed" state with a long TTL instead of deleting it.
Race Condition on Completion Check
- Explanation: Checking for completion without a lock can lead to race conditions where two workers both see "not completed" and proceed.
- Fix: The atomic `SET NX` operation is the gate. If the lock acquisition fails, check the status. If status is "completed", skip. If status is "processing", skip. This is safe because the lock acquisition is atomic.
Payload Size and Hashing Performance
- Explanation: Hashing large payloads on every invocation adds latency and CPU overhead.
- Fix: For large payloads, hash only the relevant fields that determine idempotency. Alternatively, use a pre-computed checksum provided by the client.
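The first option can be sketched as a fingerprint built from an explicit list of fields. `fingerprint_from_fields` and the `idempotency_fields` argument are hypothetical names, and the payload is invented for illustration; the key layout follows the `celery:exec:` prefix used earlier.

```python
import hashlib
import json
from typing import Any, Dict, Iterable


def fingerprint_from_fields(
    task_name: str,
    payload: Dict[str, Any],
    idempotency_fields: Iterable[str],
) -> str:
    """Hash only the fields that identify the logical operation."""
    fields = list(idempotency_fields)
    missing = [name for name in fields if name not in payload]
    if missing:
        # Failing loudly avoids silently collapsing distinct operations.
        raise KeyError(f"missing idempotency fields: {missing}")
    subset = {name: payload[name] for name in fields}
    canonical = json.dumps(subset, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"celery:exec:{task_name}:{digest}"


# Made-up payloads with a large body that does not affect idempotency.
report_v1 = {"report_id": "r-77", "tenant": "acme", "rows": list(range(10_000))}
report_v2 = {"report_id": "r-77", "tenant": "acme", "rows": []}

key_v1 = fingerprint_from_fields("reports.build", report_v1, ["report_id", "tenant"])
key_v2 = fingerprint_from_fields("reports.build", report_v2, ["report_id", "tenant"])
print(key_v1 == key_v2)  # True: only report_id and tenant are hashed
```

The trade-off is that two payloads differing only in an excluded field are treated as the same operation, so the field list needs to be chosen per task.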
Production Bundle
Action Checklist
- Enable `task_acks_late=True` in Celery configuration.
- Enable `task_reject_on_worker_lost=True` to handle worker crashes.
- Implement `compute_execution_fingerprint` with canonical JSON serialization.
- Create `IdempotentTask` base class with Redis atomic locking.
- Set `LOCK_TTL_SECONDS` based on p99 task latency metrics.
- Distinguish between transient and permanent errors in retry logic.
- Implement DLQ routing for permanent failures.
- Test idempotency by simulating worker kills during task execution.
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Financial Transactions | Payload Hash + Atomic Lock | Zero tolerance for duplicates; requires strong consistency. | Medium (Redis ops per task). |
| High-Volume Notifications | Task ID Locking | Duplicates are acceptable; performance is critical. | Low. |
| Data Aggregation | Idempotency Key from Source | Source provides unique key; avoids payload hashing. | Low. |
| External API Calls | Payload Hash + Lock | Prevents duplicate charges or state changes on external systems. | Medium. |
Configuration Template
```python
# celery_config.py
from celery import Celery

app = Celery('production_worker')

app.conf.update(
    broker_url='redis://redis-cluster:6379/0',
    result_backend='redis://redis-cluster:6379/1',

    # Critical for idempotency
    task_acks_late=True,
    task_reject_on_worker_lost=True,

    # Serialization
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],

    # Retry defaults are task options rather than app settings:
    # pass max_retries=3 and default_retry_delay=60 to @app.task(...).

    # Timezone
    timezone='UTC',
    enable_utc=True,
)
```
Quick Start Guide
1. Install Dependencies: `pip install celery redis`
2. Define Base Task: Copy the `IdempotentTask` class and `compute_execution_fingerprint` function into your codebase. Ensure Redis connection parameters match your environment.
3. Configure Celery: Apply the configuration template. Set `task_acks_late=True` and `task_reject_on_worker_lost=True`.
4. Implement Tasks: Inherit from `IdempotentTask` for any task requiring idempotency. Pass payload data via `kwargs` to ensure fingerprinting works correctly.
5. Run Worker: `celery -A worker worker --loglevel=info --concurrency=4`. Verify idempotency by sending the same payload twice; the second execution should return `skipped`.
