ansformations and log summarization. Execution happens inside isolated containers.
4. Memory Layer: Persists project rules, coding conventions, and failure logs. This prevents the agent from repeating mistakes across sessions.
5. Safety Gate: Validates every action before execution. Enforces test requirements, blocks destructive commands, and halts on repeated failures.
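To make the Safety Gate concrete, here is a minimal sketch of a pre-execution check. The deny-list patterns, the `GateDecision` type, and the `validate_action` function are all hypothetical names chosen for illustration; a real policy would be broader and project-specific, and this sketch does not cover the test-requirement rule.

```python
import re
from dataclasses import dataclass

# Hypothetical deny-list (an assumption, not an exhaustive policy).
DESTRUCTIVE_PATTERNS = [
    r"\brm\s+-\w*r\w*f|\brm\s+-\w*f\w*r",   # rm -rf / rm -fr
    r"\bgit\s+push\s+.*--force",            # forced pushes
    r"\bdrop\s+(table|database)\b",         # destructive SQL
]


@dataclass
class GateDecision:
    allowed: bool
    reason: str


def validate_action(command: str, consecutive_failures: int,
                    max_failures: int = 3) -> GateDecision:
    """Check a proposed shell command before it is executed."""
    # Halt first: repeated failures stop the agent regardless of the command.
    if consecutive_failures >= max_failures:
        return GateDecision(False, "halted: repeated failures")
    for pattern in DESTRUCTIVE_PATTERNS:
        if re.search(pattern, command, flags=re.IGNORECASE):
            return GateDecision(False, f"blocked: matches {pattern!r}")
    return GateDecision(True, "ok")
```

Returning a decision object with a reason, rather than a bare boolean, makes blocked operations easier to audit later.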
Implementation Strategy
The following implementation uses Python's asyncio for non-blocking task processing, pydantic for strict schema validation, and a modular tool layer. The design prioritizes idempotency, explicit state tracking, and deterministic rollback.
import asyncio
import sqlite3
import subprocess
from pathlib import Path
from typing import List, Optional

import pydantic

# ContextRetriever, LLMClient, ToolExecutor, SafetyGate, and the private helpers
# (_clone_repo, _log_failure, _cleanup_workspace, _open_pull_request) are not
# shown in this listing.


# Strict task definition prevents scope creep
class TaskRequest(pydantic.BaseModel):
    task_id: str
    repository_url: str
    objective: str
    # pydantic.Field (not dataclasses.field) supplies the default on a BaseModel
    constraints: List[str] = pydantic.Field(default_factory=list)
    test_command: str
    max_iterations: int = 3


class AgentOrchestrator:
    def __init__(self, db_path: str = "tasks.db", workspace_root: str = "./workspaces"):
        self.db_path = db_path
        self.workspace_root = Path(workspace_root)
        self._init_db()

    def _init_db(self):
        # The connection context manager commits on success
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS task_queue (
                    id TEXT PRIMARY KEY,
                    repo_url TEXT,
                    objective TEXT,
                    constraints TEXT,
                    test_cmd TEXT,
                    status TEXT DEFAULT 'pending',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

    async def run_loop(self):
        """Main execution cycle with polling and state management"""
        while True:
            task = self._fetch_next_task()
            if not task:
                # Queue is empty: wait before polling again
                await asyncio.sleep(30)
                continue
            try:
                await self._process_task(task)
            except Exception as e:
                self._log_failure(task.task_id, str(e))
            finally:
                self._cleanup_workspace(task.task_id)

    def _fetch_next_task(self) -> Optional[TaskRequest]:
        with sqlite3.connect(self.db_path) as conn:
            # Name the columns explicitly so row indexes do not depend on table layout
            cursor = conn.execute(
                "SELECT id, repo_url, objective, constraints, test_cmd "
                "FROM task_queue WHERE status = 'pending' "
                "ORDER BY created_at ASC LIMIT 1"
            )
            row = cursor.fetchone()
            if not row:
                return None
            # Mark the task as claimed so it is not picked up again
            conn.execute("UPDATE task_queue SET status = 'processing' WHERE id = ?", (row[0],))
            return TaskRequest(
                task_id=row[0],
                repository_url=row[1],
                objective=row[2],
                # Constraints are stored as a single pipe-delimited string
                constraints=row[3].split("|") if row[3] else [],
                test_command=row[4],
            )

    async def _process_task(self, task: TaskRequest):
        workspace = self.workspace_root / task.task_id
        workspace.mkdir(parents=True, exist_ok=True)

        # 1. Isolated checkout
        self._clone_repo(task.repository_url, workspace)

        # 2. Layered context assembly
        context = ContextRetriever(workspace).assemble(task)

        # 3. Planning phase
        plan = await LLMClient.plan(task.objective, context, task.constraints)

        # 4. Execution with safety gates
        executor = ToolExecutor(workspace, task.test_command)
        for step in plan.steps:
            if not SafetyGate.validate(step):
                raise RuntimeError(f"Blocked unsafe operation: {step.action}")
            result = await executor.run(step)
            if not result.success:
                # plan.iterations is assumed to be tracked by the plan object
                if plan.iterations >= task.max_iterations:
                    raise RuntimeError("Max retry limit reached")
                plan = await LLMClient.debug(step, result.logs, context)

        # 5. Verification gate
        test_result = await executor.run_tests()
        if not test_result.passed:
            raise RuntimeError("Tests failed after execution")

        # 6. Summary & PR creation
        summary = await LLMClient.summarize(task, executor.changelog)
        self._open_pull_request(task, summary)
Architecture Decisions & Rationale
- Async Queue Processing: Synchronous loops block on I/O and model calls. asyncio allows the agent to handle multiple tasks concurrently while respecting rate limits.
- Strict Pydantic Schemas: Prevents malformed tasks from entering the pipeline. Explicit constraints force the planner to stay within scope.
- Isolated Workspaces: Each task runs in a dedicated directory. This guarantees idempotency and prevents cross-task state contamination.
- Layered Context Retrieval: The ContextRetriever never loads the full repo. It queries only what the task requires, drastically reducing token consumption.
- Explicit Safety Gate: Separates validation from execution. This makes it trivial to audit blocked operations and adjust policies without touching core logic.
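The concurrency claim in the first bullet can be sketched with a semaphore that caps how many tasks run at once. This is an illustration under assumptions: `run_bounded` and its `handler` callback are hypothetical names and are not part of the orchestrator shown earlier.

```python
import asyncio


async def run_bounded(task_ids, handler, max_concurrent: int = 4):
    """Run handler(task_id) for every task, at most max_concurrent at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(task_id):
        # Each task waits here until one of the max_concurrent slots is free.
        async with semaphore:
            return await handler(task_id)

    # return_exceptions=True keeps one failing task from cancelling the rest;
    # results come back in the same order as task_ids.
    return await asyncio.gather(
        *(guarded(t) for t in task_ids), return_exceptions=True
    )
```

The same cap also works as a crude rate limit, since no more than `max_concurrent` model calls can be in flight at once.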
Pitfall Guide
1. Context Flooding
Explanation: Feeding the entire repository or large log files into the prompt dilutes the model's attention and inflates costs. The agent spends tokens parsing irrelevant code.
Fix: Implement AST-aware or ripgrep-based retrieval. Only inject files that match the task scope, plus project conventions and failing test output.
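As a rough illustration of scoped retrieval, the sketch below ranks files by keyword hits and keeps only the top few. It is a pure-Python stand-in for a ripgrep-based search so that it runs anywhere; `select_files` and its parameters are hypothetical, and a real retriever would likely shell out to ripgrep or walk the AST instead.

```python
from pathlib import Path
from typing import List


def select_files(root: Path, keywords: List[str], max_files: int = 15) -> List[Path]:
    """Rank Python files under root by keyword hits and keep the top matches."""
    scored = []
    for path in root.rglob("*.py"):
        try:
            text = path.read_text(encoding="utf-8", errors="ignore").lower()
        except OSError:
            continue  # unreadable file: skip rather than fail the whole task
        hits = sum(text.count(k.lower()) for k in keywords)
        if hits:
            scored.append((hits, path))
    # Highest hit count first; path string as a deterministic tie-breaker.
    scored.sort(key=lambda item: (-item[0], str(item[1])))
    return [path for _, path in scored[:max_files]]
```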
2. Unbounded Execution Loops
Explanation: Without iteration caps, the agent can enter infinite retry cycles when tests fail, burning tokens and time.
Fix: Enforce a strict max_iterations limit per task. After the threshold, halt execution and escalate to human review.
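A capped retry loop might look like the following sketch. `run_with_cap`, the `attempt` callback, and the `EscalateToHuman` exception are hypothetical names used only to show the shape of the control flow.

```python
from typing import Callable


class EscalateToHuman(Exception):
    """Raised when the retry budget is exhausted."""


def run_with_cap(attempt: Callable[[int], bool], max_iterations: int = 3) -> int:
    """Call attempt(i) until it returns True; give up after max_iterations."""
    for iteration in range(1, max_iterations + 1):
        if attempt(iteration):
            return iteration  # number of attempts that were needed
    # Budget spent: stop burning tokens and hand the task to a person.
    raise EscalateToHuman(f"no passing run after {max_iterations} iterations")
```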
3. Ignoring State Persistence
Explanation: Treating each task as stateless forces the agent to relearn project conventions repeatedly. This increases latency and causes inconsistent formatting.
Fix: Maintain a structured memory layer. Store project_rules.md, coding standards, and a failure_log.md that the retriever injects into every context window.
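One possible shape for such a memory layer is sketched below. The file names come from the text above; the `load_memory` and `record_failure` helpers, the directory layout, and the character cap are assumptions made for illustration.

```python
from pathlib import Path

MEMORY_FILES = ("project_rules.md", "failure_log.md")


def load_memory(memory_dir: Path, max_chars: int = 2000) -> str:
    """Concatenate the memory files into one block for the context window."""
    sections = []
    for name in MEMORY_FILES:
        path = memory_dir / name
        if path.exists():
            # Keep the tail of each file: for logs, recent entries matter most.
            text = path.read_text(encoding="utf-8")[-max_chars:]
            sections.append(f"## {name}\n{text.strip()}")
    return "\n\n".join(sections)


def record_failure(memory_dir: Path, task_id: str, message: str) -> None:
    """Append one line to failure_log.md so later sessions can see it."""
    memory_dir.mkdir(parents=True, exist_ok=True)
    with (memory_dir / "failure_log.md").open("a", encoding="utf-8") as fh:
        fh.write(f"- {task_id}: {message}\n")
```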
4. Bypassing Test Gates
Explanation: Allowing PR creation without verified test execution leads to broken builds and erodes trust in the agent.
Fix: Hardcode test execution as a mandatory step before any merge request. If tests fail, the pipeline must stop and log the exact failure trace.
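A mandatory test gate can be as small as the sketch below, which runs the test command and returns both the verdict and the tail of the output for logging. The `run_tests` helper, its timeout, and the log length are assumptions, not the `ToolExecutor.run_tests` method referenced earlier.

```python
import shlex
import subprocess
from typing import Sequence, Tuple, Union


def run_tests(command: Union[str, Sequence[str]], cwd: str = ".",
              timeout: int = 600) -> Tuple[bool, str]:
    """Run the test command; return (passed, tail of combined output)."""
    argv = shlex.split(command) if isinstance(command, str) else list(command)
    proc = subprocess.run(
        argv, cwd=cwd, capture_output=True, text=True, timeout=timeout
    )
    # A zero exit code is the only signal treated as a pass.
    log = (proc.stdout + proc.stderr)[-2000:]
    return proc.returncode == 0, log
```

A caller would refuse to open a pull request unless the first element of the result is `True`, and would write the second element to the failure log otherwise.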
5. Over-Engineering the Planner
Explanation: Using a single expensive model for planning, execution, and summarization wastes budget on low-complexity operations.
Fix: Route planning to a strong model (e.g., Claude Opus or GPT-4 class). Route log summarization, formatting, and repetitive transformations to a cheaper, faster model.
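The routing rule can be expressed as a small lookup table. The task kinds and tier names below are hypothetical labels, and falling back to the strong tier for unknown kinds is an assumption of this sketch rather than something the text prescribes.

```python
# Hypothetical task kinds mapped to model tiers; map the tiers to whatever
# concrete models your providers offer.
ROUTES = {
    "plan": "strong",
    "summarize_logs": "cheap",
    "format": "cheap",
    "transform": "cheap",
}


def pick_tier(task_kind: str) -> str:
    """Return the model tier for a task kind; unknown kinds use the strong tier."""
    return ROUTES.get(task_kind, "strong")
```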
6. Credential Leakage in Workspaces
Explanation: Running agents in environments with access to production secrets or environment variables creates severe security risks.
Fix: Execute all tasks inside ephemeral Docker containers with zero network access to internal services. Never mount .env files or credential stores into the workspace.
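The sketch below builds a `docker run` command line along those lines. The `docker_argv` helper and the default image are assumptions; note that `--network none` disables all networking, which is stricter than blocking internal services only, so steps that need the network (cloning, installing dependencies) would have to run before this point.

```python
from pathlib import Path
from typing import List


def docker_argv(workspace: Path, command: str,
                image: str = "python:3.12-slim") -> List[str]:
    """Build the argv for running one command in a throwaway container."""
    return [
        "docker", "run",
        "--rm",                  # ephemeral: remove the container on exit
        "--network", "none",     # no network access at all
        "-v", f"{workspace.resolve()}:/workspace",  # only the workspace is mounted
        "-w", "/workspace",
        image,
        "sh", "-c", command,
    ]
    # Deliberately absent: --env-file and -e flags, so no host secrets
    # are passed into the container.
```

The resulting list can be passed directly to `subprocess.run`.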
7. Assuming Architectural Judgment
Explanation: Agents excel at mechanical tasks but fail at trade-off decisions, security approvals, and product direction.
Fix: Define explicit scope boundaries in the system prompt. If a task requires architectural changes, dependency additions, or security modifications, the agent must halt and request human approval.
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Small team, single repo | Local SQLite queue + Docker isolation | Minimal infrastructure overhead, easy to audit | Low (~$50/mo VPS) |
| Multi-repo, high volume | GitHub Issues queue + CI runner | Leverages existing auth, scales with GitHub Actions | Medium ($100-$200/mo) |
| Strict compliance/security | Native sandbox + read-only git checkout | Prevents credential leakage, ensures deterministic state | Low (infrastructure only) |
| Budget-constrained | Layered retrieval + cheap summarization model | Cuts token usage by 80-90%, maintains quality | Very Low (~$15-$25/mo) |
| High-frequency deploys | Async queue + parallel workspaces | Prevents bottlenecks, enables continuous background execution | Medium (higher CPU/RAM) |
Configuration Template
# agent_config.yaml
orchestrator:
  db_path: "./data/tasks.db"
  workspace_root: "./workspaces"
  poll_interval_seconds: 30
  max_concurrent_tasks: 4
context_retrieval:
  strategy: "layered"
  max_files_per_task: 15
  include_conventions: true
  include_failure_log: true
  search_tool: "ripgrep"
safety_gate:
  allow_destructive_commands: false
  require_test_pass: true
  max_retry_iterations: 3
  block_prod_credentials: true
  require_human_approval_for:
    - "dependency_addition"
    - "architecture_change"
    - "security_modification"
model_routing:
  planner:
    provider: "openai"
    model: "gpt-4o"
    max_tokens: 4096
  executor:
    provider: "anthropic"
    model: "claude-3-haiku"
    max_tokens: 2048
  summarizer:
    provider: "openai"
    model: "gpt-4o-mini"
    max_tokens: 1024
cost_controls:
  token_budget_per_task: 8000
  log_truncation_limit: 500
  cache_project_rules: true
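Once the template is parsed into a dictionary, validating it early avoids surprises at run time. The sketch below covers only the `safety_gate` section and uses standard-library dataclasses to stay dependency-free (pydantic would serve equally well); `SafetyGateConfig` and `parse_safety_gate` are hypothetical names, and loading the YAML file itself is left out.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass(frozen=True)
class SafetyGateConfig:
    allow_destructive_commands: bool = False
    require_test_pass: bool = True
    max_retry_iterations: int = 3
    block_prod_credentials: bool = True
    require_human_approval_for: List[str] = field(default_factory=list)

    def __post_init__(self):
        # Reject a cap that would disable the retry loop entirely.
        if self.max_retry_iterations < 1:
            raise ValueError("max_retry_iterations must be >= 1")


def parse_safety_gate(raw: dict) -> SafetyGateConfig:
    """Build the config from the parsed safety_gate mapping.

    Unknown keys raise TypeError, which catches typos in the YAML.
    """
    return SafetyGateConfig(**raw)
```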
Quick Start Guide
- Initialize the workspace: Create a fresh directory, install dependencies (pip install pydantic; asyncio and sqlite3 ship with the Python standard library and need no installation), and place the agent_config.yaml template in the root.
- Seed the task queue: Insert a mechanical task into SQLite using the schema defined in the orchestrator. Example:
INSERT INTO task_queue (id, repo_url, objective, constraints, test_cmd) VALUES ('task-001', 'https://github.com/your/repo', 'Add unit tests for auth module', 'Use pytest|Follow existing patterns', 'pytest tests/auth');
- Launch the orchestrator: Run python main.py (or your entry script). The agent will poll the queue, clone the repo into an isolated workspace, assemble layered context, execute the plan, run tests, and generate a summary.
- Monitor and iterate: Check the failure_log.md and safety gate reports. Adjust context retrieval limits and iteration caps based on your first 10-20 executions. Once stable, schedule the process via systemd or cron for continuous background operation.
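The queue-seeding step above can also be done from Python with a parameterized query, which avoids quoting problems in the objective text. The schema mirrors the orchestrator's `task_queue` table; the `enqueue` helper is a hypothetical name for this sketch.

```python
import sqlite3
from typing import List

SCHEMA = """
CREATE TABLE IF NOT EXISTS task_queue (
    id TEXT PRIMARY KEY,
    repo_url TEXT,
    objective TEXT,
    constraints TEXT,
    test_cmd TEXT,
    status TEXT DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""


def enqueue(conn: sqlite3.Connection, task_id: str, repo_url: str,
            objective: str, constraints: List[str], test_cmd: str) -> None:
    """Insert one pending task; constraints are stored pipe-delimited."""
    conn.execute(
        "INSERT INTO task_queue (id, repo_url, objective, constraints, test_cmd) "
        "VALUES (?, ?, ?, ?, ?)",
        (task_id, repo_url, objective, "|".join(constraints), test_cmd),
    )
    conn.commit()
```

Because `id` is the primary key, inserting the same task twice raises `sqlite3.IntegrityError` instead of silently duplicating work.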