Building Custom Trace Processors for Agent Observability

Build custom trace processors and exporters for the OpenAI Agents SDK to ship agent telemetry to Elasticsearch, Datadog, or any backend using TraceProvider, BatchTraceProcessor, and BackendSpanExporter.

Why Custom Trace Processors Matter

The OpenAI Agents SDK ships with a default trace processor that sends spans to the OpenAI dashboard. For development and small-scale deployments, this is sufficient. But production systems need more — you need agent telemetry alongside your application metrics in Elasticsearch, Datadog, Grafana, or your own analytics backend. Custom trace processors let you intercept every span the SDK generates and route it wherever you need.

This post covers the full architecture: implementing a custom TracingProcessor, building a BackendSpanExporter, wiring everything together with TraceProvider, and optimizing throughput with BatchTraceProcessor.

The Tracing Architecture

The Agents SDK tracing pipeline has four components. The diagram below sketches where agent telemetry sits in a broader observability stack; the SDK-level components follow it:

flowchart LR
    APP(["Agent or API"])
    SDK["OTel SDK<br/>GenAI conventions"]
    COL["OTel Collector"]
    subgraph BACKENDS["Backends"]
        TR[("Traces<br/>Tempo or Honeycomb")]
        MET[("Metrics<br/>Prometheus")]
        LOG[("Logs<br/>Loki or ELK")]
    end
    DASH["Grafana plus alerts"]
    PAGE(["Pager"])
    APP --> SDK --> COL
    COL --> TR
    COL --> MET
    COL --> LOG
    TR --> DASH
    MET --> DASH
    LOG --> DASH
    DASH --> PAGE
    style SDK fill:#4f46e5,stroke:#4338ca,color:#fff
    style DASH fill:#f59e0b,stroke:#d97706,color:#1f2937
    style PAGE fill:#dc2626,stroke:#b91c1c,color:#fff

  1. Spans — Individual units of work (agent invocation, tool call, generation, handoff)
  2. TracingProcessor — Receives span lifecycle events (start, end) and decides what to do with them
  3. Exporter — Ships completed spans to an external backend
  4. TraceProvider — The global registry that connects processors to the SDK

The default setup sends everything to OpenAI. To add your own backend, you implement a custom processor and exporter, then register them with the TraceProvider.

Implementing a Custom TracingProcessor

The TracingProcessor protocol defines the interface your processor must implement:

from agents.tracing import (
    TracingProcessor,
    Span,
    Trace,
)
from typing import Any
import time

class ElasticsearchTracingProcessor(TracingProcessor):
    """Processor that collects spans and exports them to Elasticsearch."""

    def __init__(self, exporter: "ElasticsearchSpanExporter"):
        self._exporter = exporter
        self._active_traces: dict[str, dict[str, Any]] = {}

    def on_trace_start(self, trace: Trace) -> None:
        """Called when a new trace begins."""
        self._active_traces[trace.trace_id] = {
            "trace_id": trace.trace_id,
            # The SDK exposes the workflow name as Trace.name
            "workflow_name": trace.name,
            "group_id": getattr(trace, "group_id", None),
            "metadata": getattr(trace, "metadata", None),
            "started_at": time.time(),
            "spans": [],
        }

    def on_trace_end(self, trace: Trace) -> None:
        """Called when a trace completes. Flush all collected spans."""
        trace_data = self._active_traces.pop(trace.trace_id, None)
        if trace_data:
            trace_data["ended_at"] = time.time()
            trace_data["duration_ms"] = (
                (trace_data["ended_at"] - trace_data["started_at"]) * 1000
            )
            self._exporter.export_trace(trace_data)

    def on_span_start(self, span: Span) -> None:
        """Called when a span begins within an active trace."""
        trace_data = self._active_traces.get(span.trace_id)
        if trace_data:
            # Type-specific details (agent, function, generation, handoff, ...)
            # live on span.span_data rather than on the Span object itself
            trace_data["spans"].append({
                "span_id": span.span_id,
                "parent_id": span.parent_id,
                "span_type": span.span_data.type,
                "name": getattr(span.span_data, "name", None),
                "started_at": time.time(),
            })

    def on_span_end(self, span: Span) -> None:
        """Called when a span completes."""
        trace_data = self._active_traces.get(span.trace_id)
        if trace_data:
            for s in trace_data["spans"]:
                if s["span_id"] == span.span_id:
                    s["ended_at"] = time.time()
                    s["duration_ms"] = (s["ended_at"] - s["started_at"]) * 1000
                    s["status"] = "error" if span.error else "ok"
                    s["attributes"] = span.span_data.export()
                    break

    def shutdown(self) -> None:
        """Flush any remaining data on shutdown."""
        for trace_id in list(self._active_traces.keys()):
            trace_data = self._active_traces.pop(trace_id)
            self._exporter.export_trace(trace_data)
        self._exporter.flush()

    def force_flush(self) -> None:
        """Force export of all buffered data."""
        self._exporter.flush()

The four lifecycle methods — on_trace_start, on_trace_end, on_span_start, and on_span_end — give you access to every event in the tracing pipeline.
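
Before wiring up a real backend, it helps to confirm these hooks fire in your environment. A minimal sketch, assuming the SDK's add_trace_processor helper (which registers a processor alongside the defaults); the ConsoleTracingProcessor name is illustrative and the class just prints each event:

from agents.tracing import TracingProcessor, Trace, Span, add_trace_processor

class ConsoleTracingProcessor(TracingProcessor):
    """Throwaway processor that prints lifecycle events for debugging."""

    def on_trace_start(self, trace: Trace) -> None:
        print(f"trace start {trace.trace_id}")

    def on_trace_end(self, trace: Trace) -> None:
        print(f"trace end   {trace.trace_id}")

    def on_span_start(self, span: Span) -> None:
        print(f"  span start {span.span_id} (parent={span.parent_id})")

    def on_span_end(self, span: Span) -> None:
        print(f"  span end   {span.span_id}")

    def shutdown(self) -> None:
        pass

    def force_flush(self) -> None:
        pass

# Keeps the default OpenAI processor and runs this one alongside it
add_trace_processor(ConsoleTracingProcessor())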

Building the Elasticsearch Exporter

The exporter handles the actual transport of span data to your backend:

from elasticsearch import Elasticsearch, helpers
from typing import Any
import threading
import queue
import logging

logger = logging.getLogger(__name__)

class ElasticsearchSpanExporter:
    """Exports agent traces to Elasticsearch."""

    def __init__(
        self,
        es_url: str = "http://localhost:9200",
        index_prefix: str = "agent-traces",
        api_key: str | None = None,
        batch_size: int = 50,
        flush_interval: float = 5.0,
    ):
        self._client = Elasticsearch(
            es_url,
            api_key=api_key,
        )
        self._index_prefix = index_prefix
        self._batch_size = batch_size
        self._buffer: queue.Queue[dict[str, Any]] = queue.Queue()
        self._flush_interval = flush_interval
        self._shutdown_event = threading.Event()

        # Background flush thread
        self._flush_thread = threading.Thread(
            target=self._periodic_flush, daemon=True
        )
        self._flush_thread.start()

    def export_trace(self, trace_data: dict[str, Any]) -> None:
        """Queue a trace for export."""
        # Index the trace summary
        self._buffer.put({
            "_index": f"{self._index_prefix}-traces",
            "_id": trace_data["trace_id"],
            "_source": {
                "trace_id": trace_data["trace_id"],
                "workflow_name": trace_data["workflow_name"],
                "group_id": trace_data.get("group_id"),
                "metadata": trace_data.get("metadata", {}),
                "duration_ms": trace_data.get("duration_ms", 0),
                "span_count": len(trace_data.get("spans", [])),
                "timestamp": trace_data["started_at"],
            },
        })

        # Index each span separately for granular querying
        for span in trace_data.get("spans", []):
            self._buffer.put({
                "_index": f"{self._index_prefix}-spans",
                "_id": span["span_id"],
                "_source": {
                    "trace_id": trace_data["trace_id"],
                    "span_id": span["span_id"],
                    "parent_id": span.get("parent_id"),
                    "span_type": span["span_type"],
                    "name": span.get("name"),
                    "duration_ms": span.get("duration_ms", 0),
                    "status": span.get("status"),
                    "attributes": span.get("attributes", {}),
                    "timestamp": span["started_at"],
                },
            })

        if self._buffer.qsize() >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        """Flush buffered documents to Elasticsearch."""
        actions = []
        while not self._buffer.empty():
            try:
                actions.append(self._buffer.get_nowait())
            except queue.Empty:
                break

        if actions:
            try:
                helpers.bulk(self._client, actions)
                logger.info(f"Exported {len(actions)} documents to Elasticsearch")
            except Exception as e:
                logger.error(f"Failed to export to Elasticsearch: {e}")
                # Re-queue failed documents
                for action in actions:
                    self._buffer.put(action)

    def _periodic_flush(self) -> None:
        """Periodically flush the buffer."""
        while not self._shutdown_event.is_set():
            self._shutdown_event.wait(self._flush_interval)
            if not self._buffer.empty():
                self.flush()

    def close(self) -> None:
        """Shutdown the exporter."""
        self._shutdown_event.set()
        self._flush_thread.join(timeout=10)
        self.flush()
        self._client.close()
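
One practical detail before registering anything: the Kibana panels later in this post aggregate on duration_ms and filter on span_type, so those fields should be mapped explicitly rather than left to dynamic mapping. A sketch of an index template you might apply once at startup, assuming the elasticsearch-py 8.x client; the ensure_index_templates function is illustrative and the field names match the exporter above:

from elasticsearch import Elasticsearch

def ensure_index_templates(client: Elasticsearch, index_prefix: str = "agent-traces") -> None:
    """Map the fields the dashboards aggregate and filter on (illustrative)."""
    client.indices.put_index_template(
        name=f"{index_prefix}-spans",
        index_patterns=[f"{index_prefix}-spans*"],
        template={
            "mappings": {
                "properties": {
                    "trace_id": {"type": "keyword"},
                    "span_id": {"type": "keyword"},
                    "parent_id": {"type": "keyword"},
                    "span_type": {"type": "keyword"},
                    "name": {"type": "keyword"},
                    "status": {"type": "keyword"},
                    "duration_ms": {"type": "float"},
                    # The exporter stores time.time() floats; keep this numeric,
                    # or round to whole seconds if you want a proper date field
                    "timestamp": {"type": "double"},
                }
            }
        },
    )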

Registering with TraceProvider

With both the processor and exporter built, register them with the global TraceProvider:

from agents.tracing import add_trace_processor
import os

# Create the exporter
es_exporter = ElasticsearchSpanExporter(
    es_url=os.environ["ELASTICSEARCH_URL"],
    api_key=os.environ.get("ELASTICSEARCH_API_KEY"),
    index_prefix="agent-traces",
    batch_size=100,
    flush_interval=3.0,
)

# Create the processor
es_processor = ElasticsearchTracingProcessor(exporter=es_exporter)

# Register — add_trace_processor keeps the default OpenAI processor and adds yours alongside it.
# Use set_trace_processors([...]) instead if you want to replace the defaults entirely.
add_trace_processor(es_processor)

Now every trace produced by any Runner.run() call will flow through both the default OpenAI processor and your Elasticsearch processor.
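
Here is what a traced run looks like end to end. The sketch uses the SDK's Agent, Runner, and trace APIs together with the es_processor registered above; the agent name, instructions, and workflow name are placeholders:

import asyncio
from agents import Agent, Runner, trace

async def main() -> None:
    agent = Agent(
        name="Support Agent",  # placeholder agent
        instructions="Answer billing questions briefly.",
    )

    # Everything inside the block is grouped under one named workflow trace
    with trace("billing_support"):
        result = await Runner.run(agent, "Why was I charged twice this month?")
        print(result.final_output)

    # Short-lived scripts should flush before exiting so nothing is lost
    es_processor.force_flush()

asyncio.run(main())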

Building a BatchTraceProcessor for High Throughput

For high-volume systems, processing spans one at a time is inefficient. A batch processor collects spans and exports them in bulk:

import threading
import time
from datetime import datetime
from typing import Any

class BatchTraceProcessor(TracingProcessor):
    """Batches spans before exporting for higher throughput."""

    def __init__(
        self,
        exporter: ElasticsearchSpanExporter,
        max_batch_size: int = 200,
        flush_interval: float = 2.0,
        max_queue_size: int = 10000,
    ):
        self._exporter = exporter
        self._max_batch_size = max_batch_size
        self._flush_interval = flush_interval
        self._max_queue_size = max_queue_size
        self._span_buffer: list[dict[str, Any]] = []
        self._lock = threading.Lock()
        self._shutdown = threading.Event()

        self._flush_thread = threading.Thread(
            target=self._auto_flush, daemon=True
        )
        self._flush_thread.start()

    def on_trace_start(self, trace: Trace) -> None:
        pass  # We handle data at the span level

    def on_trace_end(self, trace: Trace) -> None:
        self._maybe_flush()

    def on_span_start(self, span: Span) -> None:
        pass  # Buffer on end, not start

    def on_span_end(self, span: Span) -> None:
        with self._lock:
            if len(self._span_buffer) >= self._max_queue_size:
                return  # drop spans rather than grow without bound under backpressure
            self._span_buffer.append({
                "trace_id": span.trace_id,
                "span_id": span.span_id,
                "parent_id": span.parent_id,
                "span_type": span.span_data.type,
                "name": getattr(span.span_data, "name", None),
                "duration_ms": self._duration_ms(span),
                "status": "error" if span.error else "ok",
                "attributes": span.span_data.export(),
                "started_at": time.time(),
            })
        self._maybe_flush()

    @staticmethod
    def _duration_ms(span: Span) -> float:
        # started_at / ended_at are ISO-8601 strings on the SDK span
        if span.started_at and span.ended_at:
            start = datetime.fromisoformat(span.started_at)
            end = datetime.fromisoformat(span.ended_at)
            return (end - start).total_seconds() * 1000
        return 0.0

    def _maybe_flush(self, force: bool = False) -> None:
        batch: list[dict[str, Any]] = []
        with self._lock:
            if self._span_buffer and (force or len(self._span_buffer) >= self._max_batch_size):
                batch = self._span_buffer[:]
                self._span_buffer.clear()
        if batch:
            self._export_batch(batch)

    def _export_batch(self, batch: list[dict[str, Any]]) -> None:
        # export_trace expects a trace-shaped payload, so wrap each span as a
        # single-span trace keyed by its trace_id
        for span_data in batch:
            self._exporter.export_trace({
                "trace_id": span_data["trace_id"],
                "workflow_name": None,
                "started_at": span_data["started_at"],
                "spans": [span_data],
            })
        self._exporter.flush()

    def _auto_flush(self) -> None:
        while not self._shutdown.is_set():
            self._shutdown.wait(self._flush_interval)
            # The timer flush is forced so partial batches never sit
            # longer than flush_interval
            self._maybe_flush(force=True)

    def shutdown(self) -> None:
        self._shutdown.set()
        self._flush_thread.join(timeout=10)
        self._maybe_flush(force=True)

    def force_flush(self) -> None:
        self._maybe_flush(force=True)
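
Wiring it up mirrors the earlier registration, except the batch processor now sits between the SDK and the exporter. The environment variable names are the same assumptions as before:

import os
from agents.tracing import set_trace_processors

exporter = ElasticsearchSpanExporter(
    es_url=os.environ["ELASTICSEARCH_URL"],
    api_key=os.environ.get("ELASTICSEARCH_API_KEY"),
    batch_size=500,  # let Elasticsearch receive large bulk requests
    flush_interval=5.0,
)

batch_processor = BatchTraceProcessor(
    exporter=exporter,
    max_batch_size=200,
    flush_interval=2.0,
)

# Replace the default processors entirely; use add_trace_processor() instead
# if you also want spans to keep flowing to the OpenAI dashboard
set_trace_processors([batch_processor])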

Creating Kibana Dashboards

With agent traces in Elasticsearch, you can build powerful Kibana dashboards. Note that the SDK records tool calls as spans of type "function", which is what the panels below filter on:

{
  "agent_performance_dashboard": {
    "panels": [
      {
        "title": "Average Agent Run Duration (ms)",
        "type": "metric",
        "query": {
          "aggs": {
            "avg_duration": {
              "avg": { "field": "duration_ms" }
            }
          }
        }
      },
      {
        "title": "Tool Call Latency Distribution",
        "type": "histogram",
        "query": {
          "bool": {
            "filter": [
              { "term": { "span_type": "tool_call" } }
            ]
          }
        }
      },
      {
        "title": "Handoff Frequency by Agent Pair",
        "type": "heatmap",
        "query": {
          "bool": {
            "filter": [
              { "term": { "span_type": "handoff" } }
            ]
          }
        }
      }
    ]
  }
}

Practical Observability Patterns

Here are the patterns that matter most in production agent observability:

  1. Latency percentiles — Track p50, p95, and p99 for each agent and tool call. A single slow tool can dominate user-perceived latency. (A query sketch follows this list.)

  2. Handoff heatmaps — Visualize which agents hand off to which others. Unexpected handoff patterns reveal confusion in the triage logic.

  3. Token consumption by agent — Track input and output tokens per agent to identify which agents are the most expensive.

  4. Error rate by span type — Separate tool call errors from generation errors from guardrail blocks. Each requires a different remediation.

  5. Trace depth monitoring — If a trace has more than 10-15 agent spans, the orchestration logic likely needs simplification.
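
As a concrete example of the first pattern, here is a sketch of the percentile aggregation against the spans index built earlier. It assumes the elasticsearch-py 8.x client and the keyword mapping shown above; the tool_latency_percentiles function name is illustrative, and tool calls appear as span_type "function":

from elasticsearch import Elasticsearch

def tool_latency_percentiles(client: Elasticsearch, index: str = "agent-traces-spans") -> list[dict]:
    """p50/p95/p99 tool-call latency per tool, from the spans index."""
    response = client.search(
        index=index,
        size=0,
        query={"term": {"span_type": "function"}},  # tool calls
        aggs={
            "by_tool": {
                "terms": {"field": "name", "size": 20},
                "aggs": {
                    "latency": {
                        "percentiles": {
                            "field": "duration_ms",
                            "percents": [50, 95, 99],
                        }
                    }
                },
            }
        },
    )
    return response["aggregations"]["by_tool"]["buckets"]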

Custom trace processors transform agent traces from a debugging tool into a full observability platform. Start with Elasticsearch for flexible querying, add Kibana dashboards for visualization, and set up alerting rules for latency and error rate thresholds. The investment pays off the first time you diagnose a production issue in minutes instead of hours.
