Workflow Observability: Monitoring, Alerting, and Debugging Agent Orchestration
Learn how to build observability into AI agent orchestration systems. Covers dashboard design, metric collection, alert rules, trace correlation, and debugging strategies for agent workflows.
Why Agent Workflows Need Specialized Observability
Traditional application monitoring tracks request latency, error rates, and throughput. AI agent workflows add unique challenges:
- Non-deterministic execution: The same input produces different step counts, different LLM calls, and different durations each run
- Long execution times: A workflow might run for minutes or hours, making real-time dashboards essential
- Cost visibility: Every LLM call has a dollar cost that must be tracked alongside performance metrics
- Quality signals: Beyond "did it succeed," you need to know "was the output good"
Effective observability for agent systems requires three pillars: metrics (what is happening), logs (why it happened), and traces (how it happened across steps).
Metric Collection
Define and collect the metrics that matter most for agent workflows.
Hear it before you finish reading
Talk to a live CallSphere AI voice agent in your browser — 60 seconds, no signup.
flowchart LR
APP(["Agent or API"])
SDK["OTel SDK<br/>GenAI conventions"]
COL["OTel Collector"]
subgraph BACKENDS["Backends"]
TR[("Traces<br/>Tempo or Honeycomb")]
MET[("Metrics<br/>Prometheus")]
LOG[("Logs<br/>Loki or ELK")]
end
DASH["Grafana plus alerts"]
PAGE(["Pager"])
APP --> SDK --> COL
COL --> TR
COL --> MET
COL --> LOG
TR --> DASH
MET --> DASH
LOG --> DASH
DASH --> PAGE
style SDK fill:#4f46e5,stroke:#4338ca,color:#fff
style DASH fill:#f59e0b,stroke:#d97706,color:#1f2937
style PAGE fill:#dc2626,stroke:#b91c1c,color:#fff
import time
from dataclasses import dataclass, field
from collections import defaultdict
from typing import Any
@dataclass
class WorkflowMetrics:
workflow_id: str
workflow_name: str
start_time: float = field(default_factory=time.time)
end_time: float | None = None
step_metrics: list[dict] = field(default_factory=list)
llm_calls: list[dict] = field(default_factory=list)
total_tokens: int = 0
total_cost_usd: float = 0.0
error_count: int = 0
retry_count: int = 0
@property
def duration_seconds(self) -> float | None:
if self.end_time is None:
return time.time() - self.start_time
return self.end_time - self.start_time
class MetricsCollector:
"""Collects and exposes workflow metrics."""
def __init__(self):
self._active_workflows: dict[str, WorkflowMetrics] = {}
self._completed: list[WorkflowMetrics] = []
self._counters: dict[str, int] = defaultdict(int)
def start_workflow(self, workflow_id: str, name: str) -> WorkflowMetrics:
metrics = WorkflowMetrics(
workflow_id=workflow_id,
workflow_name=name,
)
self._active_workflows[workflow_id] = metrics
self._counters["workflows_started"] += 1
return metrics
def record_step(
self,
workflow_id: str,
step_name: str,
duration_ms: float,
status: str,
metadata: dict | None = None,
):
metrics = self._active_workflows.get(workflow_id)
if not metrics:
return
metrics.step_metrics.append({
"step": step_name,
"duration_ms": duration_ms,
"status": status,
"timestamp": time.time(),
**(metadata or {}),
})
if status == "failed":
metrics.error_count += 1
if status == "retried":
metrics.retry_count += 1
def record_llm_call(
self,
workflow_id: str,
model: str,
input_tokens: int,
output_tokens: int,
duration_ms: float,
cost_usd: float,
):
metrics = self._active_workflows.get(workflow_id)
if not metrics:
return
metrics.llm_calls.append({
"model": model,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"duration_ms": duration_ms,
"cost_usd": cost_usd,
"timestamp": time.time(),
})
metrics.total_tokens += input_tokens + output_tokens
metrics.total_cost_usd += cost_usd
def complete_workflow(self, workflow_id: str, status: str):
metrics = self._active_workflows.pop(workflow_id, None)
if metrics:
metrics.end_time = time.time()
self._completed.append(metrics)
self._counters[f"workflows_{status}"] += 1
def get_summary(self) -> dict:
return {
"active_workflows": len(self._active_workflows),
"counters": dict(self._counters),
"recent_completed": [
{
"id": m.workflow_id,
"name": m.workflow_name,
"duration_s": round(m.duration_seconds, 2),
"steps": len(m.step_metrics),
"tokens": m.total_tokens,
"cost_usd": round(m.total_cost_usd, 4),
"errors": m.error_count,
}
for m in self._completed[-20:]
],
}
Prometheus Integration
Export metrics in Prometheus format for Grafana dashboards.
from prometheus_client import Counter, Histogram, Gauge, Info
# Workflow-level metrics
workflow_started = Counter(
"agent_workflow_started_total",
"Total workflows started",
["workflow_name"],
)
workflow_completed = Counter(
"agent_workflow_completed_total",
"Total workflows completed",
["workflow_name", "status"],
)
workflow_duration = Histogram(
"agent_workflow_duration_seconds",
"Workflow execution duration",
["workflow_name"],
buckets=[1, 5, 10, 30, 60, 120, 300, 600],
)
active_workflows = Gauge(
"agent_active_workflows",
"Currently running workflows",
["workflow_name"],
)
# Step-level metrics
step_duration = Histogram(
"agent_step_duration_seconds",
"Individual step duration",
["workflow_name", "step_name"],
buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60],
)
step_errors = Counter(
"agent_step_errors_total",
"Step execution errors",
["workflow_name", "step_name", "error_type"],
)
# LLM-specific metrics
llm_call_duration = Histogram(
"agent_llm_call_duration_seconds",
"LLM API call duration",
["model"],
buckets=[0.5, 1, 2, 5, 10, 30],
)
llm_tokens_used = Counter(
"agent_llm_tokens_total",
"Total tokens consumed",
["model", "direction"], # direction: input or output
)
llm_cost = Counter(
"agent_llm_cost_usd_total",
"Total LLM cost in USD",
["model"],
)
Alert Rules
Define alerts that catch real problems without creating noise.
alert_rules = {
"high_failure_rate": {
"expr": (
"rate(agent_workflow_completed_total{status='failed'}[5m]) / "
"rate(agent_workflow_started_total[5m]) > 0.1"
),
"for": "5m",
"severity": "critical",
"summary": "More than 10% of agent workflows are failing",
},
"workflow_stuck": {
"expr": (
"time() - agent_workflow_last_step_timestamp > 600"
),
"for": "1m",
"severity": "warning",
"summary": "Agent workflow has not progressed in 10 minutes",
},
"llm_latency_spike": {
"expr": (
"histogram_quantile(0.95, "
"rate(agent_llm_call_duration_seconds_bucket[5m])) > 15"
),
"for": "3m",
"severity": "warning",
"summary": "P95 LLM call latency exceeds 15 seconds",
},
"cost_spike": {
"expr": (
"rate(agent_llm_cost_usd_total[1h]) > 10"
),
"for": "5m",
"severity": "critical",
"summary": "LLM spending exceeds $10/hour",
},
}
Trace Correlation
Link individual steps across a workflow execution using trace IDs. This lets you follow the full execution path in your logging system.
import uuid
import logging
import contextvars
trace_id_var: contextvars.ContextVar[str] = contextvars.ContextVar(
"trace_id", default=""
)
class TraceContext:
def __init__(self, workflow_id: str):
self.workflow_id = workflow_id
self.trace_id = str(uuid.uuid4())
self.span_stack: list[str] = []
def start_span(self, step_name: str) -> str:
span_id = str(uuid.uuid4())[:8]
self.span_stack.append(span_id)
trace_id_var.set(self.trace_id)
return span_id
def end_span(self):
if self.span_stack:
self.span_stack.pop()
class StructuredLogger:
def __init__(self, name: str):
self.logger = logging.getLogger(name)
def log_step(
self,
level: str,
message: str,
trace: TraceContext,
step_name: str,
**extra,
):
self.logger.log(
getattr(logging, level.upper()),
message,
extra={
"trace_id": trace.trace_id,
"workflow_id": trace.workflow_id,
"step_name": step_name,
"span_id": (
trace.span_stack[-1] if trace.span_stack else None
),
**extra,
},
)
# Usage
logger = StructuredLogger("agent")
trace = TraceContext(workflow_id="wf-123")
span = trace.start_span("analyze")
logger.log_step(
"info",
"Starting analysis step",
trace,
"analyze",
input_length=1500,
)
Debugging Failed Workflows
When a workflow fails, you need to reconstruct what happened. Build a debugging utility that pulls together metrics, logs, and state.
Still reading? Stop comparing — try CallSphere live.
CallSphere ships complete AI voice agents per industry — 14 tools for healthcare, 10 agents for real estate, 4 specialists for salons. See how it actually handles a call before you book a demo.
class WorkflowDebugger:
def __init__(self, store, metrics_collector, log_store):
self.store = store
self.metrics = metrics_collector
self.logs = log_store
async def investigate(self, workflow_id: str) -> dict:
workflow = await self.store.load(workflow_id)
logs = await self.logs.query(
f'workflow_id="{workflow_id}"',
limit=100,
)
failed_steps = [
s for s in workflow.steps
if s.status == "failed"
]
return {
"workflow": {
"id": workflow.id,
"status": workflow.status,
"version": workflow.version,
"started": workflow.created_at.isoformat(),
},
"failed_steps": [
{
"name": s.name,
"error": s.error,
"attempts": s.attempts,
"last_attempt": s.completed_at.isoformat(),
}
for s in failed_steps
],
"recent_logs": logs,
"context_snapshot": workflow.context,
}
FAQ
What is the single most important metric for agent workflows?
The step failure rate by step name. This tells you which specific step is causing problems and at what rate. Aggregate workflow failure rates hide whether the issue is systemic (all steps failing) or localized (one flaky API integration). Once you know the failing step, you can look at its error logs and retry behavior.
How do I avoid alert fatigue with AI agent monitoring?
Set alerts on rates and percentiles, not individual failures. A single failed LLM call is expected. A 10% failure rate sustained for 5 minutes is a real problem. Use the for clause in Prometheus alert rules to require sustained anomalies before firing. Also, separate informational alerts (Slack notifications) from actionable alerts (PagerDuty pages).
Should I log full LLM prompts and responses?
Log them in development and staging for debugging. In production, log truncated versions (first 200 characters) or hashes. Full prompts and responses can contain sensitive user data and consume enormous storage. Use sampling — log full content for 1% of executions — to maintain debugging capability without the storage cost.
#Observability #Monitoring #Alerting #AIAgents #Python #AgenticAI #LearnAI #AIEngineering
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.