Postgres LISTEN/NOTIFY for AI Realtime: Pub/Sub Without a Broker (2026)
Skip Redis and Kafka for moderate-scale AI realtime — Postgres LISTEN/NOTIFY gives you pub/sub with ACID semantics. Build an agent event bus, watch for the global commit lock, and know when to graduate.
TL;DR — Postgres LISTEN/NOTIFY is a transactional pub/sub primitive built into every Postgres install. It's perfect for AI agent realtime under ~5k commits/sec; above that, the global commit lock during NOTIFY becomes the bottleneck and you graduate to NATS or Redis Streams.
What you'll build
A Node.js + Postgres event bus that fires agent_event notifications when an agent finishes a turn, with a Server-Sent-Events bridge so browser clients see updates in <100ms — no message broker required.
Schema
CREATE TABLE agent_events (
  id BIGSERIAL PRIMARY KEY,
  tenant_id UUID NOT NULL,
  agent_id UUID NOT NULL,
  event_type TEXT NOT NULL,
  payload JSONB NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE OR REPLACE FUNCTION notify_agent_event() RETURNS trigger AS $$
BEGIN
  PERFORM pg_notify(
    -- Match the listener: LISTEN uses an unquoted identifier,
    -- so normalize the UUID's hyphens to underscores here too.
    'agent_event_' || replace(NEW.tenant_id::text, '-', '_'),
    json_build_object(
      'id', NEW.id,
      'agentId', NEW.agent_id,
      'type', NEW.event_type,
      'payload', NEW.payload
    )::text
  );
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_notify_agent_event
AFTER INSERT ON agent_events
FOR EACH ROW EXECUTE FUNCTION notify_agent_event();
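Channel names are Postgres identifiers (63-byte cap), and a plain `LISTEN` statement can't take hyphens unquoted, so it pays to derive the channel name in exactly one place that both the trigger and the listener agree with. A minimal sketch; `channelFor` is my name, not from the schema above:

```typescript
// Builds the notification channel name for a tenant.
// Hyphens become underscores so the unquoted `LISTEN agent_event_...`
// form works, and the 63-byte identifier limit is enforced up front.
export function channelFor(tenantId: string): string {
  const channel = `agent_event_${tenantId.replace(/-/g, "_")}`;
  if (new TextEncoder().encode(channel).length > 63) {
    throw new Error(`channel name exceeds 63 bytes: ${channel}`);
  }
  return channel;
}
```

A standard 36-character UUID yields a 48-byte channel name, comfortably under the cap.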
Architecture
flowchart LR
AGENT[Agent run] --> INS[INSERT agent_events]
INS --> TRG[AFTER INSERT trigger]
TRG --> NOTIFY[pg_notify channel]
NOTIFY --> LISTEN[Node listener]
LISTEN --> SSE[Server-Sent Events]
SSE --> BROWSER[React UI]
Step 1 — Listener service
import { Client } from "pg";

// One dedicated connection per tenant channel. LISTEN is session-scoped,
// so this must be a direct connection, not a transaction-pooled one.
export async function startListener(tenantId: string, onEvent: (e: any) => void) {
  const client = new Client({ connectionString: process.env.DATABASE_URL });
  await client.connect();
  // Channel names are unquoted identifiers: swap hyphens for underscores.
  await client.query(`LISTEN agent_event_${tenantId.replace(/-/g, "_")}`);
  client.on("notification", (n) => onEvent(JSON.parse(n.payload!)));
  client.on("error", (e) => {
    console.error("listener error", e);
    // Drop the broken connection before reconnecting, or it leaks.
    client.end().catch(() => {});
    setTimeout(() => startListener(tenantId, onEvent), 1000);
  });
}
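The reconnect above retries on a fixed 1-second timer, which hammers the database during a prolonged outage. A capped exponential backoff is the usual refinement; a small sketch (the helper name and defaults are illustrative, not from the post):

```typescript
// Delay in ms before reconnect attempt `attempt` (0-based):
// 1s, 2s, 4s, 8s, ... capped at 30s.
export function reconnectDelay(attempt: number, baseMs = 1000, capMs = 30000): number {
  return Math.min(capMs, baseMs * 2 ** attempt);
}
```

In the error handler you would track an attempt counter and pass `reconnectDelay(attempt)` to `setTimeout`, resetting the counter once a connection survives.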
Step 2 — Bridge to SSE
// app/api/agents/stream/route.ts
import { startListener } from "@/lib/notify";

export async function GET(req: Request) {
  const tenantId = await getTenantFromRequest(req); // your auth helper
  const encoder = new TextEncoder();
  const stream = new ReadableStream({
    start(ctrl) {
      startListener(tenantId, (e) => {
        // SSE frames must be enqueued as bytes, hence the encoder.
        ctrl.enqueue(encoder.encode(`data: ${JSON.stringify(e)}\n\n`));
      });
    },
    cancel() {
      // Client disconnected. The pg listener should be torn down here;
      // startListener would need to return its Client to allow that.
    },
  });
  return new Response(stream, {
    headers: { "Content-Type": "text/event-stream" },
  });
}
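The route above sends bare `data:` frames. Adding the SSE `id:` field makes the browser resend the last seen id as a `Last-Event-ID` header on reconnect, which is exactly the hook you need for the outbox catch-up the pitfalls section recommends. A sketch of the frame formatter (my helper, not part of the SSE route as published):

```typescript
// Formats one Server-Sent-Events frame. The `id:` field is echoed back
// by EventSource as Last-Event-ID on reconnect, letting the server
// replay any agent_events rows the client missed while disconnected.
export function sseFrame(id: number | string, data: unknown): string {
  return `id: ${id}\ndata: ${JSON.stringify(data)}\n\n`;
}
```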
Step 3 — Producer (the agent loop)
await prisma.$executeRaw`
INSERT INTO agent_events (tenant_id, agent_id, event_type, payload)
VALUES (${tenantId}::uuid, ${agentId}::uuid, 'turn_complete',
${JSON.stringify({ tokens: 142, latency: 820 })}::jsonb)
`;
The trigger queues pg_notify inside the same transaction; Postgres delivers the notification only after COMMIT, so listeners never see events from rolled-back inserts. Atomic, durable, no separate broker.
Hear it before you finish reading
Talk to a live CallSphere AI voice agent in your browser — 60 seconds, no signup.
Step 4 — Browser consumer
useEffect(() => {
  const es = new EventSource("/api/agents/stream");
  es.onmessage = (m) => {
    const ev = JSON.parse(m.data);
    setFeed((f) => [ev, ...f].slice(0, 100));
  };
  return () => es.close();
}, []);
Step 5 — Watch the commit lock
SELECT query, state, wait_event_type, wait_event
FROM pg_stat_activity
WHERE wait_event = 'Notify' OR wait_event_type = 'LWLock';
If you see sustained NotifyQueueLock waits, you're saturating LISTEN/NOTIFY. Threshold: ~5k notifies/sec on commodity hardware (Recall.ai documented this).
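To turn that query into an alert, count the backends stuck on the NOTIFY queue lock. A sketch; the row shape mirrors the pg_stat_activity columns selected above, and the threshold default is an assumption you'd tune:

```typescript
interface ActivityRow {
  wait_event_type: string | null;
  wait_event: string | null;
}

// True when `threshold` or more backends are waiting on NotifyQueueLock,
// the signal that LISTEN/NOTIFY throughput is saturating.
export function notifySaturated(rows: ActivityRow[], threshold = 5): boolean {
  const waiting = rows.filter(
    (r) => r.wait_event_type === "LWLock" && r.wait_event === "NotifyQueueLock"
  ).length;
  return waiting >= threshold;
}
```

Run the pg_stat_activity query on a timer, feed the rows through this, and page when it stays true across consecutive samples.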
Step 6 — Graceful fallback path
// NOTIFY_RATE_PER_SEC arrives as a string; coerce before comparing.
const useBroker = Number(process.env.NOTIFY_RATE_PER_SEC ?? 0) > 4000;
if (useBroker) {
  await nats.publish(`agent.${tenantId}`, payload);
} else {
  await prisma.$queryRaw`SELECT pg_notify(...)`;
}
Build the abstraction now so a future migration to NATS/Redis Streams is a config flip.
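One way to shape that abstraction: a narrow publish interface plus a single config-driven chooser, so callers never know which transport is underneath. The interface and names here are mine, not from the post:

```typescript
// The only surface agent code sees. Postgres and NATS implementations
// both satisfy it, so swapping transports touches no call sites.
export interface EventBus {
  publish(channel: string, payload: unknown): Promise<void>;
}

// Keep the transport decision in exactly one place. The 4k/sec cutoff
// leaves headroom below the ~5k/sec NotifyQueueLock ceiling.
export function chooseTransport(notifyRatePerSec: number): "postgres" | "nats" {
  return notifyRatePerSec > 4000 ? "nats" : "postgres";
}
```

At startup, read the measured rate, call `chooseTransport`, and construct the matching `EventBus` once; the migration then really is a config flip.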
Pitfalls
- Channel name length — capped at 63 bytes (the identifier limit). Hyphenated UUIDs also need quoted identifiers in LISTEN, so normalize to underscores and stay under the limit.
- Payload size — capped at 8000 bytes. Send IDs, fetch the row separately.
- Global commit lock — every NOTIFY takes NotifyQueueLock. Above ~5k commits/sec it serializes writes (Recall.ai outage post-mortem).
- Connection drops swallow events — LISTEN is not durable. Pair with an outbox pattern for at-least-once.
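The outbox pattern the last pitfall mentions is mostly a catch-up query keyed on the last id a consumer processed: because agent_events already has a monotonic BIGSERIAL id, gap detection is pure arithmetic. A sketch (helper and constant names are mine):

```typescript
// On reconnect, compare the first live notification's id with the last
// id this consumer processed. Any gap means rows landed while we were
// disconnected and must be re-read from agent_events.
export function missedRange(
  lastSeenId: number,
  firstLiveId: number
): [number, number] | null {
  return firstLiveId > lastSeenId + 1
    ? [lastSeenId + 1, firstLiveId - 1]
    : null;
}

// Catch-up query to run with the range above ($2..$3 inclusive).
export const CATCH_UP_SQL = `
  SELECT id, agent_id, event_type, payload
  FROM agent_events
  WHERE tenant_id = $1 AND id BETWEEN $2 AND $3
  ORDER BY id`;
```

Replay the catch-up rows first, then resume the live stream; consumers that de-duplicate on id get at-least-once delivery for free.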
CallSphere production note
CallSphere's call-events bus runs on LISTEN/NOTIFY for realtime across its 115+ DB tables — agent turns, tool calls, and escalations all flow through Postgres triggers. At ~600 events/sec across 6 verticals (Healthcare/Behavioral Health on healthcare_voice Prisma, OneRoof RLS, UrackIT Supabase + ChromaDB), we're well under the commit-lock threshold. 37 agents · 90+ tools. Plans: $149/$499/$1,499, 14-day trial, 22% affiliate.
FAQ
Q: Is LISTEN/NOTIFY at-least-once or exactly-once? Neither: it's at-most-once across reconnects. Delivery is reliable within a single connected session, but anything published while a listener is disconnected is silently dropped. Add an outbox table for guaranteed delivery.
Still reading? Stop comparing — try CallSphere live.
CallSphere ships complete AI voice agents per industry — 14 tools for healthcare, 10 agents for real estate, 4 specialists for salons. See how it actually handles a call before you book a demo.
Q: Does pgbouncer break LISTEN/NOTIFY? Yes in transaction-pooling mode. Use session pooling or a dedicated direct connection for listeners.
Q: Can multiple servers listen to the same channel? Yes — every listener gets the message (fanout).
Q: How big can payload be? 8000 bytes raw. Send a row ID and let consumers fetch.
Q: When should I switch to NATS / Redis Streams? When commit rate sustains >5k/sec, or you need durable replay/consumer groups.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.