Streaming Agent Responses: Real-Time Output with run_streamed()
Build real-time agent interfaces with Runner.run_streamed(). Learn about stream events, WebSocket transport, persistent connections, and building streaming chat UIs.
Why Streaming Matters for Agent Applications
When an agent needs to reason through multiple tool calls before responding, the total latency can reach 10-30 seconds. Without streaming, the user stares at a loading spinner the entire time. With streaming, they see progress immediately — partial text appears as it is generated, tool calls are visible as they execute, and the experience feels responsive.
The OpenAI Agents SDK provides first-class streaming support through Runner.run_streamed(), which returns events in real-time as the agent loop executes.
Basic Streaming Setup
Runner.run_streamed() returns a RunResultStreaming object immediately; you then iterate over its events with stream_events(). The flow below shows the agent loop that produces those events, followed by a minimal example:
flowchart LR
INPUT(["User input"])
AGENT["Agent<br/>name plus instructions"]
HAND{"Handoff to<br/>another agent?"}
SUB["Sub-agent<br/>specialist"]
GUARD{"Guardrail<br/>passed?"}
TOOL["Tool call"]
SDK[("Tracing<br/>OpenAI dashboard")]
OUT(["Final output"])
INPUT --> AGENT --> HAND
HAND -->|Yes| SUB --> GUARD
HAND -->|No| GUARD
GUARD -->|Yes| TOOL --> AGENT
GUARD -->|Block| OUT
AGENT --> OUT
AGENT --> SDK
style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
style SDK fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
style OUT fill:#059669,stroke:#047857,color:#fff
import asyncio

from agents import Agent, Runner
from openai.types.responses import ResponseTextDeltaEvent

agent = Agent(
    name="Explainer",
    instructions="Explain topics in detail with examples.",
)

async def main():
    result = Runner.run_streamed(agent, "Explain how TCP/IP works")
    async for event in result.stream_events():
        # Text deltas arrive as ResponseTextDeltaEvent objects whose
        # `delta` attribute is the text fragment itself.
        if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
            print(event.data.delta, end="", flush=True)
    print()  # Final newline

asyncio.run(main())
The text appears word-by-word (or chunk-by-chunk) as the model generates it, providing immediate feedback to the user.
Stream Event Types
The stream_events() iterator yields events with different types:
raw_response_event
These are the lowest-level events, corresponding to chunks from the model's streaming response. They contain text deltas, tool call deltas, and other raw data:
from openai.types.responses import ResponseTextDeltaEvent

async for event in result.stream_events():
    if event.type == "raw_response_event":
        # Text content delta: check the concrete event class,
        # then read the string from event.data.delta
        if isinstance(event.data, ResponseTextDeltaEvent):
            handle_text_chunk(event.data.delta)
run_item_stream_event
Higher-level events that represent complete items in the agent loop:
async for event in result.stream_events():
    if event.type == "run_item_stream_event":
        item = event.item
        if item.type == "tool_call_item":
            print(f"\n[Calling tool: {item.raw_item.name}]")
        elif item.type == "tool_call_output_item":
            print(f"\n[Tool returned: {item.output[:100]}]")
        elif item.type == "message_output_item":
            print("\n[Agent message]")
agent_updated_stream_event
Fired when the current agent changes during a handoff:
async for event in result.stream_events():
    if event.type == "agent_updated_stream_event":
        print(f"\n[Handed off to: {event.new_agent.name}]")
Building a Complete Streaming Handler
Here is a comprehensive streaming handler that processes all event types:
import asyncio

from agents import Agent, Runner, function_tool
from openai.types.responses import ResponseTextDeltaEvent

@function_tool
def search_docs(query: str) -> str:
    """Search documentation for relevant articles.

    Args:
        query: Search query.
    """
    return f"Found 3 articles about '{query}': [Article 1, Article 2, Article 3]"

agent = Agent(
    name="Doc Assistant",
    instructions="Help users find information in documentation. Use the search tool when needed.",
    tools=[search_docs],
)

async def stream_agent_response(user_input: str):
    result = Runner.run_streamed(agent, user_input)
    current_text = ""
    async for event in result.stream_events():
        if event.type == "raw_response_event":
            if isinstance(event.data, ResponseTextDeltaEvent):
                chunk = event.data.delta
                current_text += chunk
                print(chunk, end="", flush=True)
        elif event.type == "run_item_stream_event":
            item = event.item
            if item.type == "tool_call_item":
                print(f"\n  >> Searching: {item.raw_item.name}...", flush=True)
            elif item.type == "tool_call_output_item":
                print("  >> Results received", flush=True)
        elif event.type == "agent_updated_stream_event":
            print(f"\n  >> Transferring to {event.new_agent.name}...", flush=True)
    print()
    # Access the complete result after streaming finishes
    return result.final_output

asyncio.run(stream_agent_response("How do I configure authentication?"))
WebSocket Transport
By default, the SDK uses HTTP with Server-Sent Events (SSE) for streaming. For lower latency and bidirectional communication, you can switch to WebSocket transport:
from agents import set_default_openai_responses_transport
# Switch to WebSocket transport globally
set_default_openai_responses_transport("websocket")
WebSocket transport benefits:
- Lower latency: No HTTP overhead per message
- Persistent connection: Reuses the same connection across multiple requests
- Bidirectional: Foundation for real-time interactive agents
Persistent WebSocket Sessions
For applications that make many sequential LLM calls (like agents with multiple tool-calling turns), persistent WebSocket sessions avoid the connection setup overhead:
from agents import Agent, Runner
from openai import AsyncOpenAI
from openai.types.responses import ResponseTextDeltaEvent

client = AsyncOpenAI()

async def run_with_persistent_websocket():
    agent = Agent(
        name="Fast Agent",
        instructions="Respond quickly using tools.",
        tools=[tool_a, tool_b, tool_c],  # tools defined elsewhere
    )
    # Use a persistent WebSocket session
    async with client.responses.websocket_session() as session:
        result = Runner.run_streamed(
            agent,
            "Process this complex request",
        )
        async for event in result.stream_events():
            if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                print(event.data.delta, end="", flush=True)
This is especially valuable when an agent makes 5-10 LLM calls in a single run (due to tool calls). Each subsequent call reuses the WebSocket connection instead of establishing a new HTTP connection.
Streaming with FastAPI
Here is how to integrate streaming into a FastAPI endpoint using Server-Sent Events:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from agents import Agent, Runner
from openai.types.responses import ResponseTextDeltaEvent
import json

app = FastAPI()

agent = Agent(
    name="Chat Agent",
    instructions="You are a helpful chat assistant.",
)

async def event_generator(user_input: str):
    result = Runner.run_streamed(agent, user_input)
    async for event in result.stream_events():
        if event.type == "raw_response_event":
            if isinstance(event.data, ResponseTextDeltaEvent):
                chunk = event.data.delta
                yield f"data: {json.dumps({'type': 'text', 'content': chunk})}\n\n"
        elif event.type == "run_item_stream_event":
            if event.item.type == "tool_call_item":
                yield f"data: {json.dumps({'type': 'tool_call', 'name': event.item.raw_item.name})}\n\n"
    yield f"data: {json.dumps({'type': 'done', 'final': result.final_output})}\n\n"

@app.post("/chat/stream")
async def chat_stream(request: dict):
    return StreamingResponse(
        event_generator(request["message"]),
        media_type="text/event-stream",
    )
The frontend consumes this with a fetch-based SSE reader (the built-in EventSource API only supports GET requests, so it cannot call this POST endpoint directly):
// Frontend JavaScript
const response = await fetch('/chat/stream', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ message: userInput }),
});

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // A network chunk can end mid-line, so keep the trailing
  // partial line in a buffer until the next read completes it
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split('\n');
  buffer = lines.pop();
  for (const line of lines) {
    if (line.startsWith('data: ')) {
      const data = JSON.parse(line.slice(6));
      if (data.type === 'text') {
        appendToChat(data.content);
      } else if (data.type === 'tool_call') {
        showToolIndicator(data.name);
      }
    }
  }
}
Streaming with Multi-Agent Handoffs
When agents hand off to each other, streaming shows the transition in real-time:
from agents import Agent, Runner
from openai.types.responses import ResponseTextDeltaEvent

support_agent = Agent(
    name="Support Agent",
    instructions="Handle general questions.",
)

billing_agent = Agent(
    name="Billing Agent",
    instructions="Handle billing questions.",
)

triage_agent = Agent(
    name="Triage Agent",
    instructions="Route to the appropriate agent.",
    handoffs=[support_agent, billing_agent],
)

async def stream_with_handoffs(user_input: str):
    result = Runner.run_streamed(triage_agent, user_input)
    current_agent = triage_agent.name
    async for event in result.stream_events():
        if event.type == "agent_updated_stream_event":
            current_agent = event.new_agent.name
            print(f"\n--- Transferred to {current_agent} ---\n")
        elif event.type == "raw_response_event":
            if isinstance(event.data, ResponseTextDeltaEvent):
                print(event.data.delta, end="", flush=True)
    print(f"\n\nFinal agent: {result.last_agent.name}")
Performance Considerations
Use WebSocket transport for high-frequency applications. If your agent makes many LLM calls per request, the connection reuse significantly reduces latency.
Buffer small chunks. In a web UI, updating the DOM for every single token can cause performance issues. Buffer chunks and update on a timer (every 50-100ms).
Handle backpressure. If your event consumer is slower than the stream producer, events can queue up in memory. Monitor memory usage in high-throughput scenarios.
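One standard way to bound that queueing, sketched here with a stdlib asyncio.Queue (illustrative, not an SDK API): a producer awaiting put() on a full bounded queue is paused until the consumer catches up.

```python
import asyncio

async def producer(queue: asyncio.Queue) -> None:
    for i in range(100):
        # put() blocks when the queue is full, pausing the producer
        # until the consumer drains an item (backpressure).
        await queue.put(i)
    await queue.put(None)  # sentinel: no more events

async def consumer(queue: asyncio.Queue, out: list) -> None:
    while (item := await queue.get()) is not None:
        out.append(item)

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=8)  # at most 8 events buffered
    out: list = []
    await asyncio.gather(producer(queue), consumer(queue, out))
    return out
```

The consumer still sees every event, but memory is capped at the queue's maxsize rather than growing with the stream.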
Set timeouts on the stream. A stalled stream can hold connections open indefinitely. Implement a timeout that closes the stream if no events arrive within a reasonable window.
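A minimal idle-timeout wrapper over any async event iterator (an illustrative helper, not part of the SDK):

```python
import asyncio

async def with_idle_timeout(events, idle_timeout: float = 30.0):
    """Re-yield events, raising asyncio.TimeoutError if the
    underlying stream goes quiet for idle_timeout seconds."""
    iterator = events.__aiter__()
    while True:
        try:
            event = await asyncio.wait_for(iterator.__anext__(), timeout=idle_timeout)
        except StopAsyncIteration:
            return
        yield event
```

Wrap the SDK stream with it: async for event in with_idle_timeout(result.stream_events(), 30.0).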
Test with slow connections. Streaming UIs behave differently on 3G vs fiber. Test with network throttling to ensure a good experience across connection speeds.
Best Practices
Always handle all event types. Even if you only display text, log tool calls and handoffs for debugging.
Show progress indicators during tool calls. Users should know the agent is working, not stalled.
Provide a fallback for non-streaming clients. Not all clients support SSE or WebSocket. Offer a non-streaming endpoint as well.
Clean up resources. If the user disconnects mid-stream, ensure the streaming context is properly closed.
Source: OpenAI Agents SDK — Streaming