Postgres Logical Replication for AI ETL: Stream OLTP to Your Feature Store (2026)
Stop running brittle nightly batches. Postgres logical replication gives you sub-second CDC into a warehouse, feature store, or vector index. A working publication, slot management, and a Supabase ETL Rust example.
TL;DR — Logical replication turns Postgres into a CDC source. AI teams in 2026 use it to stream OLTP changes into pgvector, Iceberg, or feature stores in <2 seconds — without nightly dumps and without touching production.
What you'll build
A publication on the OLTP primary that streams calls, messages, and agent_runs to a downstream Postgres + pgvector instance, where a transformer enriches each row with embeddings before insert. Lag stays under 2 seconds at 800 writes/sec.
Schema
-- On the primary (OLTP)
ALTER SYSTEM SET wal_level = logical;
-- wal_level is a restart-only setting: pg_reload_conf() will not apply it,
-- so restart the primary (e.g. pg_ctl restart) before continuing
CREATE PUBLICATION ai_pipeline FOR TABLE calls, messages, agent_runs
WITH (publish = 'insert,update');
-- On the subscriber (analytics + vectors)
CREATE SUBSCRIPTION ai_pipeline_sub
CONNECTION 'host=primary user=replicator dbname=app password=...'
PUBLICATION ai_pipeline
WITH (copy_data = true, streaming = on);
Architecture
flowchart LR
APP[App writes] --> OLTP[(Primary Postgres)]
OLTP --> WAL[WAL stream]
WAL --> SLOT[Replication slot]
SLOT --> SUB[Subscriber]
SUB --> XFORM[Transformer<br/>+ embed]
XFORM --> ANALYTICS[(Analytics + pgvector)]
ANALYTICS --> AGENTS[AI agents]
Step 1 — Prepare the primary
# postgresql.conf
wal_level = logical
max_replication_slots = 20
max_wal_senders = 20
# pg_hba.conf
host replication replicator 10.0.0.0/8 scram-sha-256
CREATE ROLE replicator WITH REPLICATION LOGIN PASSWORD '...';
GRANT SELECT ON calls, messages, agent_runs TO replicator;
Step 2 — Create the publication
CREATE PUBLICATION ai_pipeline FOR TABLE calls, messages, agent_runs
WITH (publish = 'insert,update,delete');
For column-level filtering (Postgres 15+):
CREATE PUBLICATION ai_pipeline_cols FOR TABLE
calls (id, tenant_id, transcript, ended_at);
-- For partitioned tables, add WITH (publish_via_partition_root = true)
-- so changes publish under the root table's name
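Row filters (also Postgres 15+) pair naturally with column lists for multi-tenant setups, where each tenant gets its own filtered publication. A minimal sketch of a statement builder; `tenant_publication_sql` and the `ai_pipeline_t<N>` naming scheme are assumptions for illustration, not part of the setup above:

```python
def tenant_publication_sql(tenant_id: int) -> str:
    """Build a CREATE PUBLICATION statement with a per-tenant row filter
    (row filters require Postgres 15+)."""
    if not isinstance(tenant_id, int):
        # Interpolating into DDL, so only accept integers (no injection risk)
        raise TypeError("tenant_id must be an int")
    return (
        f"CREATE PUBLICATION ai_pipeline_t{tenant_id} "
        f"FOR TABLE calls WHERE (tenant_id = {tenant_id}) "
        "WITH (publish = 'insert,update');"
    )
```

Each downstream subscriber then subscribes only to its own tenant's publication, so rows never leave their tenant boundary.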
Step 3 — Create the subscriber
CREATE SUBSCRIPTION ai_pipeline_sub
CONNECTION 'host=primary.internal user=replicator dbname=app password=...'
PUBLICATION ai_pipeline
WITH (copy_data = true, streaming = on, two_phase = on);
Step 4 — Transform and embed downstream
# Run on the subscriber, polling for rows that still need embeddings
import os

import openai
import psycopg

oai = openai.OpenAI()
sub = psycopg.connect(os.environ["SUB_DSN"])

def enrich():
    with sub.cursor() as cur:
        cur.execute("""
            SELECT id, transcript FROM calls
            WHERE embedding IS NULL AND transcript IS NOT NULL
            LIMIT 100
        """)
        rows = cur.fetchall()
    for cid, txt in rows:
        vec = oai.embeddings.create(
            model="text-embedding-3-small", input=txt[:8000]
        ).data[0].embedding
        with sub.cursor() as cur:
            # pgvector accepts the '[x, y, ...]' text format, so send str(vec)
            cur.execute(
                "UPDATE calls SET embedding = %s::vector WHERE id = %s",
                (str(vec), cid),
            )
    sub.commit()  # one commit per 100-row batch
Run on a 30-second cron loop or trigger via LISTEN/NOTIFY for sub-second latency.
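The loop above calls the embeddings API once per row. Since the endpoint accepts a list of inputs, batching those calls cuts round trips sharply; a small chunking helper (a sketch, not part of the snippet above) is all that is needed:

```python
from itertools import islice

def chunked(rows, size=100):
    """Yield successive batches of `rows`, so one embeddings API call
    can cover many transcripts instead of one call per row."""
    it = iter(rows)
    while batch := list(islice(it, size)):
        yield batch
```

In `enrich()`, each batch's transcripts would then go into a single `oai.embeddings.create(input=[...])` call, with the returned vectors written back in one multi-row UPDATE.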
Step 5 — Monitor lag
-- On the primary
SELECT slot_name, active, restart_lsn,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;
-- On the subscriber
SELECT subname, received_lsn, latest_end_lsn,
latest_end_time, last_msg_receipt_time
FROM pg_stat_subscription;
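`pg_wal_lsn_diff` does the byte arithmetic server-side, but if you ship `restart_lsn` values to an external monitor, the same computation is easy to reproduce: a `pg_lsn` string is two hex halves, high 32 bits and low 32 bits. A sketch:

```python
def lsn_to_bytes(lsn: str) -> int:
    """Convert a pg_lsn string like '0/16B3748' to an absolute byte
    position in the WAL stream (high 32 bits '/' low 32 bits, both hex)."""
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)

def slot_lag_bytes(current_lsn: str, restart_lsn: str) -> int:
    """Bytes of WAL the slot is retaining; the pg_wal_lsn_diff equivalent."""
    return lsn_to_bytes(current_lsn) - lsn_to_bytes(restart_lsn)
```

Feed `slot_lag_bytes(pg_current_wal_lsn(), restart_lsn)` values into your metrics system and alert when the number grows instead of shrinking.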
Step 6 — Use Supabase ETL for fan-out
For non-Postgres destinations (BigQuery, Iceberg, S3 Parquet) drop in Supabase ETL — Rust building blocks on top of pgoutput that handle backfill, restart, and parallel apply.
cargo install supabase-etl
supabase-etl run --source ai_pipeline --sink bigquery://prod
Pitfalls
- Replication slots retain WAL: a paused or dead subscriber keeps WAL on the primary's disk indefinitely. Monitor pg_replication_slots.active and disk usage daily.
- No DDL replication: apply schema changes to the subscriber before the primary.
- Large transactions stall apply: use streaming = on (Postgres 14+) to stream them in chunks before commit.
- Missing REPLICA IDENTITY: UPDATE and DELETE on published tables without a primary key fail on the primary with an error. Set REPLICA IDENTITY FULL on those tables.
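The first pitfall is the one that pages you at 3 a.m., so it is worth automating. A minimal alerting sketch over rows queried from `pg_replication_slots` (the tuple shape and thresholds here are assumptions, adapt them to your monitor):

```python
def slot_alerts(slots, max_retained_bytes=1 << 30):
    """Flag slots that are inactive or retaining too much WAL.
    `slots` is a list of (slot_name, active, retained_bytes) tuples,
    e.g. built from a query against pg_replication_slots."""
    alerts = []
    for name, active, retained in slots:
        if not active:
            # An inactive slot pins restart_lsn and WAL piles up on disk
            alerts.append(f"{name}: slot inactive, WAL accumulating")
        elif retained > max_retained_bytes:
            alerts.append(f"{name}: retaining {retained} bytes of WAL")
    return alerts
```

Run it on the same daily (or hourly) schedule as your disk-usage checks; dropping a dead slot with `pg_drop_replication_slot` is the usual remediation.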
CallSphere production note
CallSphere streams OLTP changes from call_logs and messages tables into a vector-enriched analytics replica via logical replication. 115+ DB tables stay write-fast on the primary; embeddings, summaries, and analytics aggregates are computed on the subscriber. Healthcare uses HIPAA-isolated healthcare_voice Prisma + dedicated subscriber; OneRoof preserves RLS via per-tenant publications; UrackIT mirrors to Supabase + ChromaDB. 37 agents · 90+ tools · 6 verticals. Plans: $149/$499/$1,499 — 14-day trial, 22% affiliate.
FAQ
Q: Logical replication or trigger-based CDC? Logical replication for >50 writes/sec; triggers add too much per-row overhead at scale.
Q: Can I replicate part of a table? Yes: column lists and row filters are both first-class in Postgres 15+.
Q: Does pgbouncer affect replication? No — replication uses its own protocol on a direct connection.
Q: How do I bootstrap a new subscriber?
copy_data = true handles the initial sync; for very large tables, restore a snapshot-consistent pg_dump first, then create the subscription with copy_data = false so only new changes stream.
Q: When should I use Debezium instead? Only if your sink is Kafka and you already run Kafka. Otherwise logical rep + Supabase ETL is simpler.