Deploying Chat Agents with WebSocket Connections for Real-Time Interaction
Build a real-time chat agent using WebSocket connections with FastAPI, OpenAI's streaming responses, persistent conversation state, and response chaining via previous_response_id.
Why WebSockets for Chat Agents?
HTTP request-response works for simple chatbot interactions, but production chat agents need real-time, bidirectional communication. Users expect to watch a response stream in character by character rather than wait several seconds for the complete text. WebSockets provide a persistent connection over which both client and server can send messages at any time, enabling streaming responses, typing indicators, and immediate feedback.
In this post, we build a complete WebSocket-based chat agent using FastAPI on the backend and OpenAI's streaming capabilities.
Architecture Overview
The architecture has three layers:
flowchart LR
    CLIENT(["Client<br/>browser or mobile app"])
    SERVER["FastAPI server<br/>WebSocket lifecycle,<br/>auth, message routing"]
    AGENT["Agent<br/>OpenAI Agents SDK,<br/>streaming responses"]
    CLIENT -->|WebSocket messages| SERVER
    SERVER -->|agent runs| AGENT
    AGENT -->|streamed tokens| SERVER
    SERVER -->|text_delta events| CLIENT
    style SERVER fill:#4f46e5,stroke:#4338ca,color:#fff
    style AGENT fill:#059669,stroke:#047857,color:#fff
- Client — A browser (or mobile app) opens a WebSocket connection
- Server — FastAPI manages WebSocket lifecycle, authentication, and message routing
- Agent — OpenAI's Agents SDK processes messages and streams responses
Each WebSocket connection represents one conversation session. The server maintains conversation history and uses previous_response_id to chain responses for continuity.
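Concretely, every message on the wire is a small JSON event with a type field. The event names used throughout this post (connected, typing, text_delta, response_end, error, ping, pong) are this article's own convention, not a standard; a minimal sketch of the envelope as a TypedDict:

from typing import Literal, TypedDict


class ChatEvent(TypedDict, total=False):
    type: Literal[
        "connected", "typing", "text_delta",
        "response_end", "error", "ping", "pong",
    ]
    content: str          # present on text_delta events
    full_content: str     # present on response_end events
    is_typing: bool       # present on typing events
    session_id: str       # present on connected events
    timestamp: str        # ISO 8601, where included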
FastAPI WebSocket Server
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query
from typing import Dict
import json
import asyncio
from datetime import datetime

app = FastAPI()


class ConnectionManager:
    """Manages active WebSocket connections and conversation state."""

    def __init__(self):
        self.active: Dict[str, WebSocket] = {}
        self.conversations: Dict[str, list] = {}
        self.last_response_ids: Dict[str, str] = {}

    async def connect(self, session_id: str, websocket: WebSocket):
        await websocket.accept()
        self.active[session_id] = websocket
        if session_id not in self.conversations:
            self.conversations[session_id] = []

    def disconnect(self, session_id: str):
        self.active.pop(session_id, None)

    async def send_event(self, session_id: str, event: dict):
        ws = self.active.get(session_id)
        if ws:
            await ws.send_json(event)

    def get_history(self, session_id: str) -> list:
        return self.conversations.get(session_id, [])

    def add_message(self, session_id: str, role: str, content: str):
        self.conversations.setdefault(session_id, []).append({
            "role": role,
            "content": content,
            "timestamp": datetime.utcnow().isoformat(),
        })

    def set_last_response_id(self, session_id: str, response_id: str):
        self.last_response_ids[session_id] = response_id

    def get_last_response_id(self, session_id: str) -> str | None:
        return self.last_response_ids.get(session_id)


manager = ConnectionManager()
The Chat Agent
from agents import Agent, Runner

support_agent = Agent(
    name="Real-Time Support Agent",
    instructions="""You are a real-time customer support agent for TechCorp.
    You help users with account issues, billing questions, and product
    features. Be concise since this is a real-time chat. Keep responses
    under 3 paragraphs unless the user asks for detailed explanations.
    Use markdown formatting for code snippets and lists.""",
)
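Before wiring the agent into the WebSocket layer, it is worth sanity-checking it with a one-off, non-streaming run. A minimal sketch, assuming the openai-agents SDK is installed, OPENAI_API_KEY is set, and support_agent is the agent defined above:

import asyncio

from agents import Runner


async def smoke_test():
    # One-off run outside the WebSocket flow, just to confirm the agent responds
    result = await Runner.run(support_agent, "Hi, I was double-charged this month.")
    print(result.final_output)


if __name__ == "__main__":
    asyncio.run(smoke_test())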
WebSocket Endpoint with Streaming
The core endpoint handles the WebSocket lifecycle, processes user messages through the agent, and streams responses back token by token:
@app.websocket("/ws/chat/{session_id}")
async def websocket_chat(
    websocket: WebSocket,
    session_id: str,
    token: str = Query(default=None),
):
    # Authenticate the connection
    if not await verify_token(token):
        await websocket.close(code=4001, reason="Unauthorized")
        return

    await manager.connect(session_id, websocket)

    # Send connection confirmation
    await manager.send_event(session_id, {
        "type": "connected",
        "session_id": session_id,
        "timestamp": datetime.utcnow().isoformat(),
    })

    try:
        while True:
            # Receive message from client
            data = await websocket.receive_json()
            message_type = data.get("type", "message")

            if message_type == "message":
                user_text = data.get("content", "").strip()
                if not user_text:
                    continue

                # Store user message
                manager.add_message(session_id, "user", user_text)

                # Send typing indicator
                await manager.send_event(session_id, {
                    "type": "typing",
                    "is_typing": True,
                })

                # Stream agent response
                await stream_agent_response(session_id, user_text)

            elif message_type == "ping":
                await manager.send_event(session_id, {"type": "pong"})

    except WebSocketDisconnect:
        manager.disconnect(session_id)
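The endpoint calls a verify_token coroutine that is not shown above; what it checks depends entirely on your auth setup. A minimal sketch assuming JWT bearer tokens and the PyJWT package (JWT_SECRET is an illustrative name, load the real secret from configuration):

import jwt  # PyJWT

JWT_SECRET = "replace-with-your-secret"


async def verify_token(token: str | None) -> bool:
    """Return True if the token is a valid, unexpired JWT."""
    if not token:
        return False
    try:
        # decode() raises if the signature is invalid or the token has expired
        jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
        return True
    except jwt.PyJWTError:
        return False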
Streaming Agent Responses
The key function that bridges the agent SDK with WebSocket streaming:
async def stream_agent_response(session_id: str, user_input: str):
    """Run the agent and stream the response over WebSocket."""
    # Chain onto the previous response so the model keeps conversation context
    previous_response_id = manager.get_last_response_id(session_id)
    full_response = ""

    try:
        result = Runner.run_streamed(
            support_agent,
            input=user_input,
            # Requires a recent openai-agents release with Responses API chaining
            previous_response_id=previous_response_id,
        )

        # Send start event
        await manager.send_event(session_id, {
            "type": "response_start",
        })

        async for event in result.stream_events():
            # Handle different streaming event types
            if event.type == "raw_response_event":
                raw = event.data

                # Text delta — stream to client
                if hasattr(raw, "type") and raw.type == "response.output_text.delta":
                    chunk = raw.delta
                    full_response += chunk
                    await manager.send_event(session_id, {
                        "type": "text_delta",
                        "content": chunk,
                    })

        # The stream is exhausted; final_output is a plain attribute (not awaitable).
        # Prefer it over the accumulated deltas when present.
        if result.final_output:
            full_response = str(result.final_output)

        response_id = getattr(result, "last_response_id", None)
        if response_id:
            manager.set_last_response_id(session_id, response_id)

        # Store assistant message
        manager.add_message(session_id, "assistant", full_response)

        # Send completion event
        await manager.send_event(session_id, {
            "type": "response_end",
            "full_content": full_response,
        })

    except Exception:
        # Log the exception server-side; send only a generic message to the client
        await manager.send_event(session_id, {
            "type": "error",
            "message": "I encountered an issue. Please try again.",
        })

    finally:
        # Clear typing indicator
        await manager.send_event(session_id, {
            "type": "typing",
            "is_typing": False,
        })
Response Chaining with previous_response_id
When using OpenAI's Responses API directly, previous_response_id links responses together so the model retains full conversation context without you resending all messages:
from openai import AsyncOpenAI

client = AsyncOpenAI()


class ConversationChain:
    """Manages chained responses for a single conversation."""

    def __init__(self):
        self.last_response_id: str | None = None

    async def send_message(self, user_input: str) -> str:
        params = {
            "model": "gpt-4.1",
            "input": user_input,
        }
        # Chain to previous response if one exists
        if self.last_response_id:
            params["previous_response_id"] = self.last_response_id

        response = await client.responses.create(**params)
        self.last_response_id = response.id
        return response.output_text


# Usage across multiple turns
chain = ConversationChain()
reply1 = await chain.send_message("My name is Alex and I need help with billing.")
reply2 = await chain.send_message("What is my current plan?")
# The model remembers Alex's name and billing context from reply1
This keeps each request small: instead of resending the full conversation history on every turn, OpenAI stores previous responses server-side (the Responses API's default behavior) and reconstructs the context from previous_response_id, though the prior turns still count toward the model's input context.
Client-Side WebSocket Handler
class ChatClient {
  constructor(sessionId, token) {
    this.sessionId = sessionId;
    this.token = token;
    this.ws = new WebSocket(
      `wss://api.example.com/ws/chat/${sessionId}?token=${token}`
    );
    this.messageBuffer = "";
    this.setupHandlers();
  }

  setupHandlers() {
    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);

      switch (data.type) {
        case "connected":
          this.onConnected(data);
          break;

        case "typing":
          this.onTyping(data.is_typing);
          break;

        case "text_delta":
          this.messageBuffer += data.content;
          this.onStreamChunk(this.messageBuffer);
          break;

        case "response_end":
          this.messageBuffer = "";
          this.onResponseComplete(data.full_content);
          break;

        case "error":
          this.onError(data.message);
          break;
      }
    };

    this.ws.onclose = (event) => {
      if (event.code !== 1000) {
        // Abnormal close — attempt reconnect
        setTimeout(() => this.reconnect(), 2000);
      }
    };
  }

  send(content) {
    this.ws.send(JSON.stringify({
      type: "message",
      content: content,
    }));
  }

  reconnect() {
    // Re-establish the connection with the same session and token;
    // a production client should add exponential backoff and a retry cap
    this.ws = new WebSocket(
      `wss://api.example.com/ws/chat/${this.sessionId}?token=${this.token}`
    );
    this.setupHandlers();
  }
}
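You can also exercise the endpoint without a browser during development. A minimal sketch using the third-party websockets package; the localhost URL, session ID, and token are placeholders:

import asyncio
import json

import websockets  # pip install websockets


async def test_chat(session_id: str, token: str):
    url = f"ws://localhost:8000/ws/chat/{session_id}?token={token}"
    async with websockets.connect(url) as ws:
        await ws.send(json.dumps({"type": "message", "content": "Hello!"}))
        # Print events until the server signals the end of the response
        while True:
            event = json.loads(await ws.recv())
            print(event)
            if event.get("type") == "response_end":
                break


asyncio.run(test_chat("demo-session", "dev-token"))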
Heartbeat and Connection Health
WebSocket connections can silently die. Implement a heartbeat mechanism:
async def heartbeat(session_id: str, interval: int = 30):
    """Send periodic pings to keep the connection alive."""
    while session_id in manager.active:
        try:
            await manager.send_event(session_id, {"type": "ping"})
            await asyncio.sleep(interval)
        except Exception:
            manager.disconnect(session_id)
            break
Start the heartbeat task when a connection is established:
await manager.connect(session_id, websocket)
heartbeat_task = asyncio.create_task(heartbeat(session_id))

try:
    ...  # main message loop from the endpoint above
finally:
    heartbeat_task.cancel()
    manager.disconnect(session_id)
Scaling WebSocket Connections
For production deployments with multiple server instances, you need a pub/sub layer so messages reach the right server. Redis Pub/Sub works well:
import redis.asyncio as redis


class RedisPubSubBridge:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)

    async def publish(self, session_id: str, event: dict):
        channel = f"chat:{session_id}"
        await self.redis.publish(channel, json.dumps(event))

    async def subscribe(self, session_id: str, callback):
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(f"chat:{session_id}")

        async for message in pubsub.listen():
            if message["type"] == "message":
                event = json.loads(message["data"])
                await callback(event)
This ensures that even if the WebSocket connection lands on server A but the agent processing happens on server B, the response still reaches the client.
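How you wire the bridge in is up to you. One plausible arrangement (the bridge, forward_to_client, and publish_delta names are illustrative, not part of any library) is to subscribe on whichever instance holds the socket and publish from whichever instance runs the agent:

bridge = RedisPubSubBridge("redis://localhost:6379")


# On the instance that holds the WebSocket: forward published events to the socket
async def forward_to_client(session_id: str):
    async def deliver(event: dict):
        await manager.send_event(session_id, event)

    # Runs until the connection (and its subscription) is torn down
    await bridge.subscribe(session_id, deliver)


# On the instance that runs the agent: publish instead of sending directly
async def publish_delta(session_id: str, chunk: str):
    await bridge.publish(session_id, {"type": "text_delta", "content": chunk})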
Production Deployment Checklist
- TLS termination — Always use wss://, never ws://, in production
- Connection limits — Set max connections per user to prevent resource exhaustion
- Message size limits — Reject messages over a reasonable size (e.g., 10KB); see the sketch after this list
- Authentication — Validate JWT tokens on connection, not just on the first message
- Graceful shutdown — Close all connections with code 1001 (Going Away) during deploys
- Monitoring — Track active connections, message throughput, and error rates
- Load balancing — Use sticky sessions or a pub/sub bridge for multi-instance deployments
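A couple of these are straightforward to enforce in the endpoint itself. A hedged sketch of a message-size check and a shutdown hook that closes connections with code 1001; MAX_MESSAGE_BYTES and the helper names are illustrative:

MAX_MESSAGE_BYTES = 10_000  # roughly 10KB


async def receive_limited(websocket: WebSocket) -> dict | None:
    """Receive one JSON message, rejecting oversized payloads."""
    raw = await websocket.receive_text()
    if len(raw.encode("utf-8")) > MAX_MESSAGE_BYTES:
        # 1009 is the standard "message too big" close code
        await websocket.close(code=1009, reason="Message too large")
        return None
    return json.loads(raw)


@app.on_event("shutdown")
async def close_all_connections():
    # Tell clients we are going away (1001) so they can reconnect elsewhere
    for session_id, ws in list(manager.active.items()):
        await ws.close(code=1001, reason="Server shutting down")
        manager.disconnect(session_id)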