Building an Agent Orchestration Dashboard: Visualizing Workflow Status and Performance

Learn how to build a real-time orchestration dashboard for AI agent workflows. Covers UI components, status tracking, timeline views, error drill-down, and the backend API that powers it all.

Why Build a Custom Dashboard

Off-the-shelf orchestration platforms include their own dashboards, but custom dashboards serve a different purpose. They surface domain-specific insights — agent quality scores, LLM cost breakdowns, model performance comparisons — that generic workflow UIs do not provide.

A well-designed orchestration dashboard answers three questions at a glance: What is running right now? What failed recently and why? How much is this costing?

Dashboard API with FastAPI

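The backend API provides endpoints for the dashboard frontend to consume. Start with the data models and core endpoints. The snippets assume that workflow_store and metrics_store are async data-access objects defined elsewhere in your application.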

flowchart LR
    CLIENT(["Client SDK"])
    GW["API Gateway<br/>auth plus rate limit"]
    APP["FastAPI app<br/>handlers and DI"]
    VAL["Pydantic validation"]
    SVC["Service layer<br/>business logic"]
    DB[(Database)]
    QUEUE[(Background queue)]
    OBS[(Tracing)]
    CLIENT --> GW --> APP --> VAL --> SVC
    SVC --> DB
    SVC --> QUEUE
    SVC --> OBS
    SVC --> CLIENT
    style GW fill:#4f46e5,stroke:#4338ca,color:#fff
    style APP fill:#f59e0b,stroke:#d97706,color:#1f2937
    style DB fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
from fastapi import FastAPI, HTTPException, Query  # HTTPException is raised by the detail endpoints
from pydantic import BaseModel
from datetime import datetime, timedelta
from typing import Optional

app = FastAPI(title="Agent Orchestration Dashboard API")

class WorkflowSummary(BaseModel):
    id: str
    name: str
    status: str
    started_at: datetime
    duration_seconds: float | None
    step_count: int
    current_step: str | None
    error: str | None
    total_tokens: int
    cost_usd: float

class DashboardOverview(BaseModel):
    active_workflows: int
    completed_today: int
    failed_today: int
    success_rate: float
    total_cost_today_usd: float
    total_tokens_today: int
    avg_duration_seconds: float

@app.get("/api/dashboard/overview")
async def get_overview(
    hours: int = Query(default=24, ge=1, le=168),
) -> DashboardOverview:
    """Get high-level dashboard statistics."""
    since = datetime.utcnow() - timedelta(hours=hours)

    active = await workflow_store.count(status="running")
    completed = await workflow_store.count(
        status="completed", since=since
    )
    failed = await workflow_store.count(
        status="failed", since=since
    )
    total = completed + failed
    success_rate = completed / total if total > 0 else 1.0

    cost_data = await metrics_store.sum_cost(since=since)
    token_data = await metrics_store.sum_tokens(since=since)
    avg_duration = await metrics_store.avg_duration(since=since)

    return DashboardOverview(
        active_workflows=active,
        completed_today=completed,
        failed_today=failed,
        success_rate=round(success_rate, 4),
        total_cost_today_usd=round(cost_data, 2),
        total_tokens_today=token_data,
        avg_duration_seconds=round(avg_duration, 2),
    )

Workflow List and Filtering

class WorkflowFilter(BaseModel):
    status: Optional[str] = None
    name: Optional[str] = None
    since: Optional[datetime] = None
    until: Optional[datetime] = None
    min_cost_usd: Optional[float] = None
    has_errors: Optional[bool] = None

@app.get("/api/dashboard/workflows")
async def list_workflows(
    status: Optional[str] = None,
    name: Optional[str] = None,
    hours: int = Query(default=24, ge=1, le=168),
    limit: int = Query(default=50, ge=1, le=200),
    offset: int = Query(default=0, ge=0),
) -> dict:
    """List workflows with filtering and pagination."""
    since = datetime.utcnow() - timedelta(hours=hours)

    workflows = await workflow_store.query(
        status=status,
        name=name,
        since=since,
        limit=limit,
        offset=offset,
    )

    total = await workflow_store.count(
        status=status, name=name, since=since
    )

    return {
        "workflows": [
            WorkflowSummary(
                id=wf.id,
                name=wf.name,
                status=wf.status,
                started_at=wf.created_at,
                duration_seconds=wf.duration_seconds,
                step_count=len(wf.steps),
                current_step=_get_current_step(wf),
                error=_get_last_error(wf),
                total_tokens=wf.metrics.total_tokens,
                cost_usd=wf.metrics.total_cost_usd,
            )
            for wf in workflows
        ],
        "total": total,
        "limit": limit,
        "offset": offset,
    }

def _get_current_step(wf) -> str | None:
    running = [s for s in wf.steps if s.status == "running"]
    return running[0].name if running else None

def _get_last_error(wf) -> str | None:
    failed = [s for s in wf.steps if s.status == "failed"]
    return failed[-1].error if failed else None
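
The list endpoint returns total, limit, and offset alongside the page of results, which is enough for a client to work out its paging controls. A small sketch of that derivation follows; the page_info helper is hypothetical and only illustrates the arithmetic.

```python
def page_info(total: int, limit: int, offset: int) -> dict:
    """Derive paging state from the fields the list endpoint returns."""
    next_offset = offset + limit
    has_more = next_offset < total
    return {
        "has_more": has_more,
        # None signals that the client is already on the last page.
        "next_offset": next_offset if has_more else None,
        "page": offset // limit + 1,
        # Ceiling division, with a floor of one page for empty results.
        "pages": max(1, -(-total // limit)),
    }


first_page = page_info(total=120, limit=50, offset=0)
last_page = page_info(total=120, limit=50, offset=100)
```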

Timeline View API

The timeline view shows each step's duration and status as a horizontal bar chart, making it easy to spot bottlenecks.

class TimelineStep(BaseModel):
    name: str
    status: str
    started_at: datetime | None
    completed_at: datetime | None
    duration_ms: float | None
    attempts: int
    error: str | None
    llm_calls: int
    tokens_used: int

@app.get("/api/dashboard/workflows/{workflow_id}/timeline")
async def get_workflow_timeline(workflow_id: str) -> dict:
    """Get detailed timeline for a specific workflow."""
    wf = await workflow_store.load(workflow_id)
    if not wf:
        raise HTTPException(status_code=404, detail="Workflow not found")

    steps = []
    for step in wf.steps:
        llm_calls_for_step = [
            call for call in wf.metrics.llm_calls
            if call.get("step_name") == step.name
        ]
        steps.append(TimelineStep(
            name=step.name,
            status=step.status,
            started_at=step.started_at,
            completed_at=step.completed_at,
            duration_ms=(
                (step.completed_at - step.started_at).total_seconds()
                * 1000
                if step.started_at and step.completed_at
                else None
            ),
            attempts=step.attempts,
            error=step.error,
            llm_calls=len(llm_calls_for_step),
            tokens_used=sum(
                c.get("input_tokens", 0) + c.get("output_tokens", 0)
                for c in llm_calls_for_step
            ),
        ))

    return {
        "workflow_id": wf.id,
        "workflow_name": wf.name,
        "total_duration_ms": (
            (wf.updated_at - wf.created_at).total_seconds() * 1000
        ),
        "steps": steps,
    }

Error Drill-Down API

When a workflow fails, the dashboard needs to show exactly what went wrong at each level.

class ErrorDetail(BaseModel):
    workflow_id: str
    step_name: str
    error_type: str
    error_message: str
    stack_trace: str | None
    attempt_number: int
    timestamp: datetime
    context_snapshot: dict

@app.get("/api/dashboard/workflows/{workflow_id}/errors")
async def get_workflow_errors(workflow_id: str) -> list[ErrorDetail]:
    """Get detailed error information for a workflow."""
    wf = await workflow_store.load(workflow_id)
    if not wf:
        raise HTTPException(status_code=404, detail="Workflow not found")

    errors = []
    for step in wf.steps:
        if step.error:
            errors.append(ErrorDetail(
                workflow_id=wf.id,
                step_name=step.name,
                error_type=step.error_type or "Unknown",
                error_message=step.error,
                stack_trace=step.stack_trace,
                attempt_number=step.attempts,
                timestamp=step.completed_at or wf.updated_at,
                context_snapshot={
                    k: str(v)[:200]  # Truncate large values
                    for k, v in wf.context.items()
                    if not k.startswith("_")
                },
            ))

    return errors
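
For a workflow with many failed attempts, it can help to summarize the error list before showing individual entries. One possible approach, assuming the errors arrive as dictionaries with the error_type field from ErrorDetail, is to count occurrences per type. The summarize_errors helper and the sample error types are hypothetical.

```python
from collections import Counter


def summarize_errors(errors: list[dict]) -> list[tuple[str, int]]:
    """Count errors per type, most frequent first."""
    counts = Counter(e["error_type"] for e in errors)
    return counts.most_common()


errors = [
    {"step_name": "retrieve", "error_type": "TimeoutError"},
    {"step_name": "summarize", "error_type": "RateLimitError"},
    {"step_name": "retrieve", "error_type": "TimeoutError"},
]
summary = summarize_errors(errors)
```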

Cost Breakdown API

from typing import Literal

class CostBreakdown(BaseModel):
    model: str  # Holds the group key: a model, workflow, or step name
    call_count: int
    total_tokens: int
    input_tokens: int
    output_tokens: int
    total_cost_usd: float

@app.get("/api/dashboard/costs")
async def get_cost_breakdown(
    hours: int = Query(default=24, ge=1, le=168),
    # Literal makes FastAPI reject values outside the three allowed groupings
    group_by: Literal["model", "workflow", "step"] = "model",
) -> dict:
    """Get cost breakdown by model, workflow, or step."""
    since = datetime.utcnow() - timedelta(hours=hours)

    raw_data = await metrics_store.get_llm_costs(
        since=since, group_by=group_by
    )

    breakdowns = [
        CostBreakdown(
            model=row["group_key"],
            call_count=row["call_count"],
            total_tokens=row["total_tokens"],
            input_tokens=row["input_tokens"],
            output_tokens=row["output_tokens"],
            total_cost_usd=round(row["total_cost"], 4),
        )
        for row in raw_data
    ]

    return {
        "period_hours": hours,
        "breakdowns": breakdowns,
        "total_cost_usd": round(
            sum(b.total_cost_usd for b in breakdowns), 2
        ),
    }
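
The endpoint expects metrics_store.get_llm_costs to return rows that are already aggregated. As a rough sketch of what that aggregation might do when grouping by model, the example below sums tokens and cost over raw call records. The aggregate_costs function, the model names, and the prices are all hypothetical placeholders, not real vendor pricing.

```python
from collections import defaultdict

# Hypothetical prices in USD per 1M tokens; placeholders, not real pricing.
PRICES = {
    "model-a": {"input": 1.00, "output": 2.00},
    "model-b": {"input": 10.00, "output": 30.00},
}


def aggregate_costs(calls: list[dict]) -> list[dict]:
    """Group raw LLM call records by model and total their tokens and cost."""
    totals = defaultdict(lambda: {
        "call_count": 0,
        "input_tokens": 0,
        "output_tokens": 0,
        "total_cost": 0.0,
    })
    for call in calls:
        price = PRICES[call["model"]]
        row = totals[call["model"]]
        row["call_count"] += 1
        row["input_tokens"] += call["input_tokens"]
        row["output_tokens"] += call["output_tokens"]
        row["total_cost"] += (
            call["input_tokens"] * price["input"]
            + call["output_tokens"] * price["output"]
        ) / 1_000_000
    # Emit rows in the shape the cost endpoint reads from raw_data.
    return [
        {
            "group_key": model,
            "total_tokens": row["input_tokens"] + row["output_tokens"],
            **row,
        }
        for model, row in totals.items()
    ]


calls = [
    {"model": "model-a", "input_tokens": 1000, "output_tokens": 500},
    {"model": "model-a", "input_tokens": 2000, "output_tokens": 1000},
    {"model": "model-b", "input_tokens": 1000, "output_tokens": 1000},
]
rows = {row["group_key"]: row for row in aggregate_costs(calls)}
```

In practice this grouping usually belongs in a SQL GROUP BY so the API never loads individual call records.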

Real-Time Updates with WebSocket

For live dashboard updates, stream workflow state changes over WebSocket.

from fastapi import WebSocket, WebSocketDisconnect

class DashboardBroadcaster:
    def __init__(self):
        self.connections: list[WebSocket] = []

    async def connect(self, ws: WebSocket):
        await ws.accept()
        self.connections.append(ws)

    def disconnect(self, ws: WebSocket):
        # broadcast() may already have pruned this socket
        if ws in self.connections:
            self.connections.remove(ws)

    async def broadcast(self, event: dict):
        dead = []
        # Iterate over a copy: clients can connect or disconnect while we await
        for ws in list(self.connections):
            try:
                await ws.send_json(event)
            except Exception:
                dead.append(ws)
        for ws in dead:
            self.disconnect(ws)

broadcaster = DashboardBroadcaster()

@app.websocket("/ws/dashboard")
async def dashboard_ws(ws: WebSocket):
    await broadcaster.connect(ws)
    try:
        while True:
            await ws.receive_text()  # Keep connection alive
    except WebSocketDisconnect:
        broadcaster.disconnect(ws)

# Call this from your orchestrator when state changes
async def on_workflow_event(event_type: str, workflow_id: str, data: dict):
    await broadcaster.broadcast({
        "type": event_type,
        "workflow_id": workflow_id,
        "timestamp": datetime.utcnow().isoformat(),
        "data": data,
    })
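
On the receiving side, a consumer has to fold these events into whatever state it displays. The sketch below shows one way to do that, written in Python for consistency with the rest of the article even though a browser client would do the same thing in JavaScript. The apply_event function and the event type names are hypothetical; only the payload keys (type, workflow_id, timestamp, data) come from on_workflow_event above.

```python
from datetime import datetime


def apply_event(state: dict[str, dict], event: dict) -> dict[str, dict]:
    """Merge one broadcast event into a {workflow_id: fields} mapping."""
    wf_id = event["workflow_id"]
    current = state.get(wf_id, {})
    event_time = datetime.fromisoformat(event["timestamp"])
    last_seen = current.get("updated_at")
    # Drop events that are older than the state we already hold.
    if last_seen is not None and event_time < datetime.fromisoformat(last_seen):
        return state
    merged = {
        **current,
        **event["data"],
        "last_event": event["type"],
        "updated_at": event["timestamp"],
    }
    return {**state, wf_id: merged}


events = [
    {"type": "workflow_started", "workflow_id": "wf-1",
     "timestamp": "2025-01-02T10:00:00", "data": {"status": "running"}},
    {"type": "workflow_failed", "workflow_id": "wf-1",
     "timestamp": "2025-01-02T10:00:05", "data": {"status": "failed"}},
    # Arrives late: older than the failure event, so it is ignored.
    {"type": "step_started", "workflow_id": "wf-1",
     "timestamp": "2025-01-02T10:00:01",
     "data": {"status": "running", "current_step": "plan"}},
]

state: dict[str, dict] = {}
for event in events:
    state = apply_event(state, event)
```

Comparing timestamps guards against out-of-order delivery, so a late step event cannot flip a failed workflow back to running.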

FAQ

What refresh rate should the dashboard use?

For the overview panel, poll every 5-10 seconds. For individual workflow detail views, use WebSocket connections for real-time updates. Avoid polling faster than every 2 seconds: it creates unnecessary database load, and summary statistics rarely change enough in that window to matter to an operator.

Should I build the dashboard frontend in React or use Grafana?

Use Grafana for metrics-heavy dashboards (latency percentiles, throughput charts, cost trends) since it integrates natively with Prometheus and requires no frontend code. Build a custom React or Next.js dashboard for workflow-specific views like timelines, step drill-downs, and action buttons (retry, cancel, pause), which Grafana is not designed for. Many teams use both.

How do I handle dashboard performance with thousands of workflows?

Implement server-side pagination, filtering, and aggregation. Never load all workflows into the frontend. Use database indexes on status, created_at, and workflow_name columns. For the overview metrics, pre-compute aggregates on a schedule rather than computing them on every request. A materialized view or Redis cache updated every 30 seconds works well for dashboard summary statistics.
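
To illustrate the caching idea, here is a minimal in-process sketch that recomputes a value only after a time-to-live has elapsed. It is a stand-in for the Redis cache or materialized view mentioned above, not a replacement for either. The TTLCache class and expensive_overview function are hypothetical, and the clock is injectable so the expiry behavior can be shown without sleeping.

```python
import time
from typing import Any, Callable


class TTLCache:
    """Hold one computed value and refresh it after ttl_seconds."""

    def __init__(
        self,
        ttl_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._ttl = ttl_seconds
        self._clock = clock
        self._value: Any = None
        self._expires_at = float("-inf")  # forces a compute on first use

    def get(self, compute: Callable[[], Any]) -> Any:
        now = self._clock()
        if now >= self._expires_at:
            self._value = compute()
            self._expires_at = now + self._ttl
        return self._value


fake_now = [0.0]  # mutable holder so the lambda sees updates
calls: list[int] = []


def expensive_overview() -> dict:
    # Stands in for the aggregate queries behind the overview endpoint.
    calls.append(1)
    return {"active_workflows": len(calls)}


cache = TTLCache(ttl_seconds=30, clock=lambda: fake_now[0])
first = cache.get(expensive_overview)   # computes
fake_now[0] = 10.0
second = cache.get(expensive_overview)  # served from cache
fake_now[0] = 31.0
third = cache.get(expensive_overview)   # expired, recomputes
```

This version is synchronous and per-process. With several API workers each would keep its own copy, which is one reason a shared cache such as Redis is the more common choice.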

