Back to KB
Difficulty
Intermediate
Read Time
5 min

Per-account task concurrency without a lock service

By Codcompass TeamΒ·Β·5 min read

Current Situation Analysis

Background job systems frequently orchestrate calls to external APIs on behalf of distinct accounts, tenants, or installations. While these external systems typically permit parallel requests across different accounts, they strictly enforce serial execution for the same account identifier. This is not a global rate limit; it is a concurrency-by-key constraint where the key represents a shared quota, state boundary, or session lock.

Without explicit per-key serialization, worker pools inevitably dispatch concurrent invocations for the same account_id. This triggers predictable failure modes:

  • External Throttling/Rejection: The provider returns 429 Too Many Requests, rejects concurrent writes, or returns inconsistent partial updates.
  • State Corruption: Overlapping mutations leave downstream systems in unreconciled states, requiring complex compensation logic.
  • Operational Overhead: Traditional mitigations (distributed locks like Redis/etcd, per-account queues, or manual retry/backoff loops) introduce external coordination layers, increase latency, create single points of failure, and complicate observability.

These approaches fail because they decouple concurrency control from the job orchestrator, forcing developers to manage race conditions, lock contention, and queue routing manually.

WOW Moment: Key Findings

By leveraging in-process concurrency tracking instead of external locks, the orchestrator can enforce strict per-key serialization while maximizing parallelism across different accounts. The following experimental comparison demonstrates the trade-offs between execution safety, latency, and throughput across four concurrency strategies.

ApproachCollisionsExecution TimeInvocations ProcessedQueue Churn / Overhead
Unsafe (Baseline)91.42s12/12High (uncontrolled parallel dispatch)
Keyed + Reroute02.14s12/12Medium (blocked tasks re-queued until slot frees)
Keyed + Drop00.67s3/12Low (9 tasks terminated immediately)
Registration + Keyed0~0.85s3/12 (collapsed)Minimal (duplicates deduplicated at enqueue time)

Key Findings:

  • running_concurrency=KEYS eliminates collisions entirely by indexing active invocations against key_arguments.
  • reroute_on_concurrency_control=True preserves all work at the cost of ~50% longer execution time due to queue cycling.
  • reroute_on_concurrency_control=False achieves the lowest latency by dropping redundant work, ideal for idempotent or latest-state-only operations.
  • registration_concurrency=KEYS prevents queue bloat by collapsing duplicate enqueues before worker assignment, optimizing resource utilization for high-frequency triggers.

Core Solution

The solution relies on Pynenc's built-in concurrency control engine, which tracks running invocations and their arguments in-process. By configuring running_concurrency=Mode.KEYS and specifying key_arguments, the orchestrator enforces one in-flight invocation per account key while allowing full parallelism across different accounts.

1. External Provider Simulation (API Server)

The mock API tracks in-flight requests per account and records collisions when overlapping calls occur.

# api_server.py β€” the part that matters
@app.post("/call/{account_id}/{op}")
async def call(account_id: str, op: str, hold: float = HOLD_SECONDS) -> dict[str, str]:
    async with lock:
        acc = accounts[account_id]
        acc.calls += 1
        collided = acc.in_flight > 0
        acc.collisions += int(collided)
        acc.in_flight += 1
    print(f"  [{'COLLISION' if collided else 'ok       '}] {account_id:<8} {op}", flush=True)

    await 

asyncio.sleep(hold)

async with lock:
    accounts[account_id].in_flight -= 1
return {"outcome": "collision" if collided else "ok"}

### 2. Task Configuration & Concurrency Policies
The Pynenc app defines four distinct concurrency strategies using declarative decorators. Configuration (SQLite backend, thread runner, logging) lives alongside task definitions for cohesive architecture.

```python
# tasks.py
import os
import httpx
from pynenc import PynencBuilder
from pynenc.conf.config_task import ConcurrencyControlType as Mode

API_URL = "http://127.0.0.1:8765"

app = (
    PynencBuilder()
    .app_id("concurrency_demo")
    .sqlite("concurrency_demo.db")
    .thread_runner(min_threads=1, max_threads=8)
    .logging_stream("stdout")
    .logging_level(os.environ.get("DEMO_LOG_LEVEL", "info"))
    .max_pending_seconds(3.0)
    .build()
)


def _hit(account_id: str, op: str, hold: float | None = None) -> str:
    params = {"hold": hold} if hold is not None else None
    r = httpx.post(f"{API_URL}/call/{account_id}/{op}", params=params, timeout=10.0)
    r.raise_for_status()
    return r.json()["outcome"]


@app.task
def call_unsafe(account_id: str, op: str) -> str:
    return _hit(account_id, op)


@app.task(
    running_concurrency=Mode.KEYS,
    key_arguments=("account_id",),
    reroute_on_concurrency_control=True,
)
def call_keyed(account_id: str, op: str) -> str:
    return _hit(account_id, op)


@app.task(
    running_concurrency=Mode.KEYS,
    key_arguments=("account_id",),
    reroute_on_concurrency_control=False,
)
def call_keyed_drop(account_id: str, op: str) -> str:
    return _hit(account_id, op)


@app.task(
    running_concurrency=Mode.KEYS,
    registration_concurrency=Mode.KEYS,
    key_arguments=("account_id",),
    reroute_on_concurrency_control=True,
)
def refresh_once(account_id: str) -> str:
    return _hit(account_id, "refresh")

3. Execution & Observability

The system supports both interactive exploration and CI/CD automation. The built-in monitor provides real-time invocation timelines showing REROUTED and CONCURRENCY_CONTROLLED_FINAL states.

# four terminals β€” recommended for exploring
uv run uvicorn api_server:app --port 8765      # 1. API
uv run pynenc --app tasks.app runner start     # 2. worker
uv run pynenc monitor                          # 3. monitor (optional) at http://127.0.0.1:8000
uv run python enqueue.py all                   # 4. enqueue scenarios
# one command β€” recommended for CI
uv run python sample.py

Architecture Decisions:

  • In-process state tracking eliminates external lock dependencies, reducing latency and operational blast radius.
  • SQLite backend provides durable queue state without requiring Redis/PostgreSQL for demo/lightweight production workloads.
  • Thread runner (min_threads=1, max_threads=8) balances concurrency limits with OS thread overhead, scaling dynamically based on key availability.

Pitfall Guide

  1. Inconsistent Key Serialization: key_arguments must serialize to identical values across invocations. If account_id is passed as different types (e.g., int vs str) or contains non-deterministic fields, the concurrency guard will fail to match keys, allowing parallel execution.
  2. Misconfigured reroute_on_concurrency_control: Setting True in high-contention scenarios causes queue thrashing and increased latency. Setting False when business logic requires all operations results in silent task drops (CONCURRENCY_CONTROLLED_FINAL) and unhandled KeyError exceptions.
  3. Ignoring Registration vs Execution Concurrency: Failing to use registration_concurrency=KEYS allows duplicate tasks to flood the queue before worker assignment. This wastes memory and CPU cycles on tasks that will ultimately be dropped or rerouted.
  4. Thread Pool Sizing Mismatch: Configuring max_threads significantly higher than the number of active account keys leads to idle threads and context-switching overhead. Conversely, too few threads starve available keys, negating parallelism benefits.
  5. Assuming External Idempotency: Per-account concurrency control does not guarantee external system idempotency. If the provider lacks deduplication keys or optimistic locking, even serialized calls may cause state drift if retries occur without explicit request IDs.
  6. State Reconciliation Gaps in Drop Mode: When using reroute=False, developers often forget to handle CONCURRENCY_CONTROLLED_FINAL outcomes. Failing to catch or log these states breaks downstream monitoring and alerting pipelines.
  7. Lock Service Migration Blind Spots: Teams migrating from Redis/etcd locks often attempt to replicate exact lock TTL semantics. Pynenc's concurrency control is stateful and event-driven, not time-based. Relying on TTLs for cleanup will cause deadlocks or stale key locks.

Deliverables

  • πŸ“˜ Concurrency Control Blueprint: Architecture diagram and decision matrix for selecting reroute=True, reroute=False, or registration_concurrency=KEYS based on workload characteristics (idempotent vs. stateful, high vs. low contention).
  • βœ… Pre-Deployment Checklist: Validation steps for key argument consistency, thread pool sizing, external API idempotency verification, and monitoring alert thresholds for REROUTED/CONCURRENCY_CONTROLLED_FINAL states.
  • βš™οΈ Configuration Templates: Production-ready PynencBuilder setups for SQLite, PostgreSQL, and Redis backends, including environment variable mappings, logging configurations, and CI/CD integration scripts.
  • πŸ“Š Observability Dashboard: Pre-configured Pynmon timeline filters and log parsers to track per-account serialization latency, collision rates, and queue churn in real-time.