oint file with atomic writes ensures that partial crashes never corrupt task state.
3. Context Window Manager: LLM providers enforce hard token limits. Feeding an unbounded history triggers truncation errors or degrades model coherence. A sliding window enforces a strict budget, preserves system instructions, and maintains tool-use/tool-result pairing to prevent API validation failures.
Implementation
The following implementation demonstrates a unified memory controller that orchestrates all three primitives. The code uses atomic file operations, an explicit turn budget for the context window, and strict separation of concerns.
```python
import json
import os
import time
import fcntl  # Unix-only module; this listing will not import on Windows
from pathlib import Path
from typing import Any, Dict, List, Optional

from cryptography.fernet import Fernet


class SessionRecorder:
    """Append-only JSONL log with optional Fernet encryption."""

    def __init__(self, log_path: str, encryption_key: Optional[bytes] = None):
        self.log_path = Path(log_path)
        self.log_path.parent.mkdir(parents=True, exist_ok=True)
        self.cipher = Fernet(encryption_key) if encryption_key else None

    def append(self, record: Dict[str, Any]) -> None:
        payload = json.dumps(record)
        if self.cipher:
            # Fernet tokens are url-safe base64, so each one fits on a single line.
            payload = self.cipher.encrypt(payload.encode()).decode()
        with open(self.log_path, "a", encoding="utf-8") as f:
            # Exclusive advisory lock: one writer at a time.
            fcntl.flock(f, fcntl.LOCK_EX)
            f.write(payload + "\n")
            f.flush()
            fcntl.flock(f, fcntl.LOCK_UN)

    def load_recent(self, limit: int = 50) -> List[Dict[str, Any]]:
        if not self.log_path.exists():
            return []
        records = []
        with open(self.log_path, "r", encoding="utf-8") as f:
            # Shared lock: readers may overlap each other but not a writer.
            fcntl.flock(f, fcntl.LOCK_SH)
            lines = f.readlines()
            fcntl.flock(f, fcntl.LOCK_UN)
        for line in lines[-limit:]:
            raw = line.strip()
            if not raw:
                continue
            if self.cipher:
                raw = self.cipher.decrypt(raw.encode()).decode()
            records.append(json.loads(raw))
        return records
```

```python
class TaskCheckpoint:
    """Atomic key-value state persistence for workflow progress."""

    def __init__(self, state_path: str):
        self.state_path = Path(state_path)
        self.state_path.parent.mkdir(parents=True, exist_ok=True)

    def load(self, default: Dict[str, Any]) -> Dict[str, Any]:
        if not self.state_path.exists():
            return default.copy()
        with open(self.state_path, "r", encoding="utf-8") as f:
            return json.load(f)

    def save(self, state: Dict[str, Any]) -> None:
        # Write to a sibling temp file, force it to disk, then swap it in.
        tmp_path = self.state_path.with_suffix(".tmp")
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, self.state_path)
```

```python
class ContextWindow:
    """Sliding window with system prompt preservation and tool-pair enforcement."""

    def __init__(self, max_turns: int, system_prompt: str):
        self.max_turns = max_turns
        self.system_prompt = system_prompt
        self.conversation: List[Dict[str, Any]] = []
        # tool_use blocks that have not yet received a tool_result
        self._tool_pairs: Dict[str, Dict[str, Any]] = {}

    @staticmethod
    def _blocks(message: Dict[str, Any], block_type: str) -> List[Dict[str, Any]]:
        # Plain-string content carries no structured blocks.
        content = message.get("content")
        if not isinstance(content, list):
            return []
        return [b for b in content if isinstance(b, dict) and b.get("type") == block_type]

    def add(self, message: Dict[str, Any]) -> None:
        self.conversation.append(message)
        # Track tool_use/tool_result pairing
        if message.get("role") == "assistant":
            for block in self._blocks(message, "tool_use"):
                self._tool_pairs[block["id"]] = block
        elif message.get("role") == "user":
            for block in self._blocks(message, "tool_result"):
                self._tool_pairs.pop(block.get("tool_use_id"), None)

    def get_context(self) -> List[Dict[str, Any]]:
        # Evict the oldest messages until the window fits the turn budget.
        while len(self.conversation) > self.max_turns:
            evicted = self.conversation.pop(0)
            for block in self._blocks(evicted, "tool_use"):
                self._tool_pairs.pop(block["id"], None)
        # A tool_result at the head of the window has lost its tool_use to
        # eviction; drop that message too so the pair leaves together.
        while self.conversation and self._blocks(self.conversation[0], "tool_result"):
            self.conversation.pop(0)
        # OpenAI-style system message. Anthropic's Messages API takes the
        # system prompt as a separate top-level parameter instead.
        return [
            {"role": "system", "content": self.system_prompt}
        ] + list(self.conversation)
```

```python
class AgentMemoryController:
    """Orchestrates log, state, and context window."""

    def __init__(self, config: Dict[str, Any]):
        self.recorder = SessionRecorder(
            log_path=config["log_path"],
            encryption_key=config.get("encryption_key"),
        )
        self.checkpoint = TaskCheckpoint(state_path=config["state_path"])
        self.window = ContextWindow(
            max_turns=config["max_context_turns"],
            system_prompt=config["system_prompt"],
        )
        # Initialize from disk
        self.state = self.checkpoint.load(default=config.get("default_state", {}))
        recent_history = self.recorder.load_recent(limit=config["max_context_turns"])
        for msg in recent_history:
            self.window.add(self._to_message(msg))

    @staticmethod
    def _to_message(record: Dict[str, Any]) -> Dict[str, Any]:
        # Keep log-only fields such as "ts" out of the API payload;
        # providers may reject message objects with unknown keys.
        return {"role": record["role"], "content": record["content"]}

    def push_turn(self, user_msg: str, assistant_msg: str) -> None:
        user_record = {"role": "user", "content": user_msg, "ts": time.time()}
        assistant_record = {"role": "assistant", "content": assistant_msg, "ts": time.time()}
        self.recorder.append(user_record)
        self.recorder.append(assistant_record)
        self.window.add(self._to_message(user_record))
        self.window.add(self._to_message(assistant_record))
        self.checkpoint.save(self.state)

    def get_context_payload(self) -> List[Dict[str, Any]]:
        return self.window.get_context()
```
Architecture Decisions Explained
- Atomic Writes via os.replace: The TaskCheckpoint.save() method writes to a temporary file first, then atomically replaces the target. This guarantees that a crash during serialization never leaves a corrupted JSON file. The agent always loads either the previous valid state or the new one.
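- File Locking (fcntl): Concurrent reads/writes to the JSONL log are serialized using advisory locks. This prevents interleaved lines when multiple threads or processes interact with the same session file. The fcntl module is available only on Unix-like systems, and advisory locks protect only against writers that also take the lock.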
- Tool-Pair Preservation: Anthropic and OpenAI APIs reject message arrays in which a tool call and its result are not paired: a tool_use block without its corresponding tool_result, or a tool_result whose tool_use is missing. The ContextWindow tracks these IDs so that eviction never leaves one half of a pair in the window.
- Separation of State and Conversation: Task metadata (processed IDs, counters, preferences) lives in a separate JSON file. This allows the agent to query progress without parsing conversation logs, and enables independent backup/rotation policies for each data type.
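The atomic-write guarantee can be checked in isolation. The following is a minimal sketch, not part of the listing above: the helper name atomic_write_json and the demo function are hypothetical. It writes a valid state, then attempts a write that fails partway through serialization, and shows that the previous file is still readable.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Dict


def atomic_write_json(path: Path, state: Dict[str, Any]) -> None:
    """Write state to a sibling temp file, then swap it into place."""
    tmp_path = path.with_suffix(".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f)
            f.flush()
            os.fsync(f.fileno())  # force bytes to disk before the rename
        # Atomic when both paths live on the same filesystem.
        os.replace(tmp_path, path)
    except BaseException:
        # Serialization or I/O failed: discard the partial temp file.
        tmp_path.unlink(missing_ok=True)
        raise


def demo() -> Dict[str, Any]:
    with tempfile.TemporaryDirectory() as workdir:
        target = Path(workdir) / "state.json"
        atomic_write_json(target, {"workflow_step": 1})
        try:
            # A set is not JSON-serializable, so json.dump raises midway.
            atomic_write_json(target, {"workflow_step": 2, "bad": {1, 2}})
        except TypeError:
            pass
        # The target still holds the last state that was written completely.
        return json.loads(target.read_text(encoding="utf-8"))


if __name__ == "__main__":
    print(demo())
```

Because the failed write only ever touched the temporary file, the target is never observed in a half-written form.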
Pitfall Guide
1. Merging Conversation Logs with Task State
Explanation: Storing workflow counters or processed IDs inside the same JSONL file as messages creates structural ambiguity. Querying for "last processed item" requires parsing every line, filtering by role, and extracting nested fields.
Fix: Maintain strict separation. Use JSONL for append-only message history and a dedicated JSON checkpoint for mutable task state.
2. Breaking Tool-Use/Tool-Result Pairs During Eviction
Explanation: Evicting an assistant turn containing a tool_use block while its corresponding tool_result stays in the window (or the reverse) violates the provider's message schema. The API returns a validation error, halting the agent.
Fix: Track tool call IDs in memory. Evict a tool_use turn and its tool_result together, so the window never holds one half of a pair. Implement explicit pairing guards in the ContextWindow class.
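One way to express such a guard, independent of any class, is a pure function over the message list. This is a sketch under the assumption that tool calls use Anthropic-style content blocks (type "tool_use" and "tool_result"); the names has_block and trim_window are hypothetical.

```python
from typing import Any, Dict, List

Message = Dict[str, Any]


def has_block(message: Message, block_type: str) -> bool:
    """True if the message content is a block list containing block_type."""
    content = message.get("content")
    if not isinstance(content, list):
        return False  # plain-string content carries no tool blocks
    return any(isinstance(b, dict) and b.get("type") == block_type for b in content)


def trim_window(conversation: List[Message], max_turns: int) -> List[Message]:
    """Return at most max_turns trailing messages with no orphaned tool_result."""
    # Guard max_turns == 0 explicitly: conversation[-0:] would return everything.
    window = conversation[-max_turns:] if max_turns > 0 else []
    # A tool_result at the head of the window lost its tool_use to eviction.
    while window and has_block(window[0], "tool_result"):
        window = window[1:]
    return window
```

The function may return fewer than max_turns messages. That is the intended trade-off: a slightly shorter window is preferable to a payload the provider rejects.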
3. Unbounded Context Growth
Explanation: Developers often load the entire conversation history into the API payload. As sessions extend beyond 50-100 turns, token counts exceed model limits, triggering truncation or 400 errors.
Fix: Enforce a hard turn limit at the application layer. Use a sliding window that drops the oldest conversational turns while preserving the system prompt. Monitor token usage before each API call.
4. Concurrent File Writes Without Locking
Explanation: Multiple threads or background workers appending to the same JSONL file can interleave lines, corrupting JSON parsing on load.
Fix: Apply advisory file locks (fcntl.LOCK_EX for writes, fcntl.LOCK_SH for reads). For high-concurrency multi-user systems, migrate to SQLite or a lightweight document store with transaction support.
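For the SQLite migration path, a rough sketch using only the standard library's sqlite3 module might look like the following. The class name SqliteSessionLog and the single-table schema are assumptions made for illustration; they mirror the append/load_recent interface of the JSONL recorder but are not taken from the original design.

```python
import json
import sqlite3
from typing import Any, Dict, List


class SqliteSessionLog:
    """Append-only message log backed by SQLite (hypothetical stand-in)."""

    def __init__(self, db_path: str):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS messages ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, payload TEXT NOT NULL)"
        )
        self.conn.commit()

    def append(self, record: Dict[str, Any]) -> None:
        # Used as a context manager, the connection commits on success
        # and rolls back if the block raises.
        with self.conn:
            self.conn.execute(
                "INSERT INTO messages (payload) VALUES (?)", (json.dumps(record),)
            )

    def load_recent(self, limit: int = 50) -> List[Dict[str, Any]]:
        rows = self.conn.execute(
            "SELECT payload FROM messages ORDER BY id DESC LIMIT ?", (limit,)
        ).fetchall()
        # Rows arrive newest-first; reverse to restore chronological order.
        return [json.loads(row[0]) for row in reversed(rows)]
```

Each insert is a transaction, so a concurrent writer sees either the whole row or none of it, which removes the interleaved-line failure mode without application-level locks.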
5. Storing Encryption Keys Alongside Encrypted Data
Explanation: Generating a Fernet key and saving it in the same directory as the JSONL file defeats the purpose of encryption. A breach exposes both the ciphertext and the decryption key.
Fix: Store encryption keys in environment variables, secret managers (AWS Secrets Manager, HashiCorp Vault), or hardware security modules. The application reads the key at runtime and never persists it to disk.
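A small loader can make the "read at runtime" rule explicit. The sketch below assumes the key lives in the AGENT_FERNET_KEY environment variable used by the configuration template; the function name load_fernet_key is hypothetical. It validates the key's shape with the standard library only (a Fernet key is 32 bytes, url-safe base64-encoded) and fails fast, which suits deployments where encryption is mandatory rather than optional.

```python
import base64
import binascii
import os


def load_fernet_key(env_var: str = "AGENT_FERNET_KEY") -> bytes:
    """Read a Fernet key from the environment and validate its shape."""
    raw = os.environ.get(env_var)
    if not raw:
        raise RuntimeError(f"{env_var} is not set; refusing to start without a key")
    try:
        decoded = base64.urlsafe_b64decode(raw.encode("ascii"))
    except (binascii.Error, ValueError) as exc:
        raise RuntimeError(f"{env_var} is not valid url-safe base64") from exc
    if len(decoded) != 32:
        # Fernet keys decode to exactly 32 bytes.
        raise RuntimeError(f"{env_var} must decode to 32 bytes, got {len(decoded)}")
    # Return the encoded form, which is what Fernet(...) expects.
    return raw.encode("ascii")
```

The returned bytes can be passed as encryption_key to the recorder. The key itself is never written to disk by the application.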
6. Assuming Sliding Windows Preserve Long-Term Facts
Explanation: A sliding window optimizes for recency, not retention. Critical user preferences or workflow decisions made 40 turns ago will be evicted and lost to the model.
Fix: Extract high-value facts into the key-value state checkpoint before they slide out. Promote important context to the state store, then reference it in the system prompt or inject it as structured context.
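Promotion can be as simple as copying a fact into the checkpointed state and rendering it back into the system prompt. The sketch below assumes the user_preferences key from the default state in the configuration template; the helper names promote_fact and render_system_prompt, and the prompt wording, are hypothetical. How facts are detected in the first place (rules, a classifier, or an LLM call) is left open.

```python
from typing import Any, Dict


def promote_fact(state: Dict[str, Any], key: str, value: Any) -> None:
    """Copy a durable fact into the checkpointed state."""
    state.setdefault("user_preferences", {})[key] = value


def render_system_prompt(base_prompt: str, state: Dict[str, Any]) -> str:
    """Append promoted facts to the system prompt so they survive eviction."""
    prefs = state.get("user_preferences", {})
    if not prefs:
        return base_prompt
    # Sort keys so the rendered prompt is stable across runs.
    lines = [f"- {k}: {v}" for k, v in sorted(prefs.items())]
    return base_prompt + "\n\nKnown user preferences:\n" + "\n".join(lines)
```

Since the state is saved atomically on every turn, a promoted fact outlives both window eviction and process restarts.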
7. Skipping Token Budget Verification
Explanation: Sending context payloads to the LLM without estimating token count leads to unpredictable API behavior. Different models tokenize text differently, and tool schemas consume significant tokens.
Fix: Integrate a lightweight token counter before each API call. Truncate or summarize oversized tool outputs. Fail fast with a clear error if the payload exceeds the model's context window.
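A fail-fast budget check might be sketched as follows. The four-characters-per-token ratio is a rough heuristic assumed here for illustration, not a real tokenizer; actual counts vary by model and language, so a production system would substitute the provider's own counting method. The names estimate_tokens, check_budget, and ContextBudgetExceeded are hypothetical.

```python
import json
from typing import Any, Dict, List


class ContextBudgetExceeded(RuntimeError):
    """Raised when the estimated payload size exceeds the configured budget."""


def estimate_tokens(messages: List[Dict[str, Any]], chars_per_token: float = 4.0) -> int:
    """Rough estimate: serialized character count divided by an assumed ratio."""
    serialized = json.dumps(messages, ensure_ascii=False)
    return int(len(serialized) / chars_per_token) + 1


def check_budget(messages: List[Dict[str, Any]], max_tokens: int) -> int:
    """Return the estimate, or raise before any API call is made."""
    estimate = estimate_tokens(messages)
    if estimate > max_tokens:
        raise ContextBudgetExceeded(
            f"estimated {estimate} tokens exceeds budget of {max_tokens}"
        )
    return estimate
```

Serializing the whole message list means tool inputs and results count toward the estimate, not just the visible text.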
Production Bundle
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Single-user assistant, <150 turns/session | File-first (JSONL + KV + Sliding Window) | Zero infrastructure, instant debugging, deterministic recovery | Near-zero (only LLM API costs) |
| Multi-user platform, concurrent writers | SQLite or lightweight document store | ACID transactions, database-managed locking, concurrent safety | Moderate (DB hosting + maintenance) |
| Cross-session semantic search required | Vector DB + offline JSONL indexing | Embeddings enable similarity retrieval over historical data | High (embedding API + vector index hosting) |
| Highly sensitive data (PII, financial) | JSONL + Fernet encryption + strict access controls | Encryption at rest, audit trail, no third-party data exposure | Low-Moderate (key management overhead) |
| Long-running data pipelines with checkpoints | Atomic KV state + JSONL audit log | Crash recovery, idempotent processing, progress tracking | Low (disk I/O only) |
Configuration Template
```python
# config/agent_memory.py
import os
from cryptography.fernet import Fernet

AGENT_MEMORY_CONFIG = {
    "log_path": os.path.expanduser("~/.agent/sessions/current.jsonl"),
    "state_path": os.path.expanduser("~/.agent/state/current.json"),
    "max_context_turns": 24,
    "system_prompt": "You are a precise assistant. Maintain workflow continuity.",
    "default_state": {
        "workflow_step": 0,
        "processed_ids": [],
        "user_preferences": {},
        "last_checkpoint_ts": 0,
    },
    "encryption_key": os.environ.get("AGENT_FERNET_KEY"),
}

# Generate key once and store securely:
# from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())
```
Quick Start Guide
- Initialize the memory controller: Instantiate AgentMemoryController with the configuration template. The controller automatically creates directories, loads existing state, and seeds the context window from recent log entries.
- Wire the agent loop: Before each LLM call, invoke get_context_payload() to retrieve the sliding window. Send the payload to your provider (Anthropic, OpenAI, etc.).
- Persist turns: After receiving the assistant response, call push_turn(user_input, assistant_response). This appends to the JSONL log, updates the sliding window, and atomically saves task state.
- Handle restarts: On application restart, the controller loads the last valid checkpoint and replays recent history. The agent resumes exactly where it left off without manual intervention.
- Monitor token usage: Integrate a lightweight token counter before API calls. If the payload approaches the model's limit, trigger context summarization or state promotion before proceeding.
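The loop wiring from the steps above can be sketched as a single function. It relies only on the two controller methods named in this guide (get_context_payload and push_turn); the Memory protocol, the run_turn function, and the call_llm callable are hypothetical placeholders for whatever provider client is in use.

```python
from typing import Any, Callable, Dict, List, Protocol

Message = Dict[str, Any]


class Memory(Protocol):
    """The two controller methods the loop depends on."""

    def get_context_payload(self) -> List[Message]: ...

    def push_turn(self, user_msg: str, assistant_msg: str) -> None: ...


def run_turn(
    memory: Memory,
    call_llm: Callable[[List[Message]], str],
    user_input: str,
) -> str:
    """One loop iteration: build the payload, call the model, persist the turn."""
    # The window holds only completed turns, so the new input is appended here.
    payload = memory.get_context_payload() + [{"role": "user", "content": user_input}]
    reply = call_llm(payload)
    # Persist only after the provider call succeeds, so a failed call
    # leaves no half-recorded turn in the log.
    memory.push_turn(user_input, reply)
    return reply
```

Passing the provider call in as a function keeps the loop testable with a stub and leaves retry and token-budget checks to the caller.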