high-volume agent runs.
5. Monotonic Timing: Duration calculations use monotonic clocks to avoid skew from system time adjustments.
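As a minimal sketch of the monotonic-timing point, the helper below measures a call's duration with time.monotonic(), which is unaffected by wall-clock adjustments such as NTP corrections. The name timed_ms is hypothetical and is not part of the implementation that follows.

```python
import time
from typing import Any, Callable, Tuple


def timed_ms(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Tuple[Any, float]:
    """Run fn and return (result, elapsed milliseconds).

    Hypothetical helper: durations come from time.monotonic(), never from
    time.time(), so a system clock change mid-call cannot produce a negative
    or inflated latency.
    """
    start = time.monotonic()
    result = fn(*args, **kwargs)
    elapsed_ms = (time.monotonic() - start) * 1000
    return result, elapsed_ms
```

Wall-clock time (time.time()) is still the right choice for the event timestamp itself, because it can be correlated with other systems; only the duration needs the monotonic clock.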
Implementation
The following Python implementation sketches a telemetry class intended as a production starting point. Its naming and structure are its own and do not mirror any particular reference library.
import json
import time
import threading
from pathlib import Path
from typing import Any, Dict, List, Optional


class AgentTelemetry:
    """
    Manages per-step JSONL logging for LLM agent runs.
    Ensures thread-safe writes and in-memory summary aggregation.
    """

    def __init__(self, output_dir: Path, run_identifier: str):
        self._file_path = output_dir / f"{run_identifier}.jsonl"
        self._run_id = run_identifier
        self._buffer: List[Dict[str, Any]] = []
        self._sequence = 0
        self._start_time = time.monotonic()
        self._lock = threading.Lock()
        # Ensure output directory exists
        output_dir.mkdir(parents=True, exist_ok=True)

    def _append_event(self, payload: Dict[str, Any]) -> None:
        """
        Writes an event to the JSONL file and updates the buffer.
        Both happen under one lock, so sequence numbers match file order.
        """
        with self._lock:
            self._sequence += 1
            event = {
                "run_id": self._run_id,
                "seq": self._sequence,
                "ts_epoch": time.time(),
                **payload
            }
            self._buffer.append(event)
            # Compact JSON serialization for minimal storage footprint.
            # default=str keeps a non-JSON-serializable tool result from raising.
            json_line = json.dumps(event, separators=(',', ':'), default=str)
            with open(self._file_path, "a", encoding="utf-8") as fh:
                fh.write(json_line + "\n")

    def track_inference(
        self,
        model_name: str,
        input_tokens: int,
        output_tokens: int,
        stop_reason: str,
        latency_ms: float = 0.0
    ) -> None:
        """
        Records an LLM inference step.
        """
        self._append_event({
            "event_type": "inference",
            "model": model_name,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "stop_reason": stop_reason,
            "latency_ms": latency_ms
        })

    def track_tool_use(
        self,
        tool_name: str,
        arguments: Dict[str, Any],
        result: Any = None,
        error_message: Optional[str] = None,
        latency_ms: float = 0.0
    ) -> None:
        """
        Records a tool execution step.
        """
        self._append_event({
            "event_type": "tool_use",
            "tool": tool_name,
            "args": arguments,
            "result": result,
            "error": error_message,
            "latency_ms": latency_ms
        })

    def generate_report(self) -> Dict[str, Any]:
        """
        Computes summary statistics from the in-memory buffer.
        """
        with self._lock:
            inference_steps = [e for e in self._buffer if e["event_type"] == "inference"]
            tool_steps = [e for e in self._buffer if e["event_type"] == "tool_use"]
            total_steps = len(self._buffer)
        total_tokens = sum(
            step.get("input_tokens", 0) + step.get("output_tokens", 0)
            for step in inference_steps
        )
        error_count = sum(1 for step in tool_steps if step.get("error"))
        duration_ms = (time.monotonic() - self._start_time) * 1000
        return {
            "run_id": self._run_id,
            "total_steps": total_steps,
            "inference_count": len(inference_steps),
            "tool_count": len(tool_steps),
            "total_tokens": total_tokens,
            "error_count": error_count,
            "duration_ms": duration_ms
        }
Usage Example
Integrating telemetry into an agent loop requires wrapping inference and tool calls with tracking methods.
import time
from pathlib import Path

from agent_telemetry import AgentTelemetry

# call_llm_api and execute_tool are application-specific placeholders.


def execute_agent_workflow(task: str):
    telemetry = AgentTelemetry(output_dir=Path("./logs"), run_identifier="run-xyz-789")
    messages = [{"role": "user", "content": task}]
    while True:
        start_inference = time.monotonic()
        response = call_llm_api(messages)
        inference_latency = (time.monotonic() - start_inference) * 1000
        telemetry.track_inference(
            model_name=response.model,
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            stop_reason=response.stop_reason,
            latency_ms=inference_latency
        )
        if response.stop_reason == "end_turn":
            break
        for tool_call in response.tool_calls:
            start_tool = time.monotonic()
            try:
                tool_result = execute_tool(tool_call.name, tool_call.arguments)
                tool_latency = (time.monotonic() - start_tool) * 1000
                telemetry.track_tool_use(
                    tool_name=tool_call.name,
                    arguments=tool_call.arguments,
                    result=tool_result,
                    latency_ms=tool_latency
                )
                messages.append({"role": "tool", "content": str(tool_result)})
            except Exception as e:
                # Record the failure with its context before propagating it.
                tool_latency = (time.monotonic() - start_tool) * 1000
                telemetry.track_tool_use(
                    tool_name=tool_call.name,
                    arguments=tool_call.arguments,
                    error_message=str(e),
                    latency_ms=tool_latency
                )
                raise
    report = telemetry.generate_report()
    print(f"Run complete: {report['total_steps']} steps, {report['total_tokens']} tokens")
    return response.text
Rationale: Every event passes through _append_event, which keeps serialization and file I/O in one place and leaves the track_* methods as thin wrappers that are easy to test. The generate_report method operates on the in-memory buffer, so it returns metrics without rereading the file; the trade-off is that the buffer grows for the lifetime of the run. In the usage example, a failing tool call is recorded with its arguments, latency, and error message before the exception is re-raised, which preserves the context needed to debug tool-related issues.
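The in-memory report disappears when the process exits, so the same totals sometimes need to be recomputed from the file. The sketch below streams a JSONL log and rebuilds the counts, assuming events carry the field names written by _append_event; summarize_jsonl is a hypothetical helper, not part of the class above.

```python
import json
from pathlib import Path
from typing import Any, Dict


def summarize_jsonl(log_path: Path) -> Dict[str, Any]:
    """Recompute run totals by streaming a JSONL telemetry log line by line."""
    summary = {
        "total_steps": 0,
        "inference_count": 0,
        "tool_count": 0,
        "total_tokens": 0,
        "error_count": 0,
    }
    with open(log_path, "r", encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue  # tolerate blank lines
            event = json.loads(line)
            summary["total_steps"] += 1
            event_type = event.get("event_type")
            if event_type == "inference":
                summary["inference_count"] += 1
                summary["total_tokens"] += (
                    event.get("input_tokens", 0) + event.get("output_tokens", 0)
                )
            elif event_type == "tool_use":
                summary["tool_count"] += 1
                if event.get("error"):
                    summary["error_count"] += 1
    return summary
```

Because the file is read one line at a time, memory use stays flat regardless of log size. Run duration is not recoverable this way from monotonic clocks; it would have to be approximated from the ts_epoch values.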
Pitfall Guide
Implementing agent telemetry introduces several operational challenges. The following pitfalls highlight common mistakes and their resolutions based on production experience.
- PII Leakage in Tool Outputs
  - Explanation: Tool results may contain sensitive user data, credentials, or proprietary information. Logging these outputs verbatim creates compliance risks and security vulnerabilities.
  - Fix: Implement a redaction layer before logging. Use regex patterns or schema-based filters to mask sensitive fields. Alternatively, log hashes of large outputs instead of raw content.
- Blocking I/O on Critical Path
  - Explanation: Synchronous file writes can introduce latency, especially on slow storage or under high load. This may degrade agent response times.
  - Fix: For latency-sensitive applications, use asynchronous writes or a background thread pool for log flushing. Ensure the lock scope is minimized to reduce contention.
- Unbounded File Growth
  - Explanation: Long-running agents or high-throughput systems can generate massive log files, consuming disk space and complicating analysis.
  - Fix: Implement log rotation based on file size or time intervals. Use tools like logrotate or custom logic to archive and compress old logs. Monitor disk usage proactively.
- Missing Run Metadata
  - Explanation: Logs without context (e.g., agent version, environment, task type) are difficult to filter and correlate. This hampers cross-run analysis and debugging.
  - Fix: Include a header record at the start of each log file with run-level metadata. Ensure metadata is propagated consistently across all events.
- Token Count Drift
  - Explanation: Failing to accurately track input and output tokens per inference step leads to incorrect cost calculations and budget overruns.
  - Fix: Always capture token counts directly from the LLM API response. Validate totals against billing reports periodically. Use the telemetry summary to alert on anomalous token usage.
- Concurrency Collisions
  - Explanation: Multiple agent instances writing to the same file or using non-unique run IDs can cause data corruption or interleaved logs.
  - Fix: Ensure each run has a globally unique identifier. Use file-level locking or separate files per run. Validate thread safety in the telemetry implementation.
- Silent Log Failures
  - Explanation: Disk full errors or permission issues can cause log writes to fail silently, resulting in incomplete audit trails.
  - Fix: Implement error handling around file operations. Log warnings to stderr or a fallback mechanism if primary logging fails. Monitor log health in production.
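The redaction fix from the first pitfall can be sketched as a small function applied to arguments and results before they reach the logger. Everything here is illustrative: the redact name, the key list, and the two patterns are assumptions, and a real deployment would need patterns matched to its own data.

```python
import re
from typing import Any

# Illustrative patterns only; extend or replace them for real data.
_PATTERNS = [
    (re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"), "[EMAIL]"),
    (re.compile(r"Bearer\s+[A-Za-z0-9._\-]+"), "Bearer [TOKEN]"),
]

# Dictionary keys whose values are masked outright (assumed names).
_SENSITIVE_KEYS = {"password", "api_key", "token", "secret"}


def redact(value: Any) -> Any:
    """Return a copy of value with sensitive keys and matching substrings masked."""
    if isinstance(value, dict):
        return {
            key: "[REDACTED]" if str(key).lower() in _SENSITIVE_KEYS else redact(item)
            for key, item in value.items()
        }
    if isinstance(value, (list, tuple)):
        return [redact(item) for item in value]
    if isinstance(value, str):
        for pattern, replacement in _PATTERNS:
            value = pattern.sub(replacement, value)
        return value
    # Numbers, booleans, None, and other types pass through unchanged.
    return value
```

One way to use it is to call redact on both arguments and result before passing them to track_tool_use, so unredacted values never enter the buffer or the file.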
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| High-Throughput Agent | JSONL with async flush | Minimizes latency impact while preserving structure | Low (storage) |
| Compliance Audit | JSONL with encryption | Ensures data integrity and security for regulated workloads | Medium (encryption overhead) |
| Real-Time Dashboard | JSONL + stream to Kafka | Enables live monitoring without blocking agent execution | Medium (streaming infra) |
| Cost-Constrained Env | JSONL with compact serialization | Reduces storage costs while maintaining debug utility | Low |
| Multi-Agent System | Unique run IDs + metadata headers | Facilitates cross-run analysis and filtering | Low |
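The "JSONL with async flush" row can be approximated with a queue and a single worker thread. This is a sketch under stated assumptions: BackgroundJsonlWriter is a hypothetical class, it is not wired into AgentTelemetry, and events still queued when the process crashes are lost.

```python
import json
import queue
import threading
from pathlib import Path
from typing import Any, Dict, Optional


class BackgroundJsonlWriter:
    """Hypothetical helper: callers enqueue events, a worker thread writes them."""

    def __init__(self, file_path: Path):
        self._file_path = file_path
        self._queue: "queue.Queue[Optional[Dict[str, Any]]]" = queue.Queue()
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def submit(self, event: Dict[str, Any]) -> None:
        """Enqueue an event and return immediately without touching the disk."""
        self._queue.put(event)

    def _drain(self) -> None:
        # The file stays open for the worker's lifetime; one writer means
        # lines are written in queue order without extra locking.
        with open(self._file_path, "a", encoding="utf-8") as fh:
            while True:
                event = self._queue.get()
                if event is None:  # sentinel: everything queued earlier is written
                    break
                fh.write(json.dumps(event, separators=(",", ":"), default=str) + "\n")
                fh.flush()

    def close(self) -> None:
        """Write any remaining events, then stop the worker."""
        self._queue.put(None)
        self._worker.join()
```

The agent thread pays only the cost of a queue put. Calling close() during shutdown is what guarantees the tail of the log reaches disk.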
Configuration Template
JSONL Schema Definition:
{
"run_id": "string",
"seq": "integer",
"ts_epoch": "float",
"event_type": "inference | tool_use",
"model": "string (optional)",
"input_tokens": "integer (optional)",
"output_tokens": "integer (optional)",
"stop_reason": "string (optional)",
"latency_ms": "float",
"tool": "string (optional)",
"args": "object (optional)",
"result": "any (optional)",
"error": "string (optional)"
}
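A schema like this can be checked mechanically when reading logs back. The sketch below assumes that fields marked optional are in fact required for the event type they belong to (for example, model for inference events); that split, and the validate_event name, are assumptions rather than part of the schema above.

```python
from typing import Any, Dict, List

# Fields expected on every event.
_COMMON = {
    "run_id": str,
    "seq": int,
    "ts_epoch": (int, float),
    "event_type": str,
    "latency_ms": (int, float),
}

# Fields assumed to be required per event type.
_BY_TYPE = {
    "inference": {
        "model": str,
        "input_tokens": int,
        "output_tokens": int,
        "stop_reason": str,
    },
    "tool_use": {"tool": str, "args": dict},
}


def validate_event(event: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in one event; an empty list means valid."""
    problems: List[str] = []
    required = dict(_COMMON)
    event_type = event.get("event_type")
    if event_type in _BY_TYPE:
        required.update(_BY_TYPE[event_type])
    elif event_type is not None:
        problems.append(f"unknown event_type: {event_type!r}")
    for field, expected in required.items():
        if field not in event:
            problems.append(f"missing field: {field}")
        elif not isinstance(event[field], expected):
            problems.append(f"wrong type for {field}: {type(event[field]).__name__}")
    return problems
```

Returning a list of problems instead of raising lets a log scanner report every malformed line in one pass.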
Log Rotation Config (logrotate example):
/path/to/logs/*.jsonl {
daily
rotate 7
compress
delaycompress
missingok
notifempty
create 0644 user group
}
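Where logrotate is not available, size-based rotation can be done in-process with the standard library's RotatingFileHandler. This is a sketch of the "custom logic" option, not a replacement for the class above; the function names and the default size and backup count are assumptions.

```python
import json
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Any, Dict


def build_rotating_jsonl_logger(
    file_path: Path, max_bytes: int = 10_000_000, backups: int = 7
) -> logging.Logger:
    """Create a logger that writes raw lines and rotates the file by size."""
    logger = logging.getLogger(f"agent_telemetry.{file_path.stem}")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep telemetry out of the root logger's output
    if not logger.handlers:  # avoid stacking handlers on repeated calls
        handler = RotatingFileHandler(
            file_path, maxBytes=max_bytes, backupCount=backups, encoding="utf-8"
        )
        # Emit the message only, so each line in the file is one JSON object.
        handler.setFormatter(logging.Formatter("%(message)s"))
        logger.addHandler(handler)
    return logger


def log_event(logger: logging.Logger, event: Dict[str, Any]) -> None:
    """Serialize one event compactly and hand it to the rotating handler."""
    logger.info(json.dumps(event, separators=(",", ":"), default=str))
```

Rotated files are named with numeric suffixes (run.jsonl.1, run.jsonl.2, and so on), and the handler rolls over before a record would push the file past the size limit, so lines are not split across files. Unlike the logrotate config, this sketch does not compress old files.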
Quick Start Guide
- Install Dependencies: None required; pathlib, json, time, and threading are all part of the Python standard library.
- Initialize Telemetry:
  telemetry = AgentTelemetry(output_dir=Path("./logs"), run_identifier="run-001")
- Record Events:
  telemetry.track_inference(model_name="claude-sonnet-4-6", input_tokens=100, output_tokens=50, stop_reason="end_turn")
  telemetry.track_tool_use(tool_name="search", arguments={"q": "test"}, result={"hits": 1})
- Generate Report:
  report = telemetry.generate_report()
  print(report)
- Verify Output: Open ./logs/run-001.jsonl, confirm it holds one JSON object per line with increasing seq values, and check that the report counts match the events recorded.
This structured approach to agent telemetry provides a foundation for debugging and cost management. With per-step JSONL logging, teams can see what an agent did at each step, which shortens issue resolution and makes production deployments more reliable.