AI Infrastructure

How to Build a Voice Agent Post-Call Analytics Pipeline

Score sentiment from –1.0 to 1.0, lead intent from 0 to 100, and extract structured entities from every call. Async pipeline with NATS, gpt-4o-mini, and a Postgres analytics table.

TL;DR — Don't compute analytics inline. Push a call.completed event to a queue, run a worker that calls gpt-4o-mini with structured JSON output, and write back sentiment + lead score + entities to a dedicated table. Latency stays low; insights show up within 60s.

What you'll build

An async post-call analytics pipeline: when a call ends, an event hits NATS; a Python worker consumes it, fetches the transcript from Postgres, scores sentiment (–1.0 to 1.0), lead intent (0–100), and extracts entities, then writes results back. A dashboard query gives you "top leads in last 24h" in <50ms.

Prerequisites

  1. Working voice agent persisting transcripts (post 8).
  2. NATS server (or Redis Streams / RabbitMQ — pattern is the same).
  3. pip install nats-py openai psycopg[binary] or Node equivalents.
  4. OPENAI_API_KEY exported.
  5. ~10 minutes to get NATS up via Docker.

Architecture

```mermaid
flowchart LR
  CALL[Call ends] -->|publish| NATS
  NATS --> W[Analytics worker]
  W --> DB[(Postgres turns)]
  W --> OAI[gpt-4o-mini JSON]
  OAI --> W
  W --> AN[(analytics table)]
  AN --> DASH[Dashboard]
```

Step 1 — Run NATS

```bash
docker run -d --name nats -p 4222:4222 nats:2.10 -js
```

Step 2 — Publish a call.completed event

In your voice bridge, on call end:

```ts
import { connect, StringCodec } from "nats";

const nc = await connect({ servers: "nats://localhost:4222" });
const sc = StringCodec();
nc.publish(
  "call.completed",
  sc.encode(JSON.stringify({ callId, endedAt: new Date() }))
);
```

Step 3 — Worker scaffold (Python)

```python
# worker.py
import asyncio
import json
import os

import nats
import psycopg
from openai import OpenAI

oai = OpenAI()
PG = os.environ["DATABASE_URL"]

async def main():
    nc = await nats.connect("nats://localhost:4222")
    # Queue group "analytics" lets multiple worker replicas share the load.
    sub = await nc.subscribe("call.completed", queue="analytics")

    async for msg in sub.messages:
        try:
            await analyze(json.loads(msg.data.decode()))
        except Exception as e:
            print("err:", e)

if __name__ == "__main__":
    asyncio.run(main())
```

Step 4 — Analyze a single call

```python
SYSTEM = """You score voice calls. Return STRICT JSON:
{
  "sentiment": float in [-1.0, 1.0],
  "lead_score": int in [0, 100],
  "intent": one of ["info","booking","complaint","sales","support"],
  "entities": {"name":str|null, "email":str|null, "topic":str|null},
  "summary": str (max 240 chars),
  "next_action": str
}"""

async def analyze(evt):
    call_id = evt["callId"]
    with psycopg.connect(PG) as conn, conn.cursor() as cur:
        cur.execute(
            'SELECT role, text FROM "Turn" WHERE "callId" = %s ORDER BY "startedAt"',
            (call_id,),
        )
        turns = cur.fetchall()

    transcript = "\n".join(f"{r}: {t}" for r, t in turns)

    r = oai.chat.completions.create(
        model="gpt-4o-mini",
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": SYSTEM},
            {"role": "user", "content": transcript[:12000]},
        ],
    )
    a = json.loads(r.choices[0].message.content)

    with psycopg.connect(PG) as conn, conn.cursor() as cur:
        cur.execute("""
            INSERT INTO "Analytics"
              ("callId", sentiment, "leadScore", intent, entities, "computedAt")
            VALUES (%s, %s, %s, %s, %s, now())
            ON CONFLICT ("callId") DO UPDATE SET
              sentiment = EXCLUDED.sentiment,
              "leadScore" = EXCLUDED."leadScore",
              intent = EXCLUDED.intent,
              entities = EXCLUDED.entities,
              "computedAt" = now()
        """, (call_id, a["sentiment"], a["lead_score"], a["intent"], json.dumps(a["entities"])))
        conn.commit()
```
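The worker's upsert assumes an `"Analytics"` table exists; the article never shows its DDL. A hedged sketch that matches the column names used above (the `ON CONFLICT ("callId")` clause needs the primary key; adjust types and the `"Call"` reference to your actual schema):

```sql
-- Hypothetical DDL for the "Analytics" table the worker writes to.
CREATE TABLE IF NOT EXISTS "Analytics" (
  "callId"     text PRIMARY KEY REFERENCES "Call"(id),
  sentiment    real NOT NULL,
  "leadScore"  int  NOT NULL,
  intent       text NOT NULL,
  entities     jsonb,
  "computedAt" timestamptz NOT NULL DEFAULT now()
);
```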

Step 5 — Add an entity-of-interest webhook

Hot leads (score ≥ 80, intent = sales) deserve a Slack ping. Trigger a follow-up after writing analytics:


```python
# Inside analyze(), after the INSERT commits (assumes the worker's
# NATS connection `nc` is passed in or module-global):
if a["lead_score"] >= 80 and a["intent"] == "sales":
    await nc.publish("lead.hot", json.dumps({"callId": call_id, **a}).encode())
```
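On the receiving side, the Slack ping itself is a small consumer of `lead.hot`. A minimal stdlib-only sketch (the function names and `SLACK_WEBHOOK_URL` variable are illustrative; it assumes a Slack incoming-webhook URL):

```python
# slack_notify.py -- hypothetical hot-lead alert, called from a lead.hot consumer.
import json
import os
import urllib.request

def format_hot_lead(call_id: str, a: dict) -> str:
    """Render the analytics payload as a one-line Slack message."""
    ents = a.get("entities") or {}
    return (f":fire: Hot lead on call {call_id}: "
            f"score {a['lead_score']}, intent {a['intent']}, "
            f"contact {ents.get('name') or 'unknown'}")

def notify_hot_lead(call_id: str, a: dict) -> None:
    """POST to a Slack incoming webhook when the lead qualifies."""
    if a["lead_score"] >= 80 and a["intent"] == "sales":
        req = urllib.request.Request(
            os.environ["SLACK_WEBHOOK_URL"],
            data=json.dumps({"text": format_hot_lead(call_id, a)}).encode(),
            headers={"Content-Type": "application/json"},
        )
        urllib.request.urlopen(req)
```

Keeping the formatter separate from the HTTP call makes it trivial to unit-test the message text without hitting Slack.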

Step 6 — Dashboard queries

```sql
-- Top leads in last 24h
SELECT c.id, c."fromNumber", a."leadScore", a.intent, a.entities
FROM "Call" c
JOIN "Analytics" a ON a."callId" = c.id
WHERE c."endedAt" > now() - interval '24 hours'
ORDER BY a."leadScore" DESC
LIMIT 20;

-- Sentiment trend per agent (7 days)
SELECT c."agentId", date_trunc('day', c."endedAt") AS d, AVG(a.sentiment) AS avg_sent
FROM "Call" c
JOIN "Analytics" a ON a."callId" = c.id
WHERE c."endedAt" > now() - interval '7 days'
GROUP BY 1, 2
ORDER BY 1, 2;
```
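The <50ms target assumes supporting indexes. A hedged sketch (index names are illustrative; column names match the queries above, but verify against your schema and `EXPLAIN ANALYZE` output):

```sql
-- Hypothetical indexes backing the dashboard queries.
CREATE INDEX IF NOT EXISTS idx_call_ended_at      ON "Call" ("endedAt");
CREATE INDEX IF NOT EXISTS idx_analytics_score    ON "Analytics" ("leadScore" DESC);
CREATE INDEX IF NOT EXISTS idx_analytics_call_id  ON "Analytics" ("callId");
```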

Step 7 — Backfill

```bash
psql -t -A -c 'SELECT id FROM "Call" WHERE id NOT IN (SELECT "callId" FROM "Analytics")' \
  | xargs -I% nats pub call.completed '{"callId":"%"}'
```

Common pitfalls

  • Inline analytics: blocks call end, adds 800–1500ms. Always async.
  • No idempotency: re-running a worker double-writes without ON CONFLICT.
  • Transcript too long: cap at ~12k chars or summarize first; gpt-4o-mini handles it cheaply.
  • response_format: text: model drifts, dashboard breaks. Always json_object with strict schema.
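Even with `json_object`, it's worth clamping model output before it hits the table; the last two pitfalls both come down to trusting the model's ranges. A minimal guard, run on the parsed dict before the INSERT (the function name is illustrative; field names follow the SYSTEM schema above):

```python
# Hypothetical post-parse guard: clamp model output into the ranges
# the dashboard expects before writing to "Analytics".
VALID_INTENTS = {"info", "booking", "complaint", "sales", "support"}

def clamp_analytics(a: dict) -> dict:
    a["sentiment"] = max(-1.0, min(1.0, float(a.get("sentiment", 0.0))))
    a["lead_score"] = max(0, min(100, int(a.get("lead_score", 0))))
    if a.get("intent") not in VALID_INTENTS:
        a["intent"] = "info"  # safe default rather than breaking dashboard filters
    a.setdefault("entities", {})
    return a
```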

How CallSphere does this in production

CallSphere's analytics worker runs on every call across all 6 verticals — Healthcare, Real Estate (OneRoof), Salon, Forex, Hospitality, Behavioral Health. Sentiment –1.0 to 1.0, lead score 0–100, hot-lead webhook to the GTM CRM table — same pipeline as above, plus a nightly aggregation that powers the admin dashboard. Worker fan-out via NATS, 4 replicas, p95 analytics latency ~12s post-call. Try the dashboard on a 14-day trial.

FAQ

Why gpt-4o-mini and not 4o? 50x cheaper, accurate enough for sentiment and intent. Reserve 4o for nuanced summaries.

NATS vs Redis Streams? NATS JetStream is simpler with at-least-once delivery. Redis is fine if you already use it.

How do I evaluate sentiment accuracy? Hand-label 200 calls, compute MAE — target <0.15 across the –1 to 1 range.
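The MAE check above is a few lines of Python; a sketch assuming hand labels and model scores are paired lists in [-1.0, 1.0]:

```python
# Mean absolute error between hand-labeled and model sentiment scores.
def sentiment_mae(labels: list[float], preds: list[float]) -> float:
    assert len(labels) == len(preds) and labels, "need paired, non-empty samples"
    return sum(abs(l - p) for l, p in zip(labels, preds)) / len(labels)

# Target from the FAQ: sentiment_mae(...) < 0.15 across the full range.
```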

Can I do this in real-time during the call? Yes — subscribe to the running transcript and update analytics every 30s. More cost, marginal value.


Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.