Learn Agentic AI

Continuous Evaluation in Production: Real-Time Quality Monitoring for Deployed Agents

Learn how to implement continuous evaluation for production AI agents with sampling strategies, real-time quality dashboards, alerting on quality degradation, and feedback loops that drive iterative improvement.

Why Pre-Deployment Testing Is Not Enough

Your evaluation dataset covers the scenarios you anticipated. Production covers everything else. Users phrase things in ways you never imagined. Edge cases compound in sequences you never tested. Upstream model providers push silent updates that shift behavior. A model that passed your evaluation suite last week can degrade this week without any change on your end.

Continuous evaluation in production bridges the gap between controlled testing and real-world performance. It samples live conversations, scores them automatically, and alerts you before quality drops become customer complaints.

Designing a Sampling Strategy

You cannot evaluate every conversation in production — the cost of LLM-as-judge scoring would exceed the cost of the agent itself. Strategic sampling gives you statistical confidence at a fraction of the cost.

flowchart LR
    APP(["Agent or API"])
    SDK["OTel SDK<br/>GenAI conventions"]
    COL["OTel Collector"]
    subgraph BACKENDS["Backends"]
        TR[("Traces<br/>Tempo or Honeycomb")]
        MET[("Metrics<br/>Prometheus")]
        LOG[("Logs<br/>Loki or ELK")]
    end
    DASH["Grafana plus alerts"]
    PAGE(["Pager"])
    APP --> SDK --> COL
    COL --> TR
    COL --> MET
    COL --> LOG
    TR --> DASH
    MET --> DASH
    LOG --> DASH
    DASH --> PAGE
    style SDK fill:#4f46e5,stroke:#4338ca,color:#fff
    style DASH fill:#f59e0b,stroke:#d97706,color:#1f2937
    style PAGE fill:#dc2626,stroke:#b91c1c,color:#fff
import random
import hashlib
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional

@dataclass
class SamplingConfig:
    base_rate: float = 0.05  # 5% of conversations
    boost_rate: float = 0.25  # 25% for flagged patterns
    boost_triggers: list[str] = field(
        default_factory=lambda: [
            "user_thumbs_down",
            "escalation_requested",
            "high_token_count",
            "tool_error",
        ]
    )
    min_daily_samples: int = 100
    max_daily_samples: int = 5000

class ProductionSampler:
    def __init__(self, config: SamplingConfig):
        self.config = config
        self.daily_count = 0
        self.last_reset = datetime.utcnow().date()

    def _reset_if_new_day(self):
        today = datetime.utcnow().date()
        if today > self.last_reset:
            self.daily_count = 0
            self.last_reset = today

    def should_sample(
        self, conversation_id: str, signals: Optional[dict] = None
    ) -> bool:
        self._reset_if_new_day()

        if self.daily_count >= self.config.max_daily_samples:
            return False

        # Deterministic hash for reproducibility
        hash_val = int(
            hashlib.md5(
                conversation_id.encode()
            ).hexdigest()[:8],
            16,
        )
        threshold = hash_val / 0xFFFFFFFF

        signals = signals or {}
        has_trigger = any(
            signals.get(t, False)
            for t in self.config.boost_triggers
        )
        rate = (
            self.config.boost_rate
            if has_trigger
            else self.config.base_rate
        )

        # Boost if below minimum daily target
        hours_elapsed = max(
            1, datetime.utcnow().hour
        )
        expected = (
            self.config.min_daily_samples
            * hours_elapsed / 24
        )
        if self.daily_count < expected * 0.5:
            rate = min(rate * 2, 0.5)

        if threshold < rate:
            self.daily_count += 1
            return True
        return False

The deterministic hash ensures the same conversation always gets the same sampling decision, which matters for debugging. Boost sampling on negative signals — conversations where users express dissatisfaction, where escalations happen, or where tools error out. These are exactly the conversations you need to evaluate most.
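To see why the decisions are reproducible, the hash-to-threshold step can be isolated on its own. Here `hash_threshold` is a hypothetical helper that mirrors the logic inside `should_sample`:

```python
import hashlib

def hash_threshold(conversation_id: str) -> float:
    """Map an ID to a stable value in [0, 1] via the first 8 MD5 hex chars."""
    h = int(hashlib.md5(conversation_id.encode()).hexdigest()[:8], 16)
    return h / 0xFFFFFFFF

# The value depends only on the ID, so retries and replays always
# reach the same sampling decision at any fixed rate.
t1 = hash_threshold("conv-123")
t2 = hash_threshold("conv-123")
```

Because the threshold is a pure function of the conversation ID, you can re-run a sampling decision weeks later while debugging and get the same answer.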


Real-Time Quality Scoring Pipeline

Build an asynchronous pipeline that evaluates sampled conversations without blocking the user experience.

import asyncio
from collections import deque

@dataclass
class QualityScore:
    conversation_id: str
    timestamp: str
    scores: dict  # e.g., {"coherence": 4, "task_completion": 0.8}
    flags: list[str] = field(default_factory=list)

class OnlineEvaluationPipeline:
    def __init__(self, scoring_functions: list, queue_size: int = 1000):
        self.scorers = scoring_functions
        self.queue: asyncio.Queue = asyncio.Queue(
            maxsize=queue_size
        )
        self.results: deque = deque(maxlen=10000)
        self._running = False

    async def submit(self, conversation: dict):
        try:
            self.queue.put_nowait(conversation)
        except asyncio.QueueFull:
            pass  # Drop if pipeline is backed up

    async def _process(self):
        while self._running:
            try:
                conversation = await asyncio.wait_for(
                    self.queue.get(), timeout=5.0
                )
            except asyncio.TimeoutError:
                continue

            scores = {}
            flags = []
            for scorer in self.scorers:
                try:
                    result = await scorer(conversation)
                    scores.update(result.get("scores", {}))
                    flags.extend(result.get("flags", []))
                except Exception as e:
                    flags.append(f"scorer_error:{e}")

            quality_score = QualityScore(
                conversation_id=conversation["id"],
                timestamp=datetime.utcnow().isoformat(),
                scores=scores,
                flags=flags,
            )
            self.results.append(quality_score)
            self.queue.task_done()

    async def start(self, workers: int = 3):
        self._running = True
        tasks = [
            asyncio.create_task(self._process())
            for _ in range(workers)
        ]
        return tasks

    async def stop(self):
        self._running = False

Multiple workers process the queue in parallel. If the queue fills up, new submissions are dropped rather than blocking the agent — monitoring should never degrade the user experience.
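Scoring functions passed to the pipeline are assumed to be async callables returning a dict with `scores` and `flags` keys, matching how `_process` consumes them. A minimal deterministic scorer might look like this (`length_scorer` is illustrative, not part of the pipeline):

```python
import asyncio

async def length_scorer(conversation: dict) -> dict:
    # Deterministic check: flag unusually long conversations.
    turns = conversation.get("turns", [])
    flags = ["high_turn_count"] if len(turns) > 40 else []
    return {"scores": {"turn_count": len(turns)}, "flags": flags}

result = asyncio.run(length_scorer({"id": "c1", "turns": [{}] * 3}))
```

Cheap deterministic scorers like this can run on every sampled conversation, reserving LLM-as-judge calls for the dimensions that need them.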

Building Quality Dashboards

Aggregate scores into time-windowed views that reveal trends and anomalies.

from collections import defaultdict
from typing import Sequence

class QualityDashboard:
    def __init__(self, window_minutes: int = 60):
        self.window_minutes = window_minutes
        self.scores: list[QualityScore] = []

    def add_score(self, score: QualityScore):
        self.scores.append(score)

    def _recent_scores(self) -> list[QualityScore]:
        cutoff = (
            datetime.utcnow()
            - timedelta(minutes=self.window_minutes)
        )
        cutoff_str = cutoff.isoformat()
        return [
            s for s in self.scores
            if s.timestamp >= cutoff_str
        ]

    def current_metrics(self) -> dict:
        recent = self._recent_scores()
        if not recent:
            return {"status": "no_data"}

        metric_values = defaultdict(list)
        all_flags = []
        for score in recent:
            for key, value in score.scores.items():
                if isinstance(value, (int, float)):
                    metric_values[key].append(value)
            all_flags.extend(score.flags)

        metrics = {}
        for key, values in metric_values.items():
            metrics[key] = {
                "mean": round(sum(values) / len(values), 3),
                "min": round(min(values), 3),
                "max": round(max(values), 3),
                "count": len(values),
            }

        # Flag frequency
        flag_counts = defaultdict(int)
        for flag in all_flags:
            flag_counts[flag] += 1

        return {
            "window_minutes": self.window_minutes,
            "conversations_evaluated": len(recent),
            "metrics": metrics,
            "top_flags": dict(
                sorted(
                    flag_counts.items(),
                    key=lambda x: -x[1],
                )[:10]
            ),
        }

    def compare_windows(
        self, current_minutes: int = 60, baseline_minutes: int = 1440
    ) -> dict:
        now = datetime.utcnow()
        current_cutoff = (
            now - timedelta(minutes=current_minutes)
        ).isoformat()
        baseline_cutoff = (
            now - timedelta(minutes=baseline_minutes)
        ).isoformat()

        current = [
            s for s in self.scores
            if s.timestamp >= current_cutoff
        ]
        baseline = [
            s for s in self.scores
            if baseline_cutoff <= s.timestamp < current_cutoff
        ]

        def avg_metric(scores, key):
            vals = [
                s.scores.get(key, 0)
                for s in scores
                if isinstance(s.scores.get(key), (int, float))
            ]
            return sum(vals) / len(vals) if vals else 0

        # Compare common metrics
        all_keys = set()
        for s in current + baseline:
            all_keys.update(s.scores.keys())

        comparison = {}
        for key in all_keys:
            curr_avg = avg_metric(current, key)
            base_avg = avg_metric(baseline, key)
            delta = curr_avg - base_avg
            comparison[key] = {
                "current": round(curr_avg, 3),
                "baseline": round(base_avg, 3),
                "delta": round(delta, 3),
                "degraded": delta < -0.1,
            }

        return comparison

The compare_windows method is your early warning system. With the defaults it compares the last hour against the rest of the trailing 24 hours. When a metric's delta turns significantly negative, something changed: a model update, a traffic-pattern shift, or a bug.
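The degradation rule itself is simple enough to check by hand. A metric is flagged when its mean drops more than 0.1 below the baseline window, the tolerance hardcoded in compare_windows:

```python
def is_degraded(current_avg: float, baseline_avg: float,
                tolerance: float = 0.1) -> bool:
    # Mirrors the `delta < -0.1` check in compare_windows.
    return (current_avg - baseline_avg) < -tolerance

flagged = is_degraded(0.70, 0.85)  # 0.15 drop: degraded
noise = is_degraded(0.82, 0.85)    # 0.03 drop: within normal variance
```

A fixed absolute tolerance is the simplest choice; for metrics on different scales you may want a per-metric tolerance instead.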

Alerting on Quality Degradation

Convert dashboard data into actionable alerts.


@dataclass
class AlertRule:
    metric: str
    threshold: float
    comparison: str  # "below", "above"
    severity: str  # "warning", "critical"
    message_template: str

class QualityAlertManager:
    def __init__(self):
        self.rules: list[AlertRule] = []
        self.active_alerts: list[dict] = []

    def add_rule(self, rule: AlertRule):
        self.rules.append(rule)

    def evaluate(self, metrics: dict) -> list[dict]:
        triggered = []
        for rule in self.rules:
            metric_data = metrics.get("metrics", {}).get(
                rule.metric, {}
            )
            value = metric_data.get("mean")
            if value is None:
                continue

            fire = (
                (rule.comparison == "below" and value < rule.threshold)
                or (rule.comparison == "above" and value > rule.threshold)
            )
            if fire:
                alert = {
                    "metric": rule.metric,
                    "value": value,
                    "threshold": rule.threshold,
                    "severity": rule.severity,
                    "message": rule.message_template.format(
                        metric=rule.metric,
                        value=value,
                        threshold=rule.threshold,
                    ),
                    "timestamp": datetime.utcnow().isoformat(),
                }
                triggered.append(alert)

        self.active_alerts = triggered
        return triggered

# Configure alerts
alert_mgr = QualityAlertManager()
alert_mgr.add_rule(AlertRule(
    metric="task_completion",
    threshold=0.7,
    comparison="below",
    severity="critical",
    message_template="Task completion dropped to {value:.1%}, below {threshold:.1%} threshold",
))
alert_mgr.add_rule(AlertRule(
    metric="coherence",
    threshold=3.0,
    comparison="below",
    severity="warning",
    message_template="Coherence score at {value:.1f}, below {threshold:.1f} minimum",
))
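The metrics dict that evaluate() consumes is assumed to be exactly what QualityDashboard.current_metrics() returns. A sketch of that shape, and the message a critical rule would render from it (the numbers are illustrative):

```python
# Shape assumed to match current_metrics() output.
metrics = {
    "window_minutes": 60,
    "conversations_evaluated": 312,
    "metrics": {
        "task_completion": {"mean": 0.62, "min": 0.1, "max": 1.0, "count": 312},
    },
}

# The same formatting the task_completion rule above would apply.
template = "Task completion dropped to {value:.1%}, below {threshold:.1%} threshold"
message = template.format(
    value=metrics["metrics"]["task_completion"]["mean"], threshold=0.7
)
```

Wiring this together is one loop: pass dashboard.current_metrics() into alert_mgr.evaluate() on a schedule and route anything triggered to your pager.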

Closing the Feedback Loop

The final piece is feeding production evaluation results back into your offline evaluation datasets. Conversations that score poorly in production become new test cases. Patterns that trigger alerts become new red team samples. This creates a virtuous cycle where your evaluation dataset grows smarter over time, reflecting the actual failure modes of your deployed agent rather than the failures you imagined during development.
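One way to sketch that harvest, assuming evaluation results shaped like the QualityScore records above (to_regression_cases is a hypothetical helper, and the 0.5 cutoff is an assumption):

```python
def to_regression_cases(results: list[dict], key: str = "task_completion",
                        min_score: float = 0.5) -> list[dict]:
    """Turn low-scoring production conversations into offline test cases."""
    cases = []
    for r in results:
        score = r["scores"].get(key)
        if score is not None and score < min_score:
            cases.append({
                "conversation_id": r["conversation_id"],
                "tags": ["production_regression", *r["flags"]],
            })
    return cases

cases = to_regression_cases([
    {"conversation_id": "c1", "scores": {"task_completion": 0.3},
     "flags": ["tool_error"]},
    {"conversation_id": "c2", "scores": {"task_completion": 0.9}, "flags": []},
])
```

Run a harvest like this weekly, review the candidates by hand, and merge the genuine failures into your offline evaluation dataset.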

FAQ

How much does continuous production evaluation cost?

At a 5 percent sampling rate with LLM-as-judge scoring, expect to spend 2 to 5 percent of your agent's total LLM cost on evaluation. For a system spending 10,000 dollars a month on agent inference, that is 200 to 500 dollars for continuous monitoring. Deterministic checks are essentially free, so maximize those and use LLM judges selectively for quality dimensions that require language understanding.
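The arithmetic behind those figures, assuming a judge call costs roughly 0.4x to 1x of an average agent conversation (that ratio is an assumption; measure yours):

```python
agent_monthly_cost = 10_000  # dollars spent on agent inference per month
sample_rate = 0.05           # 5% of conversations evaluated
judge_ratio = (0.4, 1.0)     # judge cost relative to one agent conversation

low = agent_monthly_cost * sample_rate * judge_ratio[0]
high = agent_monthly_cost * sample_rate * judge_ratio[1]
# 200.0 to 500.0 dollars per month, i.e. 2-5% of inference spend
```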

How do I avoid alert fatigue from too many false positives?

Start with conservative thresholds that only fire on genuine quality drops. Require sustained degradation — the metric must be below threshold for 15 minutes, not just a single sample. Group related alerts together so a single root cause does not generate five separate alerts. Review and tune thresholds monthly based on actual incident correlation.
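A sustained-degradation check can be sketched as a consecutive-breach counter. SustainedAlert is illustrative: with evaluation windows computed every 5 minutes, three consecutive breaches approximates the 15-minute rule:

```python
class SustainedAlert:
    """Fire only after the metric breaches the threshold N checks in a row."""

    def __init__(self, threshold: float, required_breaches: int = 3):
        self.threshold = threshold
        self.required = required_breaches
        self.breaches = 0

    def observe(self, value: float) -> bool:
        # A single healthy reading resets the streak.
        self.breaches = self.breaches + 1 if value < self.threshold else 0
        return self.breaches >= self.required

alert = SustainedAlert(threshold=0.7, required_breaches=3)
history = [alert.observe(v) for v in [0.65, 0.8, 0.6, 0.6, 0.6]]
# Fires only on the third consecutive breach.
```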

Should I evaluate the same conversation multiple times with different judges?

For production monitoring, one evaluation pass is sufficient — you need speed and cost efficiency. For conversations flagged as potential quality issues, run a second evaluation with a different judge model to confirm. This two-tier approach keeps costs low while reducing false positives on the cases that might trigger engineering action.
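The two-tier approach can be sketched as a gate: the second judge runs only when the first pass flags a potential issue. The `quality` key and 0.5 cutoff below are illustrative, not a fixed convention:

```python
import asyncio

async def two_tier_evaluate(conversation: dict, fast_judge, confirm_judge,
                            flag_threshold: float = 0.5) -> dict:
    first = await fast_judge(conversation)
    if first["scores"].get("quality", 1.0) >= flag_threshold:
        return first  # Looks fine: no second pass, no extra cost.
    second = await confirm_judge(conversation)
    # Only surface the issue when a different judge agrees.
    second["confirmed"] = second["scores"].get("quality", 1.0) < flag_threshold
    return second

# Stub judges for illustration.
async def fast(c): return {"scores": {"quality": 0.3}}
async def confirm(c): return {"scores": {"quality": 0.4}}
out = asyncio.run(two_tier_evaluate({}, fast, confirm))
```

Since most sampled conversations pass the first gate, the expensive confirmation judge only touches a small slice of traffic.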


#ProductionMonitoring #ContinuousEvaluation #Observability #Alerting #Python #AgenticAI #LearnAI #AIEngineering
