Skip to content
Learn Agentic AI
Learn Agentic AI11 min read8 views

WebSocket Architecture for AI Applications: Persistent Connections for Real-Time Agents

Learn how to design WebSocket-based architectures for AI agents, covering connection lifecycle management, protocol framing, heartbeat mechanisms, and automatic reconnection strategies for production reliability.

Why WebSockets Matter for AI Agents

HTTP request-response cycles work well for one-shot AI queries, but real-time AI agents need persistent, bidirectional communication. When an agent streams partial results, receives live tool outputs, or coordinates with other agents, opening a new TCP connection for every message adds unacceptable overhead. WebSockets solve this by upgrading an initial HTTP connection into a long-lived, full-duplex channel.

The WebSocket protocol (RFC 6455) begins with an HTTP upgrade handshake. Once the server responds with a 101 status code, both sides can send frames at any time without re-establishing a connection. This eliminates the repeated TLS handshake cost and HTTP header overhead that would otherwise dominate latency-sensitive AI interactions.

Connection Lifecycle Design

A robust WebSocket architecture for AI agents follows four phases: handshake, authentication, active session, and graceful shutdown.

sequenceDiagram
    autonumber
    participant Client
    participant Edge as Edge Worker
    participant LLM as LLM Provider
    participant DB as Logs and Trace
    Client->>Edge: POST /chat (stream=true)
    Edge->>LLM: messages.create(stream=true)
    loop Each token
        LLM-->>Edge: SSE chunk delta
        Edge-->>Client: SSE chunk delta
        Edge->>DB: append token to span
    end
    LLM-->>Edge: stop_reason=end_turn
    Edge-->>Client: event: done
    Edge->>DB: finalize trace
import asyncio
import json
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from datetime import datetime, timezone

app = FastAPI()

class AgentSession:
    def __init__(self, ws: WebSocket, user_id: str):
        self.ws = ws
        self.user_id = user_id
        self.connected_at = datetime.now(timezone.utc)
        self.last_heartbeat = self.connected_at

    async def send_event(self, event_type: str, payload: dict):
        message = {
            "type": event_type,
            "payload": payload,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        await self.ws.send_json(message)

sessions: dict[str, AgentSession] = {}

@app.websocket("/ws/agent/{user_id}")
async def agent_endpoint(ws: WebSocket, user_id: str):
    await ws.accept()

    # Authentication phase
    auth_msg = await asyncio.wait_for(ws.receive_json(), timeout=10.0)
    if auth_msg.get("type") != "auth" or not verify_token(auth_msg.get("token")):
        await ws.close(code=4001, reason="Authentication failed")
        return

    session = AgentSession(ws, user_id)
    sessions[user_id] = session
    await session.send_event("connected", {"session_id": user_id})

    try:
        while True:
            data = await ws.receive_json()
            await handle_agent_message(session, data)
    except WebSocketDisconnect:
        pass
    finally:
        sessions.pop(user_id, None)

The handshake phase accepts the raw connection. Authentication happens immediately after — the client must send a token within 10 seconds or get disconnected. The active session loop processes messages until the client disconnects, and the finally block guarantees cleanup.

Hear it before you finish reading

Talk to a live CallSphere AI voice agent in your browser — 60 seconds, no signup.

Try Live Demo →

Protocol Design with Typed Messages

Define a clear message protocol so both client and server know what to expect. Every message should include a type field, a payload, and an optional request_id for correlating responses.

from enum import Enum
from pydantic import BaseModel
from typing import Any, Optional

class ClientMessageType(str, Enum):
    AUTH = "auth"
    QUERY = "query"
    CANCEL = "cancel"
    HEARTBEAT = "ping"

class ServerMessageType(str, Enum):
    CONNECTED = "connected"
    AGENT_TOKEN = "agent_token"
    AGENT_DONE = "agent_done"
    ERROR = "error"
    HEARTBEAT_ACK = "pong"

class ClientMessage(BaseModel):
    type: ClientMessageType
    payload: dict[str, Any] = {}
    request_id: Optional[str] = None

async def handle_agent_message(session: AgentSession, raw: dict):
    msg = ClientMessage(**raw)

    if msg.type == ClientMessageType.HEARTBEAT:
        session.last_heartbeat = datetime.now(timezone.utc)
        await session.send_event("pong", {})
        return

    if msg.type == ClientMessageType.QUERY:
        asyncio.create_task(
            stream_agent_response(session, msg.payload["prompt"], msg.request_id)
        )

    if msg.type == ClientMessageType.CANCEL:
        cancel_agent_run(session.user_id, msg.request_id)

Heartbeat and Dead Connection Detection

Network failures often happen silently — the TCP connection stays open at the OS level even though packets are no longer being delivered. Heartbeats detect these zombie connections by requiring periodic proof-of-life from the client.

HEARTBEAT_INTERVAL = 30  # seconds
HEARTBEAT_TIMEOUT = 90   # seconds — 3 missed heartbeats

async def heartbeat_monitor():
    while True:
        await asyncio.sleep(HEARTBEAT_INTERVAL)
        now = datetime.now(timezone.utc)
        dead_sessions = []

        for user_id, session in sessions.items():
            elapsed = (now - session.last_heartbeat).total_seconds()
            if elapsed > HEARTBEAT_TIMEOUT:
                dead_sessions.append(user_id)

        for user_id in dead_sessions:
            session = sessions.pop(user_id, None)
            if session:
                try:
                    await session.ws.close(code=4002, reason="Heartbeat timeout")
                except Exception:
                    pass

Start this monitor as a background task when the application boots. The three-interval tolerance prevents false disconnections from brief network hiccups.

Client-Side Reconnection with Exponential Backoff

The client must handle reconnection automatically. Exponential backoff with jitter prevents a thundering herd when a server restarts and hundreds of clients try to reconnect simultaneously.

Still reading? Stop comparing — try CallSphere live.

CallSphere ships complete AI voice agents per industry — 14 tools for healthcare, 10 agents for real estate, 4 specialists for salons. See how it actually handles a call before you book a demo.

class AgentWebSocket {
  private ws: WebSocket | null = null;
  private reconnectAttempt = 0;
  private maxReconnectDelay = 30000;

  constructor(private url: string, private token: string) {}

  connect(): void {
    this.ws = new WebSocket(this.url);

    this.ws.onopen = () => {
      this.reconnectAttempt = 0;
      this.ws!.send(JSON.stringify({ type: "auth", token: this.token }));
      this.startHeartbeat();
    };

    this.ws.onclose = (event) => {
      if (event.code !== 1000) {
        this.scheduleReconnect();
      }
    };

    this.ws.onmessage = (event) => {
      const msg = JSON.parse(event.data);
      this.handleMessage(msg);
    };
  }

  private scheduleReconnect(): void {
    const baseDelay = Math.min(
      1000 * Math.pow(2, this.reconnectAttempt),
      this.maxReconnectDelay
    );
    const jitter = baseDelay * 0.5 * Math.random();
    const delay = baseDelay + jitter;
    this.reconnectAttempt++;
    setTimeout(() => this.connect(), delay);
  }

  private startHeartbeat(): void {
    setInterval(() => {
      if (this.ws?.readyState === WebSocket.OPEN) {
        this.ws.send(JSON.stringify({ type: "ping" }));
      }
    }, 25000);
  }
}

The jitter adds randomness to the backoff delay, spreading reconnection attempts over time instead of creating a synchronized spike.

FAQ

Why not use HTTP long-polling instead of WebSockets for AI agent communication?

Long-polling requires the client to repeatedly open new HTTP connections, each carrying full headers and going through TLS negotiation. For AI agents that exchange dozens of messages per minute — streaming tokens, tool calls, status updates — the overhead is substantial. WebSockets maintain a single connection with minimal per-message framing (as low as 2 bytes for small messages), making them far more efficient for bidirectional, high-frequency communication.

How do you handle WebSocket connections across multiple server instances behind a load balancer?

You need sticky sessions or a shared session registry. Configure your load balancer to use IP hash or cookie-based affinity so a reconnecting client hits the same server. Alternatively, use a shared pub/sub layer like Redis — when a message arrives for a user, publish it to a channel, and whichever server holds that user's WebSocket will receive and forward it. This decouples message routing from connection ownership.

What happens to in-flight AI agent responses when a WebSocket disconnects unexpectedly?

Design your protocol with request IDs and server-side response buffering. When the agent finishes generating a response, store it temporarily (in Redis or memory with a TTL). When the client reconnects, it sends its last seen request ID, and the server replays any missed messages. This ensures no agent output is lost even during brief network interruptions.


#WebSocket #RealTimeAI #AgenticAI #Python #Streaming #LearnAI #AIEngineering

Share

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.

Related Articles You May Like