Building Custom Trace Processors for Agent Observability
Build custom trace processors and exporters for the OpenAI Agents SDK to ship agent telemetry to Elasticsearch, Datadog, or any backend using TraceProvider, BatchTraceProcessor, and BackendSpanExporter.
Why Custom Trace Processors Matter
The OpenAI Agents SDK ships with a default trace processor that sends spans to the OpenAI dashboard. For development and small-scale deployments, this is sufficient. Production systems usually need more: agent telemetry should sit alongside your application metrics in Elasticsearch, Datadog, Grafana, or your own analytics backend. Custom trace processors let you intercept every span the SDK generates and route it wherever you need.
This post covers the full architecture: implementing a custom TracingProcessor, building a span exporter for Elasticsearch, registering both with the SDK's global trace provider, and improving throughput with a batching processor.
The Tracing Architecture
The Agents SDK tracing pipeline has four components:
- Spans: Individual units of work (agent invocation, tool call, generation, handoff)
- TracingProcessor: Receives span lifecycle events (start, end) and decides what to do with them
- Exporter: Ships completed spans to an external backend
- TraceProvider: The global registry that connects processors to the SDK
Figure: a typical telemetry flow. The agent or API emits data through the OTel SDK (GenAI conventions) to an OTel Collector, which routes traces (Tempo or Honeycomb), metrics (Prometheus), and logs (Loki or ELK) to their backends. Grafana dashboards and alerts sit on top of those backends and feed a pager.
The default setup sends everything to OpenAI. To add your own backend, you implement a custom processor and exporter, then register them with the TraceProvider.
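To make the division of labor concrete, here is a toy model of the components in plain Python. None of these classes come from the SDK: ToyProvider, ForwardingProcessor, and ListExporter are hypothetical names used only to show how a registry fans each span event out to every registered processor, and how each processor hands finished spans to its own exporter.

```python
from typing import Any


class ListExporter:
    """Hypothetical exporter: 'ships' spans by appending them to a list."""

    def __init__(self) -> None:
        self.shipped: list[dict[str, Any]] = []

    def export(self, span: dict[str, Any]) -> None:
        self.shipped.append(span)


class ForwardingProcessor:
    """Hypothetical processor: forwards every finished span to its exporter."""

    def __init__(self, exporter: ListExporter) -> None:
        self._exporter = exporter

    def on_span_end(self, span: dict[str, Any]) -> None:
        self._exporter.export(span)


class ToyProvider:
    """Hypothetical registry: fans each event out to all registered processors."""

    def __init__(self) -> None:
        self._processors: list[ForwardingProcessor] = []

    def register(self, processor: ForwardingProcessor) -> None:
        self._processors.append(processor)

    def span_ended(self, span: dict[str, Any]) -> None:
        for processor in self._processors:
            processor.on_span_end(span)


# Two backends receive the same span because both processors are registered.
provider = ToyProvider()
dashboard, elastic = ListExporter(), ListExporter()
provider.register(ForwardingProcessor(dashboard))
provider.register(ForwardingProcessor(elastic))
provider.span_ended({"span_id": "span_1", "span_type": "agent"})
```

The real SDK follows the same shape: one registry, any number of processors, and each processor free to choose its own export strategy.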
Implementing a Custom TracingProcessor
The TracingProcessor protocol defines the interface your processor must implement:
import time
from typing import Any

from agents.tracing import Span, Trace, TracingProcessor


class ElasticsearchTracingProcessor(TracingProcessor):
    """Processor that collects spans and exports them to Elasticsearch."""

    def __init__(self, exporter: "ElasticsearchSpanExporter"):
        self._exporter = exporter
        self._active_traces: dict[str, dict[str, Any]] = {}

    def on_trace_start(self, trace: Trace) -> None:
        """Called when a new trace begins."""
        self._active_traces[trace.trace_id] = {
            "trace_id": trace.trace_id,
            # The SDK exposes the workflow name as `trace.name`. group_id and
            # metadata are read defensively in case a Trace implementation
            # does not expose them.
            "workflow_name": trace.name,
            "group_id": getattr(trace, "group_id", None),
            "metadata": getattr(trace, "metadata", None) or {},
            "started_at": time.time(),
            "spans": [],
        }

    def on_trace_end(self, trace: Trace) -> None:
        """Called when a trace completes. Flush all collected spans."""
        trace_data = self._active_traces.pop(trace.trace_id, None)
        if trace_data:
            trace_data["ended_at"] = time.time()
            trace_data["duration_ms"] = (
                (trace_data["ended_at"] - trace_data["started_at"]) * 1000
            )
            self._exporter.export_trace(trace_data)

    def on_span_start(self, span: Span) -> None:
        """Called when a span begins within an active trace."""
        trace_data = self._active_traces.get(span.trace_id)
        if trace_data:
            trace_data["spans"].append({
                "span_id": span.span_id,
                "parent_id": span.parent_id,
                # Type-specific details live on span.span_data. Its `type` is a
                # string such as "agent", "function" (a tool call), "generation",
                # or "handoff". Not every span type carries a name.
                "span_type": span.span_data.type,
                "name": getattr(span.span_data, "name", None),
                "started_at": time.time(),
            })

    def on_span_end(self, span: Span) -> None:
        """Called when a span completes."""
        trace_data = self._active_traces.get(span.trace_id)
        if trace_data:
            for s in trace_data["spans"]:
                if s["span_id"] == span.span_id:
                    s["ended_at"] = time.time()
                    s["duration_ms"] = (s["ended_at"] - s["started_at"]) * 1000
                    # span.error is set when the span failed, otherwise None
                    s["status"] = "error" if span.error else "ok"
                    # export() returns the span data as a JSON-friendly dict
                    s["attributes"] = span.span_data.export()
                    break

    def shutdown(self) -> None:
        """Flush any remaining data on shutdown."""
        for trace_id in list(self._active_traces.keys()):
            trace_data = self._active_traces.pop(trace_id)
            self._exporter.export_trace(trace_data)
        self._exporter.flush()

    def force_flush(self) -> None:
        """Force export of all buffered data."""
        self._exporter.flush()
The four lifecycle methods — on_trace_start, on_trace_end, on_span_start, and on_span_end — give you access to every event in the tracing pipeline.
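Because a processor only sees the objects passed to its lifecycle methods, you can exercise it without running an agent. The sketch below drives a small recording processor with stand-in objects built from types.SimpleNamespace. RecordingProcessor and the fake trace and span are hypothetical; they mimic only the trace_id, span_id, and parent_id fields, and the class is duck-typed rather than derived from the SDK's TracingProcessor so that the example runs with the standard library alone.

```python
from types import SimpleNamespace
from typing import Any


class RecordingProcessor:
    """Records the order of lifecycle events (duck-typed, not the SDK base class)."""

    def __init__(self) -> None:
        self.events: list[tuple[str, str]] = []

    def on_trace_start(self, trace: Any) -> None:
        self.events.append(("trace_start", trace.trace_id))

    def on_span_start(self, span: Any) -> None:
        self.events.append(("span_start", span.span_id))

    def on_span_end(self, span: Any) -> None:
        self.events.append(("span_end", span.span_id))

    def on_trace_end(self, trace: Any) -> None:
        self.events.append(("trace_end", trace.trace_id))

    def shutdown(self) -> None:
        pass  # Nothing is buffered in this sketch

    def force_flush(self) -> None:
        pass  # Nothing is buffered in this sketch


# Stand-ins for the SDK's Trace and Span objects (hypothetical values).
fake_trace = SimpleNamespace(trace_id="trace_abc")
fake_span = SimpleNamespace(trace_id="trace_abc", span_id="span_1", parent_id=None)

processor = RecordingProcessor()
processor.on_trace_start(fake_trace)
processor.on_span_start(fake_span)
processor.on_span_end(fake_span)
processor.on_trace_end(fake_trace)

print(processor.events)
```

The same approach works for the Elasticsearch processor: pass it a fake exporter that records calls, then assert on what was exported.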
Building the Elasticsearch Exporter
The exporter handles the actual transport of span data to your backend:
import logging
import queue
import threading
from typing import Any

from elasticsearch import Elasticsearch, helpers

logger = logging.getLogger(__name__)


class ElasticsearchSpanExporter:
    """Exports agent traces to Elasticsearch."""

    def __init__(
        self,
        es_url: str = "http://localhost:9200",
        index_prefix: str = "agent-traces",
        api_key: str | None = None,
        batch_size: int = 50,
        flush_interval: float = 5.0,
    ):
        self._client = Elasticsearch(
            es_url,
            api_key=api_key,
        )
        self._index_prefix = index_prefix
        self._batch_size = batch_size
        self._buffer: queue.Queue[dict[str, Any]] = queue.Queue()
        self._flush_interval = flush_interval
        self._shutdown_event = threading.Event()

        # Background flush thread
        self._flush_thread = threading.Thread(
            target=self._periodic_flush, daemon=True
        )
        self._flush_thread.start()

    def export_trace(self, trace_data: dict[str, Any]) -> None:
        """Queue a trace for export."""
        # Index the trace summary
        self._buffer.put({
            "_index": f"{self._index_prefix}-traces",
            "_id": trace_data["trace_id"],
            "_source": {
                "trace_id": trace_data["trace_id"],
                "workflow_name": trace_data["workflow_name"],
                "group_id": trace_data.get("group_id"),
                "metadata": trace_data.get("metadata", {}),
                "duration_ms": trace_data.get("duration_ms", 0),
                "span_count": len(trace_data.get("spans", [])),
                "timestamp": trace_data["started_at"],
            },
        })

        # Index each span separately for granular querying
        for span in trace_data.get("spans", []):
            self._buffer.put({
                "_index": f"{self._index_prefix}-spans",
                "_id": span["span_id"],
                "_source": {
                    "trace_id": trace_data["trace_id"],
                    "span_id": span["span_id"],
                    "parent_id": span.get("parent_id"),
                    "span_type": span["span_type"],
                    "name": span.get("name"),
                    "duration_ms": span.get("duration_ms", 0),
                    "status": span.get("status"),
                    "attributes": span.get("attributes", {}),
                    "timestamp": span["started_at"],
                },
            })

        if self._buffer.qsize() >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        """Flush buffered documents to Elasticsearch."""
        actions = []
        while not self._buffer.empty():
            try:
                actions.append(self._buffer.get_nowait())
            except queue.Empty:
                break

        if actions:
            try:
                helpers.bulk(self._client, actions)
                logger.info("Exported %d documents to Elasticsearch", len(actions))
            except Exception as e:
                logger.error("Failed to export to Elasticsearch: %s", e)
                # Re-queue failed documents so the next flush retries them
                for action in actions:
                    self._buffer.put(action)

    def _periodic_flush(self) -> None:
        """Periodically flush the buffer."""
        while not self._shutdown_event.is_set():
            self._shutdown_event.wait(self._flush_interval)
            if not self._buffer.empty():
                self.flush()

    def close(self) -> None:
        """Shutdown the exporter."""
        self._shutdown_event.set()
        self._flush_thread.join(timeout=10)
        self.flush()
        self._client.close()
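The flush method above re-queues every failed batch, so a prolonged Elasticsearch outage makes the buffer grow without limit. One possible refinement is to retry a few times with exponential backoff and then hand the batch to a fallback. The helper below is a sketch under that assumption: send_with_retry, its parameters, and the on_give_up callback are hypothetical and are not part of the exporter or the Elasticsearch client.

```python
import time
from typing import Any, Callable, Optional

Batch = list[dict[str, Any]]


def send_with_retry(
    send: Callable[[Batch], None],
    batch: Batch,
    max_attempts: int = 3,
    base_delay: float = 0.5,
    on_give_up: Optional[Callable[[Batch], None]] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Try to send a batch, backing off exponentially between attempts.

    Returns True on success. After max_attempts failures the batch is passed
    to on_give_up (for example, to append it to a local file) and False is
    returned, so the caller never holds an ever-growing backlog.
    """
    for attempt in range(max_attempts):
        try:
            send(batch)
            return True
        except Exception:
            if attempt < max_attempts - 1:
                # Delays grow as base_delay, 2 * base_delay, 4 * base_delay, ...
                sleep(base_delay * (2 ** attempt))
    if on_give_up is not None:
        on_give_up(batch)
    return False
```

Inside flush, the bulk call could then be wrapped as send_with_retry(lambda docs: helpers.bulk(self._client, docs), actions). Because every document carries an explicit _id, resending a partially indexed batch overwrites documents rather than duplicating them.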
Registering with TraceProvider
With both the processor and exporter built, register them with the global TraceProvider:
import os

from agents.tracing import add_trace_processor

# Create the exporter
es_exporter = ElasticsearchSpanExporter(
    es_url=os.environ["ELASTICSEARCH_URL"],
    api_key=os.environ.get("ELASTICSEARCH_API_KEY"),
    index_prefix="agent-traces",
    batch_size=100,
    flush_interval=3.0,
)

# Create the processor
es_processor = ElasticsearchTracingProcessor(exporter=es_exporter)

# Register: add_trace_processor keeps the default OpenAI processor and adds yours.
# set_trace_processors([es_processor]) would replace the default processor instead.
add_trace_processor(es_processor)
Now every trace produced by any Runner.run() call will flow through both the default OpenAI processor and your Elasticsearch processor. To send traces only to Elasticsearch, call set_trace_processors([es_processor]) instead, which replaces the default processor list.
Building a BatchTraceProcessor for High Throughput
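For high-volume systems, exporting spans one at a time is inefficient. A batch processor buffers finished spans and exports them in bulk. Note that the SDK also ships a built-in class with the same name in agents.tracing.processors; the version below is a standalone implementation that feeds the Elasticsearch exporter: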
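import threading
import time
from collections import defaultdict
from typing import Any

from agents.tracing import Span, Trace, TracingProcessor


class BatchTraceProcessor(TracingProcessor):
    """Batches spans before exporting for higher throughput."""

    def __init__(
        self,
        exporter: ElasticsearchSpanExporter,
        max_batch_size: int = 200,
        flush_interval: float = 2.0,
        max_queue_size: int = 10000,
    ):
        self._exporter = exporter
        self._max_batch_size = max_batch_size
        self._flush_interval = flush_interval
        self._max_queue_size = max_queue_size
        self._span_buffer: list[dict[str, Any]] = []
        self._span_starts: dict[str, float] = {}
        self._workflow_names: dict[str, str] = {}
        self._lock = threading.Lock()
        self._shutdown = threading.Event()
        self._flush_thread = threading.Thread(
            target=self._auto_flush, daemon=True
        )
        self._flush_thread.start()

    def on_trace_start(self, trace: Trace) -> None:
        # Remember the workflow name so buffered spans can carry it
        with self._lock:
            self._workflow_names[trace.trace_id] = trace.name

    def on_trace_end(self, trace: Trace) -> None:
        with self._lock:
            self._workflow_names.pop(trace.trace_id, None)
        self._maybe_flush()

    def on_span_start(self, span: Span) -> None:
        # Record the start time; the span itself is buffered when it ends
        with self._lock:
            self._span_starts[span.span_id] = time.time()

    def on_span_end(self, span: Span) -> None:
        ended_at = time.time()
        with self._lock:
            started_at = self._span_starts.pop(span.span_id, ended_at)
            if len(self._span_buffer) >= self._max_queue_size:
                return  # Buffer is full: drop the span instead of growing without bound
            self._span_buffer.append({
                "trace_id": span.trace_id,
                "workflow_name": self._workflow_names.get(span.trace_id),
                "span_id": span.span_id,
                "parent_id": span.parent_id,
                "span_type": span.span_data.type,
                "name": getattr(span.span_data, "name", None),
                "started_at": started_at,
                "duration_ms": (ended_at - started_at) * 1000,
                "status": "error" if span.error else "ok",
                "attributes": span.span_data.export(),
            })
        self._maybe_flush()

    def _maybe_flush(self, force: bool = False) -> None:
        """Export the buffer if it is full, or unconditionally when force is set."""
        with self._lock:
            if not force and len(self._span_buffer) < self._max_batch_size:
                return
            batch = self._span_buffer[:]
            self._span_buffer.clear()
        if batch:
            self._export_batch(batch)

    def _export_batch(self, batch: list[dict[str, Any]]) -> None:
        # export_trace expects one dict per trace, so group the spans by trace_id.
        # It also writes a trace summary document; when a trace is split across
        # batches, that summary reflects only the most recent batch.
        by_trace: dict[str, list[dict[str, Any]]] = defaultdict(list)
        for span_data in batch:
            by_trace[span_data["trace_id"]].append(span_data)
        for trace_id, spans in by_trace.items():
            self._exporter.export_trace({
                "trace_id": trace_id,
                "workflow_name": spans[0]["workflow_name"],
                "started_at": min(s["started_at"] for s in spans),
                "spans": spans,
            })
        self._exporter.flush()

    def _auto_flush(self) -> None:
        # Flush on a timer so partial batches do not wait indefinitely
        while not self._shutdown.is_set():
            self._shutdown.wait(self._flush_interval)
            self._maybe_flush(force=True)

    def shutdown(self) -> None:
        self._shutdown.set()
        self._flush_thread.join(timeout=10)
        self._maybe_flush(force=True)

    def force_flush(self) -> None:
        self._maybe_flush(force=True)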
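A queue limit such as max_queue_size only helps if something enforces it. One simple policy, sketched below, is to keep the newest spans and count what gets evicted so the loss is visible in your own metrics. BoundedSpanBuffer is a hypothetical helper, not an SDK class, and evicting the oldest spans is an assumption; dropping the newest span or blocking the caller are equally valid choices.

```python
import threading
from collections import deque
from typing import Any


class BoundedSpanBuffer:
    """Thread-safe buffer that evicts the oldest span once it is full."""

    def __init__(self, max_size: int) -> None:
        self._items: deque[dict[str, Any]] = deque(maxlen=max_size)
        self._lock = threading.Lock()
        self.dropped = 0  # Number of spans evicted because the buffer was full

    def add(self, span: dict[str, Any]) -> None:
        with self._lock:
            if len(self._items) == self._items.maxlen:
                # deque(maxlen=...) discards the oldest item on append
                self.dropped += 1
            self._items.append(span)

    def drain(self, limit: int) -> list[dict[str, Any]]:
        """Remove and return up to `limit` spans, oldest first."""
        with self._lock:
            count = min(limit, len(self._items))
            return [self._items.popleft() for _ in range(count)]


buffer = BoundedSpanBuffer(max_size=2)
for i in range(3):
    buffer.add({"span_id": f"s{i}"})
batch = buffer.drain(limit=10)
```

Exposing the dropped counter as a metric makes it clear when the exporter cannot keep up with span volume.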
Creating Kibana Dashboards
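With agent traces in Elasticsearch, you can build Kibana dashboards on top of the trace and span indices. The JSON below is an outline of useful panels and the queries behind them, not an importable Kibana saved object: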
{
  "agent_performance_dashboard": {
    "panels": [
      {
        "title": "Average Agent Run Duration (ms)",
        "type": "metric",
        "query": {
          "aggs": {
            "avg_duration": {
              "avg": { "field": "duration_ms" }
            }
          }
        }
      },
      {
        "title": "Tool Call Latency Distribution",
        "type": "histogram",
        "query": {
          "bool": {
            "filter": [
              { "term": { "span_type": "function" } }
            ]
          }
        }
      },
      {
        "title": "Handoff Frequency by Agent Pair",
        "type": "heatmap",
        "query": {
          "bool": {
            "filter": [
              { "term": { "span_type": "handoff" } }
            ]
          }
        }
      }
    ]
  }
}
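Each panel in the outline corresponds to an ordinary Elasticsearch search request. As a sketch, the function below builds a request body that computes latency percentiles for one span type with a percentiles aggregation. The function name is hypothetical, and the body assumes span_type is indexed as a keyword field and duration_ms as a number.

```python
from typing import Any


def latency_percentiles_query(span_type: str) -> dict[str, Any]:
    """Build a search body for p50/p95/p99 latency of one span type."""
    return {
        "size": 0,  # Only the aggregation result is needed, not the hits
        "query": {
            "bool": {"filter": [{"term": {"span_type": span_type}}]}
        },
        "aggs": {
            "latency": {
                "percentiles": {
                    "field": "duration_ms",
                    "percents": [50, 95, 99],
                }
            }
        },
    }


body = latency_percentiles_query("handoff")
```

The resulting dictionary can be sent to the spans index through the Elasticsearch search API, or pasted into Kibana Dev Tools as JSON while you iterate on a panel.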
Practical Observability Patterns
Here are the patterns that matter most in production agent observability:
Latency percentiles — Track p50, p95, and p99 for each agent and tool call. A single slow tool can dominate user-perceived latency.
Handoff heatmaps — Visualize which agents hand off to which others. Unexpected handoff patterns reveal confusion in the triage logic.
Token consumption by agent — Track input and output tokens per agent to identify which agents are the most expensive.
Error rate by span type — Separate tool call errors from generation errors from guardrail blocks. Each requires a different remediation.
Trace depth monitoring — If a trace has more than 10-15 agent spans, the orchestration logic likely needs simplification.
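Several of these checks can be prototyped offline before you build dashboards. The sketch below computes the error rate by span type and the maximum nesting depth of a trace from a list of span documents with span_id, parent_id, span_type, and status fields. The sample data and function names are hypothetical, and treating a status of "error" as a failed span is an assumption about how status is recorded.

```python
from collections import Counter
from typing import Any

SpanDoc = dict[str, Any]


def error_rate_by_type(spans: list[SpanDoc]) -> dict[str, float]:
    """Return the fraction of failed spans for each span type."""
    totals: Counter[str] = Counter()
    errors: Counter[str] = Counter()
    for span in spans:
        totals[span["span_type"]] += 1
        if span.get("status") == "error":
            errors[span["span_type"]] += 1
    return {span_type: errors[span_type] / totals[span_type] for span_type in totals}


def max_depth(spans: list[SpanDoc]) -> int:
    """Return the longest parent chain in the trace; a root span has depth 1."""
    parents = {s["span_id"]: s.get("parent_id") for s in spans}

    def depth(span_id: str) -> int:
        steps = 0
        seen: set[str] = set()
        current = span_id
        # Walk up the parent links; `seen` guards against malformed cycles
        while current is not None and current in parents and current not in seen:
            seen.add(current)
            steps += 1
            current = parents[current]
        return steps

    return max((depth(span_id) for span_id in parents), default=0)


sample = [
    {"span_id": "a", "parent_id": None, "span_type": "agent", "status": "ok"},
    {"span_id": "b", "parent_id": "a", "span_type": "function", "status": "error"},
    {"span_id": "c", "parent_id": "a", "span_type": "function", "status": "ok"},
    {"span_id": "d", "parent_id": "c", "span_type": "generation", "status": "ok"},
]
rates = error_rate_by_type(sample)
deepest = max_depth(sample)
```

Once the numbers look useful on sample data, the same logic can move into Elasticsearch aggregations or alerting rules.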
Custom trace processors transform agent traces from a debugging tool into a full observability platform. Start with Elasticsearch for flexible querying, add Kibana dashboards for visualization, and set up alerting rules for latency and error rate thresholds. The investment pays off the first time you diagnose a production issue in minutes instead of hours.