Skip to content
AI Infrastructure
AI Infrastructure11 min read0 views

Postgres LISTEN/NOTIFY for AI Realtime: Pub/Sub Without a Broker (2026)

Skip Redis and Kafka for moderate-scale AI realtime — Postgres LISTEN/NOTIFY gives you pub/sub with ACID semantics. Build an agent event bus, watch for the global commit lock, and know when to graduate.

TL;DR — Postgres LISTEN/NOTIFY is a transactional pub/sub primitive built into every Postgres install. It's perfect for AI agent realtime under ~5k commits/sec; above that, the global commit lock during NOTIFY becomes the bottleneck and you graduate to NATS or Redis Streams.

What you'll build

A Node.js + Postgres event bus that fires agent_event notifications when an agent finishes a turn, with a Server-Sent-Events bridge so browser clients see updates in <100ms — no message broker required.

Schema

CREATE TABLE agent_events (
  id BIGSERIAL PRIMARY KEY,
  tenant_id UUID NOT NULL,
  agent_id UUID NOT NULL,
  event_type TEXT NOT NULL,
  payload JSONB NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE OR REPLACE FUNCTION notify_agent_event() RETURNS trigger AS $$
BEGIN
  PERFORM pg_notify(
    'agent_event_' || NEW.tenant_id::text,
    json_build_object(
      'id', NEW.id,
      'agentId', NEW.agent_id,
      'type', NEW.event_type,
      'payload', NEW.payload
    )::text
  );
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_notify_agent_event
AFTER INSERT ON agent_events
FOR EACH ROW EXECUTE FUNCTION notify_agent_event();

Architecture

flowchart LR
  AGENT[Agent run] --> INS[INSERT agent_events]
  INS --> TRG[AFTER INSERT trigger]
  TRG --> NOTIFY[pg_notify channel]
  NOTIFY --> LISTEN[Node listener]
  LISTEN --> SSE[Server-Sent Events]
  SSE --> BROWSER[React UI]

Step 1 — Listener service

import { Client } from "pg";

export async function startListener(tenantId: string, onEvent: (e: any) => void) {
  const client = new Client({ connectionString: process.env.DATABASE_URL });
  await client.connect();
  await client.query(`LISTEN agent_event_${tenantId.replace(/-/g, "_")}`);
  client.on("notification", (n) => onEvent(JSON.parse(n.payload!)));
  client.on("error", (e) => {
    console.error("listener error", e);
    setTimeout(() => startListener(tenantId, onEvent), 1000);
  });
}

Step 2 — Bridge to SSE

// app/api/agents/stream/route.ts
import { startListener } from "@/lib/notify";

export async function GET(req: Request) {
  const tenantId = await getTenantFromRequest(req);
  const stream = new ReadableStream({
    start(ctrl) {
      startListener(tenantId, (e) => {
        ctrl.enqueue(`data: ${JSON.stringify(e)}\n\n`);
      });
    },
  });
  return new Response(stream, {
    headers: { "Content-Type": "text/event-stream" },
  });
}

Step 3 — Producer (the agent loop)

await prisma.$executeRaw`
  INSERT INTO agent_events (tenant_id, agent_id, event_type, payload)
  VALUES (${tenantId}::uuid, ${agentId}::uuid, 'turn_complete',
          ${JSON.stringify({ tokens: 142, latency: 820 })}::jsonb)
`;

The trigger fires pg_notify automatically inside the same transaction. Atomic, durable, no separate broker.

Hear it before you finish reading

Talk to a live CallSphere AI voice agent in your browser — 60 seconds, no signup.

Try Live Demo →

Step 4 — Browser consumer

useEffect(() => {
  const es = new EventSource("/api/agents/stream");
  es.onmessage = (m) => {
    const ev = JSON.parse(m.data);
    setFeed((f) => [ev, ...f].slice(0, 100));
  };
  return () => es.close();
}, []);

Step 5 — Watch the commit lock

SELECT query, state, wait_event_type, wait_event
FROM pg_stat_activity
WHERE wait_event = 'Notify' OR wait_event_type = 'LWLock';

If you see sustained NotifyQueueLock waits, you're saturating LISTEN/NOTIFY. Threshold: ~5k notifies/sec on commodity hardware (Recall.ai documented this).

Step 6 — Graceful fallback path

const useBroker = (process.env.NOTIFY_RATE_PER_SEC ?? 0) > 4000;
if (useBroker) {
  await nats.publish(`agent.${tenantId}`, payload);
} else {
  await prisma.$executeRaw`SELECT pg_notify(...)`;
}

Build the abstraction now so a future migration to NATS/Redis Streams is a config flip.

Pitfalls

  • Channel name length — capped at 63 bytes. Hyphens in UUIDs are fine but stay under the limit.
  • Payload size — capped at 8000 bytes. Send IDs, fetch the row separately.
  • Global commit lock — every NOTIFY takes NotifyQueueLock. Above ~5k commits/sec it serializes writes (Recall.ai outage post-mortem).
  • Connection drops swallow events — LISTEN is not durable. Pair with an outbox pattern for at-least-once.

CallSphere production note

CallSphere's call-events bus runs on LISTEN/NOTIFY for the 115+ DB tables' realtime needs — agent turns, tool calls, escalations all flow through Postgres triggers. At ~600 events/sec across 6 verticals (Healthcare/Behavioral Health on healthcare_voice Prisma, OneRoof RLS, UrackIT Supabase + ChromaDB), we're well under the commit-lock threshold. 37 agents · 90+ tools. Plans: $149/$499/$1,499, 14-day trial, 22% affiliate.

FAQ

Q: Is LISTEN/NOTIFY at-least-once or exactly-once? At-most-once across reconnects — durable inside one TCP session. Add an outbox table for guaranteed delivery.

Still reading? Stop comparing — try CallSphere live.

CallSphere ships complete AI voice agents per industry — 14 tools for healthcare, 10 agents for real estate, 4 specialists for salons. See how it actually handles a call before you book a demo.

Q: Does pgbouncer break LISTEN/NOTIFY? Yes in transaction-pooling mode. Use session pooling or a dedicated direct connection for listeners.

Q: Can multiple servers listen to the same channel? Yes — every listener gets the message (fanout).

Q: How big can payload be? 8000 bytes raw. Send a row ID and let consumers fetch.

Q: When should I switch to NATS / Redis Streams? When commit rate sustains >5k/sec, or you need durable replay/consumer groups.

Sources

Share

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.

Related Articles You May Like

AI Strategy

AI Agent M&A Activity 2026: Aircall–Vogent, Meta–PlayAI, OpenAI's Six Deals

Q1 2026 saw a record acquisition wave: Aircall bought Vogent (May), Meta acquired Manus and PlayAI, OpenAI closed six deals. The voice AI consolidation phase has begun.

Agentic AI

LangGraph State-Machine Architecture: A Principal-Engineer Deep Dive (2026)

How LangGraph's StateGraph, channels, and reducers actually work — with a working multi-step agent, eval hooks at every node, and the patterns that survive production.

Agentic AI

LangGraph Checkpointers in Production: Durable, Resumable Agents with Eval Replay

Use LangGraph's checkpointer to make agents resumable across crashes and human-in-the-loop pauses, then replay any checkpoint into your eval pipeline.

Agentic AI

Multi-Agent Handoffs with the OpenAI Agents SDK: The Pattern That Actually Scales (2026)

Handoffs done right — when one agent should hand control to another, how to preserve context, and how to evaluate the handoff decision itself.

Agentic AI

Building Your First Agent with the OpenAI Agents SDK in 2026: A Hands-On Walkthrough

Step-by-step build of a working agent with the OpenAI Agents SDK — Agent class, tools, handoffs, tracing — plus an eval pipeline that catches regressions before merge.

Agentic AI

LangGraph Supervisor Pattern: Orchestrating Multi-Agent Teams in 2026

The supervisor pattern in LangGraph for coordinating specialist agents, with full code, an eval pipeline that scores routing accuracy, and the failure modes to watch for.