Claude API Streaming: Real-Time AI Responses in Production
Complete guide to implementing streaming responses with the Claude API. Covers SSE implementation, token-by-token rendering, error handling during streams, and production patterns for real-time AI applications.
Why Streaming Matters
Without streaming, a Claude API call blocks until the entire response is generated. For a 1,000-token response, that means 5-15 seconds of silence followed by a wall of text. Users perceive this as slow, unresponsive, and frustrating.
Streaming changes the UX fundamentally. The first token arrives within 500ms-2s (time to first token, or TTFT), and subsequent tokens stream in at 50-100 tokens per second. Users see the response forming in real time, which feels fast even when the total generation time is identical.
For production applications -- chatbots, code assistants, real-time analysis tools -- streaming is not optional. It is a core UX requirement.
Basic Streaming in Python
```python
from anthropic import Anthropic

client = Anthropic()

# Basic streaming with the Messages API
with client.messages.stream(
    model="claude-sonnet-4-5-20250929",
    max_tokens=4096,
    messages=[{"role": "user", "content": "Explain how TCP/IP works."}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
```
The stream() method returns a context manager; iterating stream.text_stream inside it yields text chunks as they arrive. The flush=True ensures each chunk is printed immediately rather than buffered by stdout.
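The SDK also ships an AsyncAnthropic client with the same streaming interface, which matters once you are inside an event loop (the FastAPI proxy later in this guide, for instance). A minimal async sketch:

```python
import asyncio

from anthropic import AsyncAnthropic

async_client = AsyncAnthropic()

async def main():
    # Identical interface, but awaitable and safe inside an event loop
    async with async_client.messages.stream(
        model="claude-sonnet-4-5-20250929",
        max_tokens=4096,
        messages=[{"role": "user", "content": "Explain how TCP/IP works."}]
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)

asyncio.run(main())
```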
The diagram below previews the larger loop this guide builds toward: streamed text, optional extended thinking, and tool calls whose results feed back into the conversation.

```mermaid
flowchart LR
    USER(["User message"])
    LOOP{"messages.create<br/>agent loop"}
    THINK["Extended thinking<br/>optional"]
    TOOL{"stop_reason<br/>tool_use?"}
    EXEC["Execute tool<br/>append tool_result"]
    DONE(["stop_reason<br/>end_turn"])
    USER --> LOOP --> THINK --> TOOL
    TOOL -->|Yes| EXEC --> LOOP
    TOOL -->|No| DONE
    style LOOP fill:#4f46e5,stroke:#4338ca,color:#fff
    style THINK fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style DONE fill:#059669,stroke:#047857,color:#fff
```
Basic Streaming in TypeScript
```typescript
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

// messages.stream() returns a MessageStream; no await is needed to start it
const stream = client.messages.stream({
  model: "claude-sonnet-4-5-20250929",
  max_tokens: 4096,
  messages: [{ role: "user", content: "Explain how TCP/IP works." }],
});

for await (const event of stream) {
  if (event.type === "content_block_delta" && event.delta.type === "text_delta") {
    process.stdout.write(event.delta.text);
  }
}

// Get the final message with usage stats
const finalMessage = await stream.finalMessage();
console.log("\nTokens used:", finalMessage.usage);
```
Server-Sent Events (SSE) Architecture
The Claude API uses Server-Sent Events for streaming. Each event has a type that tells you what is happening:
| Event Type | Description | When It Occurs |
|---|---|---|
| `message_start` | Message metadata, model info | First event |
| `content_block_start` | New content block begins | Before each text/tool block |
| `content_block_delta` | Incremental content update | During generation |
| `content_block_stop` | Content block complete | After each block |
| `message_delta` | Message-level updates (stop reason, usage) | Near the end |
| `message_stop` | Stream complete | Last event |
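To see these events without the SDK's streaming helper, pass stream=True to messages.create(), which returns the raw event stream directly:

```python
# Low-level streaming: iterate the raw SSE events yourself
raw_stream = client.messages.create(
    model="claude-sonnet-4-5-20250929",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Say hello."}],
    stream=True,
)
for event in raw_stream:
    print(event.type)  # message_start, content_block_start, content_block_delta, ...
```

For most applications the stream() helper is the better choice; the raw mode is mainly useful for debugging or building custom transports.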
Handling All Event Types
```python
from anthropic import Anthropic

client = Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-5-20250929",
    max_tokens=4096,
    messages=[{"role": "user", "content": "Write a Python function to sort a list."}]
) as stream:
    for event in stream:
        match event.type:  # match/case requires Python 3.10+
            case "message_start":
                print(f"Model: {event.message.model}")
            case "content_block_start":
                if event.content_block.type == "text":
                    print("--- Text block started ---")
                elif event.content_block.type == "tool_use":
                    print(f"--- Tool call: {event.content_block.name} ---")
            case "content_block_delta":
                if event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)
                elif event.delta.type == "input_json_delta":
                    print(event.delta.partial_json, end="", flush=True)
            case "message_delta":
                print(f"\nStop reason: {event.delta.stop_reason}")
                print(f"Output tokens: {event.usage.output_tokens}")
            case "message_stop":
                print("\n--- Stream complete ---")
```
Streaming with Tool Use
Streaming becomes more complex when tools are involved. Claude may stream text, then switch to a tool call, then resume text after seeing the tool result.
```python
import json

def stream_with_tools(user_message: str, tools: list):
    messages = [{"role": "user", "content": user_message}]
    while True:
        collected_text = ""
        tool_calls = []
        current_tool_input = ""
        with client.messages.stream(
            model="claude-sonnet-4-5-20250929",
            max_tokens=4096,
            tools=tools,
            messages=messages,
        ) as stream:
            for event in stream:
                if event.type == "content_block_delta":
                    if event.delta.type == "text_delta":
                        print(event.delta.text, end="", flush=True)
                        collected_text += event.delta.text
                    elif event.delta.type == "input_json_delta":
                        current_tool_input += event.delta.partial_json
                elif event.type == "content_block_start":
                    if event.content_block.type == "tool_use":
                        current_tool_input = ""
                        tool_calls.append({
                            "id": event.content_block.id,
                            "name": event.content_block.name,
                            "input": {},  # default for tools called with no arguments
                        })
                elif event.type == "content_block_stop":
                    if tool_calls and current_tool_input:
                        tool_calls[-1]["input"] = json.loads(current_tool_input)
                        current_tool_input = ""
            final = stream.get_final_message()

        # If no tool calls, we are done
        if final.stop_reason != "tool_use":
            return collected_text

        # Execute tools and continue the loop with the results
        messages.append({"role": "assistant", "content": final.content})
        tool_results = []
        for tc in tool_calls:
            result = execute_tool(tc["name"], tc["input"])  # your tool dispatcher
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": tc["id"],
                "content": json.dumps(result),
            })
        messages.append({"role": "user", "content": tool_results})
```
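For reference, here is what the tools list and the execute_tool dispatcher used above might look like. The get_weather tool and its canned response are illustrative placeholders, not a real service:

```python
# Hypothetical tool definition in the Anthropic tool-schema format
WEATHER_TOOL = {
    "name": "get_weather",
    "description": "Get the current weather for a city.",
    "input_schema": {
        "type": "object",
        "properties": {"city": {"type": "string"}},
        "required": ["city"],
    },
}

def execute_tool(name: str, tool_input: dict):
    # Illustrative dispatcher; route to your real services here
    if name == "get_weather":
        return {"city": tool_input["city"], "temp_c": 21, "conditions": "clear"}
    raise ValueError(f"Unknown tool: {name}")

stream_with_tools("What's the weather in Paris?", tools=[WEATHER_TOOL])
```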
Building a Streaming API Endpoint
For web applications, you need to proxy the Claude stream to your frontend. Here is a FastAPI implementation:
```python
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from anthropic import AsyncAnthropic

app = FastAPI()
client = AsyncAnthropic()

class ChatRequest(BaseModel):
    system_prompt: str
    messages: list[dict]

@app.post("/api/chat")
async def chat_endpoint(request: ChatRequest):
    async def generate():
        # Use the async client so streaming does not block the event loop
        async with client.messages.stream(
            model="claude-sonnet-4-5-20250929",
            max_tokens=4096,
            system=request.system_prompt,
            messages=request.messages,
        ) as stream:
            async for text in stream.text_stream:
                # Format as SSE
                yield f"data: {json.dumps({'text': text})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )
```
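To smoke-test the endpoint without a browser, a small httpx client can consume the SSE stream (this assumes the server above is running on localhost:8000):

```python
import json

import httpx

def consume_chat(messages: list[dict], system_prompt: str = "") -> str:
    full_text = ""
    with httpx.Client(timeout=None) as http:
        with http.stream(
            "POST",
            "http://localhost:8000/api/chat",
            json={"system_prompt": system_prompt, "messages": messages},
        ) as response:
            for line in response.iter_lines():
                if line.startswith("data: ") and line != "data: [DONE]":
                    chunk = json.loads(line[len("data: "):])
                    print(chunk["text"], end="", flush=True)
                    full_text += chunk["text"]
    return full_text

consume_chat([{"role": "user", "content": "Hello!"}])
```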
Frontend Consumer (React)
```tsx
import { useState } from "react";

type Message = { role: "user" | "assistant"; content: string };

// Must be an async *generator* function to yield chunks
async function* streamChat(messages: Message[]): AsyncGenerator<string> {
  const response = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ messages }),
  });

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // SSE events can be split across network reads, so buffer partial events
    buffer += decoder.decode(value, { stream: true });
    const events = buffer.split("\n\n");
    buffer = events.pop() ?? ""; // keep any trailing partial event
    for (const line of events) {
      if (line.startsWith("data: ") && line !== "data: [DONE]") {
        const data = JSON.parse(line.slice(6));
        yield data.text;
      }
    }
  }
}

// Usage in a React component
function ChatComponent() {
  const [response, setResponse] = useState("");

  const handleSend = async (message: string) => {
    setResponse("");
    for await (const chunk of streamChat([{ role: "user", content: message }])) {
      setResponse(prev => prev + chunk);
    }
  };

  return <div>{response}</div>;
}
```
Error Handling During Streams
Streams can fail mid-generation due to network issues, rate limits, or server errors. Robust error handling is essential.
```python
import time

from anthropic import APIConnectionError, APIStatusError, RateLimitError

def stream_with_retry(messages: list, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            # Note: a retried request regenerates the response from scratch,
            # so consumers should discard partial text from a failed attempt
            with client.messages.stream(
                model="claude-sonnet-4-5-20250929",
                max_tokens=4096,
                messages=messages,
            ) as stream:
                for text in stream.text_stream:
                    yield text
            return  # Success
        except APIConnectionError:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
                continue
            raise
        except RateLimitError as e:
            if attempt < max_retries - 1:
                retry_after = int(e.response.headers.get("retry-after", 30))
                time.sleep(retry_after)
                continue
            raise
        except APIStatusError as e:
            if e.status_code >= 500 and attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            raise
```
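Because a retry regenerates the response from scratch, any partial text already shown to the user is stale. One pattern (an application-level convention, not an SDK feature) is to tag each chunk with a flag telling the consumer to reset its display:

```python
def stream_with_reset(messages: list, max_retries: int = 3):
    """Yield (reset, text) pairs; reset=True means discard previously shown text."""
    for attempt in range(max_retries):
        try:
            first_chunk = True
            with client.messages.stream(
                model="claude-sonnet-4-5-20250929",
                max_tokens=4096,
                messages=messages,
            ) as stream:
                for text in stream.text_stream:
                    yield (first_chunk, text)  # first chunk of each attempt signals a reset
                    first_chunk = False
            return
        except APIConnectionError:
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)

# Consumer: clear the rendered text whenever a new attempt starts
rendered = ""
for reset, text in stream_with_reset([{"role": "user", "content": "Hi"}]):
    if reset:
        rendered = ""
    rendered += text
```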
Performance Optimization
Token Buffering
Sending every single token to the frontend creates excessive network overhead. Buffer tokens and flush periodically:
```python
import time

def buffered_stream(messages: list, flush_interval: float = 0.05):
    buffer = ""
    last_flush = time.time()
    with client.messages.stream(
        model="claude-sonnet-4-5-20250929",
        max_tokens=4096,
        messages=messages,
    ) as stream:
        for text in stream.text_stream:
            buffer += text
            now = time.time()
            # Flush on a timer, or early if the buffer grows large
            if now - last_flush >= flush_interval or len(buffer) > 100:
                yield buffer
                buffer = ""
                last_flush = now
    if buffer:  # Flush whatever remains after the stream ends
        yield buffer
```
Connection Keep-Alive
For high-throughput applications, reuse HTTP connections. The Anthropic Python SDK handles this automatically through its internal httpx client, and the TypeScript SDK rides on the runtime's fetch implementation, which reuses connections via HTTP keep-alive.
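If you need to tune pooling explicitly, the Python SDK accepts a custom httpx client at construction time; the limit values below are illustrative starting points, not recommendations:

```python
import httpx

from anthropic import Anthropic

client = Anthropic(
    http_client=httpx.Client(
        limits=httpx.Limits(
            max_connections=100,           # total concurrent connections
            max_keepalive_connections=20,  # idle connections kept warm
        ),
        timeout=httpx.Timeout(60.0, connect=5.0),
    )
)
```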
Monitoring Streaming Performance
Track these metrics in production (a measurement sketch follows the list):
- Time to first token (TTFT): Should be under 2 seconds for interactive applications
- Tokens per second: Typically 50-100 for Claude Sonnet
- Stream completion rate: Percentage of streams that complete without error
- Partial response recovery: How often you successfully retry after mid-stream failures
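A minimal way to capture the first two metrics, reusing the client from the earlier examples:

```python
import time

def measure_stream(messages: list) -> dict:
    start = time.monotonic()
    ttft = None
    with client.messages.stream(
        model="claude-sonnet-4-5-20250929",
        max_tokens=4096,
        messages=messages,
    ) as stream:
        for text in stream.text_stream:
            if ttft is None:
                ttft = time.monotonic() - start  # time to first token
        final = stream.get_final_message()
    elapsed = time.monotonic() - start
    return {
        "ttft_seconds": ttft,
        "output_tokens": final.usage.output_tokens,
        # Throughput over the generation window after the first token arrives
        "tokens_per_second": final.usage.output_tokens / max(elapsed - (ttft or 0.0), 1e-6),
    }
```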
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.