Real-Time AI: Streaming, WebSockets, and Server-Sent Events for LLM Applications
How to build responsive AI applications using streaming, WebSockets, and SSE, with practical patterns for token streaming, agent status updates, and real-time collaboration.
Why Real-Time Matters for AI
LLM inference is slow compared to traditional APIs. A complex query to a frontier model can take 5-30 seconds for the full response. Without streaming, users stare at a loading spinner for the entire duration. With streaming, they see tokens appear in real-time, dramatically improving perceived performance and user experience.
But token streaming is just the beginning. Production AI systems need real-time updates for agent status, tool execution progress, error notifications, and multi-user collaboration.
Token Streaming: The Foundation
Server-Sent Events (SSE)
SSE is the most common pattern for LLM token streaming. It uses a standard HTTP connection with a special content type:
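On the wire, the response is plain text: a `text/event-stream` body made of `data:` lines, with a blank line terminating each event. An illustrative exchange (payload shapes are just examples):

```
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache

data: {"text": "Hel"}

data: {"text": "lo, "}

data: {"done": true}
```

A FastAPI endpoint that produces such a stream: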
```python
# FastAPI SSE endpoint
import json

import anthropic
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class ChatRequest(BaseModel):
    messages: list[dict]

@app.post("/api/chat")
async def chat(request: ChatRequest):
    async def generate():
        client = anthropic.AsyncAnthropic()
        async with client.messages.stream(
            model="claude-sonnet-4-20250514",
            messages=request.messages,
            max_tokens=4096,
        ) as stream:
            async for event in stream:
                if event.type == "content_block_delta":
                    yield f"data: {json.dumps({'text': event.delta.text})}\n\n"
            # Send final message with usage stats
            final = await stream.get_final_message()
            yield f"data: {json.dumps({'done': True, 'usage': {'input': final.usage.input_tokens, 'output': final.usage.output_tokens}})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
```
Client-side consumption:
```typescript
const response = await fetch('/api/chat', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ messages })
});

const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // Accumulate chunks: an SSE event can be split across network reads
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split('\n');
  buffer = lines.pop()!; // keep any partial trailing line for the next read
  for (const line of lines.filter(l => l.startsWith('data: '))) {
    const data = JSON.parse(line.slice(6));
    if (data.text) appendToUI(data.text);
    if (data.done) showUsageStats(data.usage);
  }
}
```
SSE advantages: Simple and HTTP-based, works through most proxies and load balancers, and the browser's EventSource API has automatic reconnection built in (POST endpoints like the one above are consumed with fetch instead, as shown).
The generation pipeline below is a reminder of why tokens arrive incrementally in the first place: output is sampled and detokenized one token at a time.

```mermaid
flowchart LR
    IN(["Input prompt"])
    subgraph PRE["Pre processing"]
        TOK["Tokenize"]
        EMB["Embed"]
    end
    subgraph CORE["Model Core"]
        ATTN["Self attention layers"]
        MLP["Feed forward layers"]
    end
    subgraph POST["Post processing"]
        SAMP["Sampling"]
        DETOK["Detokenize"]
    end
    OUT(["Generated text"])
    IN --> TOK --> EMB --> ATTN --> MLP --> SAMP --> DETOK --> OUT
    style IN fill:#f1f5f9,stroke:#64748b,color:#0f172a
    style CORE fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
```
SSE limitations: Unidirectional (server to client only), text-only payloads, and a per-domain browser connection limit (6 under HTTP/1.1).
WebSockets
WebSockets provide full-duplex communication, essential for interactive agent sessions:
```python
# FastAPI WebSocket for interactive agent
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/agent")
async def agent_session(websocket: WebSocket):
    await websocket.accept()
    agent = create_agent(tools=available_tools)  # your agent framework here
    try:
        while True:
            user_message = await websocket.receive_json()
            async for event in agent.run_stream(user_message["content"]):
                match event.type:
                    case "thinking":
                        await websocket.send_json({
                            "type": "thinking",
                            "content": event.text
                        })
                    case "tool_call":
                        await websocket.send_json({
                            "type": "tool_call",
                            "tool": event.name,
                            "args": event.args,
                            "status": "executing"
                        })
                    case "tool_result":
                        await websocket.send_json({
                            "type": "tool_result",
                            "tool": event.name,
                            "result": event.result
                        })
                    case "text_delta":
                        await websocket.send_json({
                            "type": "text",
                            "content": event.text
                        })
    except WebSocketDisconnect:
        pass  # client closed the connection; clean up agent state here
```
WebSocket advantages: Bidirectional, low latency, supports binary data, client can send messages while receiving.
WebSocket limitations: More complex infrastructure (sticky sessions, WebSocket-aware load balancers), no automatic reconnection, connection management overhead.
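Because reconnection is the application's responsibility with WebSockets, clients typically wrap the connection in a retry loop. A minimal sketch using the third-party `websockets` package (the `handle` callback is a placeholder, not part of any library):

```python
# Client-side reconnection with exponential backoff -- a sketch, not a
# complete client. `handle` is a placeholder for your message handler.
import asyncio
import websockets

async def connect_with_backoff(url: str) -> None:
    delay = 1.0
    while True:
        try:
            async with websockets.connect(url) as ws:
                delay = 1.0  # reset the backoff after a successful connect
                async for message in ws:
                    handle(message)  # placeholder
        except (OSError, websockets.ConnectionClosed):
            await asyncio.sleep(delay)
            delay = min(delay * 2, 30.0)  # cap the backoff at 30 seconds
```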
Choosing the Right Protocol
| Use Case | Recommended Protocol |
|---|---|
| Simple chat with streaming | SSE |
| Interactive agent with tool use | WebSocket |
| Real-time collaboration | WebSocket |
| Notification/status updates | SSE |
| Voice/audio streaming | WebSocket |
| Webhook-style events | SSE |
Production Patterns
Structured Streaming Events
Do not just stream raw text. Define an event protocol:
```typescript
type StreamEvent =
  | { type: 'text_delta'; content: string }
  | { type: 'tool_start'; tool: string; args: Record<string, unknown> }
  | { type: 'tool_end'; tool: string; result: unknown; duration_ms: number }
  | { type: 'thinking'; content: string }
  | { type: 'error'; message: string; recoverable: boolean }
  | { type: 'done'; usage: { input_tokens: number; output_tokens: number } };
```
This enables rich UI updates: show a spinner when a tool is executing, display thinking text in a collapsible panel, and show token usage when complete.
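On the server side, the same protocol is just tagged JSON objects serialized into SSE frames; a tiny helper keeps every `yield` consistent. A sketch, with illustrative field values:

```python
# Serialize one tagged event as an SSE data frame. Event shapes mirror
# the StreamEvent union above.
import json

def sse_event(event: dict) -> str:
    return f"data: {json.dumps(event)}\n\n"

# Inside a streaming generator:
#   yield sse_event({"type": "tool_start", "tool": "search", "args": {"q": "..."}})
#   yield sse_event({"type": "text_delta", "content": token})
#   yield sse_event({"type": "done", "usage": {"input_tokens": 120, "output_tokens": 480}})
```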
Backpressure Handling
If the client cannot consume tokens as fast as the model generates them (common on slow networks), implement backpressure:
- SSE: The TCP send buffer naturally provides backpressure, but set reasonable buffer limits
- WebSocket: Monitor the send buffer size and pause generation if it exceeds a threshold; one version of this is sketched after this list
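FastAPI's WebSocket API does not expose the raw send buffer, so a common substitute is a bounded queue between generation and sending: the producer blocks when the client falls behind, instead of buffering unbounded output in memory. A sketch (the queue size is an arbitrary choice):

```python
# Bounded-queue backpressure sketch: the producer (model output) blocks
# on put() once the slow consumer (network send) lets the queue fill up.
import asyncio
from fastapi import WebSocket

async def stream_with_backpressure(websocket: WebSocket, token_stream) -> None:
    queue: asyncio.Queue = asyncio.Queue(maxsize=64)  # 64 is arbitrary

    async def produce() -> None:
        async for token in token_stream:
            await queue.put(token)  # blocks while the queue is full
        await queue.put(None)  # sentinel: generation finished

    producer = asyncio.create_task(produce())
    while (token := await queue.get()) is not None:
        await websocket.send_text(token)  # pace set by client and network
    await producer
```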
Reconnection and State Recovery
Connections drop. Your protocol should handle it:
```python
# Server-side: attach an id to every SSE frame so the client can resume
event_id = 0
async for token in stream:
    event_id += 1
    yield f"id: {event_id}\ndata: {json.dumps({'text': token})}\n\n"
```

On the client, the native EventSource reconnects automatically and resends the id of the last event it received in a `Last-Event-ID` request header; the constructor does not accept custom headers:

```javascript
// Client-side: the browser handles reconnection and sends Last-Event-ID
const eventSource = new EventSource('/stream');
```
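On the server, resuming means replaying whatever the client missed. A sketch that keeps frames in a simple in-memory list (a real deployment would use durable, shared storage such as Redis streams):

```python
# Resume sketch: replay buffered frames past the client's Last-Event-ID.
# `event_log` is an in-memory stand-in for durable, shared storage.
from fastapi import Request
from fastapi.responses import StreamingResponse

event_log: list[str] = []  # pre-formatted "id: N\ndata: ...\n\n" frames

@app.get("/stream")
async def resume_stream(request: Request):
    last_id = int(request.headers.get("last-event-id", "0"))

    async def replay_then_continue():
        for frame in event_log[last_id:]:  # frames the client missed
            yield frame
        # ...then continue yielding live frames as they are generated

    return StreamingResponse(replay_then_continue(), media_type="text/event-stream")
```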
Infrastructure Considerations
- Reverse proxies: Nginx requires `proxy_buffering off` and appropriate `proxy_read_timeout` settings for SSE. Use `proxy_http_version 1.1` and `Upgrade` headers for WebSocket. If timeouts cannot be raised, see the heartbeat sketch after this list
- Load balancers: WebSocket requires sticky sessions or connection-aware routing. SSE works with standard HTTP load balancing
- CDNs: Most CDNs do not support SSE/WebSocket. Route real-time traffic directly to origin
- Kubernetes: Use `sessionAffinity: ClientIP` for WebSocket services; increase `proxy-read-timeout` annotations for SSE
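When a proxy's idle timeout cannot be raised, a common mitigation is a periodic SSE comment frame as a heartbeat: lines beginning with `:` are comments in the SSE protocol and clients ignore them. A sketch (the 15-second default is an assumption; keep it below your proxy's timeout):

```python
# Heartbeat sketch: forward frames from `frames`, injecting SSE comment
# frames whenever the source is idle longer than `interval` seconds.
import asyncio

async def with_heartbeat(frames, interval: float = 15.0):
    queue: asyncio.Queue = asyncio.Queue()

    async def pump() -> None:
        async for frame in frames:
            await queue.put(frame)
        await queue.put(None)  # sentinel: source exhausted

    pump_task = asyncio.create_task(pump())
    while True:
        try:
            frame = await asyncio.wait_for(queue.get(), timeout=interval)
        except asyncio.TimeoutError:
            yield ": keepalive\n\n"  # SSE comment frame, ignored by clients
            continue
        if frame is None:
            break
        yield frame
    await pump_task
```

Wrap the generator you already hand to StreamingResponse, e.g. `StreamingResponse(with_heartbeat(generate()), ...)`.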
Streaming is not just a UX nicety; it is a fundamental requirement for AI applications. The difference between a 10-second loading spinner and seeing tokens appear immediately is the difference between an application users tolerate and one they enjoy.
Sources: MDN Server-Sent Events | FastAPI WebSocket Docs | Vercel AI SDK Streaming