Workflow Observability: Monitoring, Alerting, and Debugging Agent Orchestration

Learn how to build observability into AI agent orchestration systems. Covers dashboard design, metric collection, alert rules, trace correlation, and debugging strategies for agent workflows.

Why Agent Workflows Need Specialized Observability

Traditional application monitoring tracks request latency, error rates, and throughput. AI agent workflows add unique challenges:

  • Non-deterministic execution: The same input produces different step counts, different LLM calls, and different durations each run
  • Long execution times: A workflow might run for minutes or hours, making real-time dashboards essential
  • Cost visibility: Every LLM call has a dollar cost that must be tracked alongside performance metrics
  • Quality signals: Beyond "did it succeed," you need to know "was the output good"

Effective observability for agent systems requires three pillars: metrics (what is happening), logs (why it happened), and traces (how it happened across steps).

Metric Collection

Define and collect the metrics that matter most for agent workflows.

flowchart LR
    APP(["Agent or API"])
    SDK["OTel SDK<br/>GenAI conventions"]
    COL["OTel Collector"]
    subgraph BACKENDS["Backends"]
        TR[("Traces<br/>Tempo or Honeycomb")]
        MET[("Metrics<br/>Prometheus")]
        LOG[("Logs<br/>Loki or ELK")]
    end
    DASH["Grafana plus alerts"]
    PAGE(["Pager"])
    APP --> SDK --> COL
    COL --> TR
    COL --> MET
    COL --> LOG
    TR --> DASH
    MET --> DASH
    LOG --> DASH
    DASH --> PAGE
    style SDK fill:#4f46e5,stroke:#4338ca,color:#fff
    style DASH fill:#f59e0b,stroke:#d97706,color:#1f2937
    style PAGE fill:#dc2626,stroke:#b91c1c,color:#fff
import time
from dataclasses import dataclass, field
from collections import defaultdict
from typing import Any

@dataclass
class WorkflowMetrics:
    workflow_id: str
    workflow_name: str
    start_time: float = field(default_factory=time.time)
    end_time: float | None = None
    step_metrics: list[dict] = field(default_factory=list)
    llm_calls: list[dict] = field(default_factory=list)
    total_tokens: int = 0
    total_cost_usd: float = 0.0
    error_count: int = 0
    retry_count: int = 0

    @property
    def duration_seconds(self) -> float | None:
        if self.end_time is None:
            return time.time() - self.start_time
        return self.end_time - self.start_time

class MetricsCollector:
    """Collects and exposes workflow metrics."""

    def __init__(self):
        self._active_workflows: dict[str, WorkflowMetrics] = {}
        self._completed: list[WorkflowMetrics] = []
        self._counters: dict[str, int] = defaultdict(int)

    def start_workflow(self, workflow_id: str, name: str) -> WorkflowMetrics:
        metrics = WorkflowMetrics(
            workflow_id=workflow_id,
            workflow_name=name,
        )
        self._active_workflows[workflow_id] = metrics
        self._counters["workflows_started"] += 1
        return metrics

    def record_step(
        self,
        workflow_id: str,
        step_name: str,
        duration_ms: float,
        status: str,
        metadata: dict | None = None,
    ):
        metrics = self._active_workflows.get(workflow_id)
        if not metrics:
            return
        metrics.step_metrics.append({
            "step": step_name,
            "duration_ms": duration_ms,
            "status": status,
            "timestamp": time.time(),
            **(metadata or {}),
        })
        if status == "failed":
            metrics.error_count += 1
        if status == "retried":
            metrics.retry_count += 1

    def record_llm_call(
        self,
        workflow_id: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
        duration_ms: float,
        cost_usd: float,
    ):
        metrics = self._active_workflows.get(workflow_id)
        if not metrics:
            return
        metrics.llm_calls.append({
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "duration_ms": duration_ms,
            "cost_usd": cost_usd,
            "timestamp": time.time(),
        })
        metrics.total_tokens += input_tokens + output_tokens
        metrics.total_cost_usd += cost_usd

    def complete_workflow(self, workflow_id: str, status: str):
        metrics = self._active_workflows.pop(workflow_id, None)
        if metrics:
            metrics.end_time = time.time()
            self._completed.append(metrics)
            self._counters[f"workflows_{status}"] += 1

    def get_summary(self) -> dict:
        return {
            "active_workflows": len(self._active_workflows),
            "counters": dict(self._counters),
            "recent_completed": [
                {
                    "id": m.workflow_id,
                    "name": m.workflow_name,
                    "duration_s": round(m.duration_seconds, 2),
                    "steps": len(m.step_metrics),
                    "tokens": m.total_tokens,
                    "cost_usd": round(m.total_cost_usd, 4),
                    "errors": m.error_count,
                }
                for m in self._completed[-20:]
            ],
        }
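The collector above takes `cost_usd` as an input rather than computing it. One way to produce that number is a small per-model price table; the table below is a hypothetical sketch (model names and prices are placeholders, not real provider pricing — load actual rates from config):

```python
# Hypothetical price table: USD per 1M tokens as (input, output).
# Real prices vary by provider and change often -- load them from config.
PRICES_PER_M_TOKENS = {
    "fast-model": (0.15, 0.60),
    "strong-model": (3.00, 15.00),
}

def estimate_cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    """Estimate the dollar cost of one LLM call from token counts."""
    in_price, out_price = PRICES_PER_M_TOKENS[model]
    return (input_tokens * in_price + output_tokens * out_price) / 1_000_000
```

Passing the estimate into `record_llm_call` keeps cost attribution in one place, so a price change is a one-line config edit rather than a code hunt.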

Prometheus Integration

Export metrics in Prometheus format for Grafana dashboards.

from prometheus_client import Counter, Gauge, Histogram

# Workflow-level metrics
workflow_started = Counter(
    "agent_workflow_started_total",
    "Total workflows started",
    ["workflow_name"],
)
workflow_completed = Counter(
    "agent_workflow_completed_total",
    "Total workflows completed",
    ["workflow_name", "status"],
)
workflow_duration = Histogram(
    "agent_workflow_duration_seconds",
    "Workflow execution duration",
    ["workflow_name"],
    buckets=[1, 5, 10, 30, 60, 120, 300, 600],
)
active_workflows = Gauge(
    "agent_active_workflows",
    "Currently running workflows",
    ["workflow_name"],
)

# Step-level metrics
step_duration = Histogram(
    "agent_step_duration_seconds",
    "Individual step duration",
    ["workflow_name", "step_name"],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60],
)
step_errors = Counter(
    "agent_step_errors_total",
    "Step execution errors",
    ["workflow_name", "step_name", "error_type"],
)

# LLM-specific metrics
llm_call_duration = Histogram(
    "agent_llm_call_duration_seconds",
    "LLM API call duration",
    ["model"],
    buckets=[0.5, 1, 2, 5, 10, 30],
)
llm_tokens_used = Counter(
    "agent_llm_tokens_total",
    "Total tokens consumed",
    ["model", "direction"],  # direction: input or output
)
llm_cost = Counter(
    "agent_llm_cost_usd_total",
    "Total LLM cost in USD",
    ["model"],
)
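The `buckets` arrays above determine how precisely `histogram_quantile` can answer latency questions, so it helps to know what they record. A Prometheus histogram keeps a cumulative count per bucket: each bucket counts observations less than or equal to its upper bound, with a final +Inf bucket catching everything. This is a pure-Python illustration of that counting, not the `prometheus_client` internals:

```python
import bisect

def cumulative_bucket_counts(observations: list[float],
                             bounds: list[float]) -> dict:
    """Count observations Prometheus-histogram-style: each bucket holds the
    cumulative count of values <= its upper bound; +Inf catches the rest."""
    edges = list(bounds) + [float("inf")]
    counts = [0] * len(edges)
    for v in observations:
        # bisect_left finds the first bound >= v, i.e. the "le" bucket.
        counts[bisect.bisect_left(edges, v)] += 1
    for i in range(1, len(counts)):  # make the counts cumulative
        counts[i] += counts[i - 1]
    return dict(zip(edges, counts))
```

The practical consequence: if most of your LLM calls land between two widely spaced bounds, quantile estimates interpolate inside that gap. Pick bucket edges near the latencies you actually care about alerting on.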

Alert Rules

Define alerts that catch real problems without creating noise.

alert_rules = {
    "high_failure_rate": {
        "expr": (
            "rate(agent_workflow_completed_total{status='failed'}[5m]) / "
            "rate(agent_workflow_started_total[5m]) > 0.1"
        ),
        "for": "5m",
        "severity": "critical",
        "summary": "More than 10% of agent workflows are failing",
    },
    "workflow_stuck": {
        "expr": (
            "time() - agent_workflow_last_step_timestamp > 600"
        ),
        "for": "1m",
        "severity": "warning",
        "summary": "Agent workflow has not progressed in 10 minutes",
    },
    "llm_latency_spike": {
        "expr": (
            "histogram_quantile(0.95, "
            "rate(agent_llm_call_duration_seconds_bucket[5m])) > 15"
        ),
        "for": "3m",
        "severity": "warning",
        "summary": "P95 LLM call latency exceeds 15 seconds",
    },
    "cost_spike": {
        "expr": (
            "rate(agent_llm_cost_usd_total[1h]) > 10"
        ),
        "for": "5m",
        "severity": "critical",
        "summary": "LLM spending exceeds $10/hour",
    },
}
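The dict above is Python, but Prometheus consumes rule files in YAML. A minimal renderer sketch bridges the two; it handles one group, does no YAML escaping, and assumes every field is a simple one-line string (use a real YAML library for anything more):

```python
def render_rules(group_name: str, rules: dict) -> str:
    """Render alert definitions shaped like the dict above into a
    Prometheus rule-file snippet. Sketch only: no escaping, one group."""
    out = ["groups:", f"  - name: {group_name}", "    rules:"]
    for name, rule in rules.items():
        out += [
            f"      - alert: {name}",
            f"        expr: {rule['expr']}",
            f"        for: {rule['for']}",
            "        labels:",
            f"          severity: {rule['severity']}",
            "        annotations:",
            f"          summary: {rule['summary']}",
        ]
    return "\n".join(out)
```

Generating rule files from code keeps alert definitions next to the metrics they reference, and lets a test suite fail the build when an alert names a metric that no longer exists.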

Trace Correlation

Link individual steps across a workflow execution using trace IDs. This lets you follow the full execution path in your logging system.

import uuid
import logging
import contextvars

trace_id_var: contextvars.ContextVar[str] = contextvars.ContextVar(
    "trace_id", default=""
)

class TraceContext:
    def __init__(self, workflow_id: str):
        self.workflow_id = workflow_id
        self.trace_id = str(uuid.uuid4())
        self.span_stack: list[str] = []

    def start_span(self, step_name: str) -> str:
        # Embed the step name so a bare span id in a log line
        # is meaningful on its own.
        span_id = f"{step_name}:{uuid.uuid4().hex[:8]}"
        self.span_stack.append(span_id)
        trace_id_var.set(self.trace_id)
        return span_id

    def end_span(self):
        if self.span_stack:
            self.span_stack.pop()

class StructuredLogger:
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)

    def log_step(
        self,
        level: str,
        message: str,
        trace: TraceContext,
        step_name: str,
        **extra,
    ):
        self.logger.log(
            getattr(logging, level.upper()),
            message,
            extra={
                "trace_id": trace.trace_id,
                "workflow_id": trace.workflow_id,
                "step_name": step_name,
                "span_id": (
                    trace.span_stack[-1] if trace.span_stack else None
                ),
                **extra,
            },
        )

# Usage
logger = StructuredLogger("agent")
trace = TraceContext(workflow_id="wf-123")
span = trace.start_span("analyze")
logger.log_step(
    "info",
    "Starting analysis step",
    trace,
    "analyze",
    input_length=1500,
)
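The `extra` fields only reach your log backend if the formatter actually emits them; Python's default formatter silently drops unknown attributes. A sketch of a JSON-lines formatter that picks up the trace fields attached by `log_step` (the class name and field list are ours, matching the logger above):

```python
import json
import logging

class JsonLineFormatter(logging.Formatter):
    """Emit each record as one JSON object per line, including the
    trace fields attached via `extra` when they are present."""
    TRACE_FIELDS = ("trace_id", "workflow_id", "step_name", "span_id")

    def format(self, record: logging.LogRecord) -> str:
        payload = {"level": record.levelname, "message": record.getMessage()}
        for name in self.TRACE_FIELDS:
            # `extra` kwargs become attributes on the LogRecord.
            if hasattr(record, name):
                payload[name] = getattr(record, name)
        return json.dumps(payload)
```

With every line carrying `trace_id`, a query like `trace_id="<id>"` in Loki or ELK returns the full execution path for one workflow run.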

Debugging Failed Workflows

When a workflow fails, you need to reconstruct what happened. Build a debugging utility that pulls together metrics, logs, and state.

class WorkflowDebugger:
    def __init__(self, store, metrics_collector, log_store):
        self.store = store
        self.metrics = metrics_collector
        self.logs = log_store

    async def investigate(self, workflow_id: str) -> dict:
        workflow = await self.store.load(workflow_id)
        logs = await self.logs.query(
            f'workflow_id="{workflow_id}"',
            limit=100,
        )

        failed_steps = [
            s for s in workflow.steps
            if s.status == "failed"
        ]

        return {
            "workflow": {
                "id": workflow.id,
                "status": workflow.status,
                "version": workflow.version,
                "started": workflow.created_at.isoformat(),
            },
            "failed_steps": [
                {
                    "name": s.name,
                    "error": s.error,
                    "attempts": s.attempts,
                    "last_attempt": s.completed_at.isoformat(),
                }
                for s in failed_steps
            ],
            "recent_logs": logs,
            "context_snapshot": workflow.context,
        }
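A debugging report is easier to scan when step records are laid out chronologically. A small helper that renders the dicts appended by `MetricsCollector.record_step` as a timeline (the text format is our own choice, not part of the debugger above):

```python
def build_timeline(step_metrics: list[dict]) -> list[str]:
    """Render step records (dicts with step, duration_ms, status,
    timestamp keys) as a chronological, human-scannable timeline."""
    lines = []
    for s in sorted(step_metrics, key=lambda s: s["timestamp"]):
        marker = "FAIL" if s["status"] == "failed" else "ok"
        lines.append(f"[{marker:>4}] {s['step']:<20} {s['duration_ms']:8.0f} ms")
    return lines
```

Attaching the rendered timeline to the `investigate` output turns a pile of dicts into something a responder can read at 3 a.m.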

FAQ

What is the single most important metric for agent workflows?

The step failure rate by step name. This tells you which specific step is causing problems and at what rate. Aggregate workflow failure rates hide whether the issue is systemic (all steps failing) or localized (one flaky API integration). Once you know the failing step, you can look at its error logs and retry behavior.

How do I avoid alert fatigue with AI agent monitoring?

Set alerts on rates and percentiles, not individual failures. A single failed LLM call is expected. A 10% failure rate sustained for 5 minutes is a real problem. Use the for clause in Prometheus alert rules to require sustained anomalies before firing. Also, separate informational alerts (Slack notifications) from actionable alerts (PagerDuty pages).

Should I log full LLM prompts and responses?

Log them in development and staging for debugging. In production, log truncated versions (first 200 characters) or hashes. Full prompts and responses can contain sensitive user data and consume enormous storage. Use sampling — log full content for 1% of executions — to maintain debugging capability without the storage cost.
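The truncate-plus-sample policy can be sketched in a few lines. This is one possible shape (function name and field names are ours); the hash lets you group identical prompts in your log backend without ever storing them:

```python
import hashlib
import random

def prompt_log_fields(prompt: str, sample_rate: float = 0.01,
                      max_chars: int = 200) -> dict:
    """Decide what to log for an LLM prompt in production: full text for
    a sampled fraction of calls, otherwise a truncated prefix plus hash."""
    digest = hashlib.sha256(prompt.encode()).hexdigest()[:16]
    if random.random() < sample_rate:
        return {"prompt": prompt, "prompt_hash": digest, "sampled": True}
    return {"prompt": prompt[:max_chars], "prompt_hash": digest, "sampled": False}
```

Even the sampled full prompts may need redaction before logging if they can contain user PII; sampling reduces volume, not sensitivity.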


#Observability #Monitoring #Alerting #AIAgents #Python #AgenticAI #LearnAI #AIEngineering
