Learn Agentic AI

Streaming Agent Responses: Real-Time Output with run_streamed()

Build real-time agent interfaces with Runner.run_streamed(). Learn about stream events, WebSocket transport, persistent connections, and building streaming chat UIs.

Why Streaming Matters for Agent Applications

When an agent needs to reason through multiple tool calls before responding, the total latency can reach 10-30 seconds. Without streaming, the user stares at a loading spinner the entire time. With streaming, they see progress immediately — partial text appears as it is generated, tool calls are visible as they execute, and the experience feels responsive.

The OpenAI Agents SDK provides first-class streaming support through Runner.run_streamed(), which returns events in real-time as the agent loop executes.

Basic Streaming Setup

Runner.run_streamed() returns a RunResultStreaming object immediately; you then iterate over its events as the run progresses. The flowchart below shows the agent run loop that produces those events:

flowchart LR
    INPUT(["User input"])
    AGENT["Agent<br/>name plus instructions"]
    HAND{"Handoff to<br/>another agent?"}
    SUB["Sub-agent<br/>specialist"]
    GUARD{"Guardrail<br/>passed?"}
    TOOL["Tool call"]
    SDK[("Tracing<br/>OpenAI dashboard")]
    OUT(["Final output"])
    INPUT --> AGENT --> HAND
    HAND -->|Yes| SUB --> GUARD
    HAND -->|No| GUARD
    GUARD -->|Yes| TOOL --> AGENT
    GUARD -->|Block| OUT
    AGENT --> OUT
    AGENT --> SDK
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style SDK fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff

A minimal example that prints text deltas as they arrive:
import asyncio
from agents import Agent, Runner
from openai.types.responses import ResponseTextDeltaEvent

agent = Agent(
    name="Explainer",
    instructions="Explain topics in detail with examples.",
)

async def main():
    result = Runner.run_streamed(agent, "Explain how TCP/IP works")

    async for event in result.stream_events():
        # Text deltas arrive as ResponseTextDeltaEvent; event.data.delta is the chunk (a str)
        if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
            print(event.data.delta, end="", flush=True)

    print()  # Final newline

asyncio.run(main())

The text appears word-by-word (or chunk-by-chunk) as the model generates it, providing immediate feedback to the user.

Stream Event Types

The stream_events() iterator yields events with different types:

raw_response_event

These are the lowest-level events, corresponding to chunks from the model's streaming response. They contain text deltas, tool call deltas, and other raw data:

from openai.types.responses import ResponseTextDeltaEvent

async for event in result.stream_events():
    if event.type == "raw_response_event":
        data = event.data
        # Text content delta: data.delta is the chunk itself (a str)
        if isinstance(data, ResponseTextDeltaEvent):
            handle_text_chunk(data.delta)

run_item_stream_event

Higher-level events that represent complete items in the agent loop:

async for event in result.stream_events():
    if event.type == "run_item_stream_event":
        item = event.item

        if item.type == "tool_call_item":
            print(f"\n[Calling tool: {item.raw_item.name}]")

        elif item.type == "tool_call_output_item":
            print(f"\n[Tool returned: {item.output[:100]}]")

        elif item.type == "message_output_item":
            print(f"\n[Agent message]")

agent_updated_stream_event

Fired when the current agent changes during a handoff:

async for event in result.stream_events():
    if event.type == "agent_updated_stream_event":
        print(f"\n[Handed off to: {event.new_agent.name}]")

Building a Complete Streaming Handler

Here is a comprehensive streaming handler that processes all event types:

import asyncio
from agents import Agent, Runner, function_tool
from openai.types.responses import ResponseTextDeltaEvent

@function_tool
def search_docs(query: str) -> str:
    """Search documentation for relevant articles.

    Args:
        query: Search query.
    """
    return f"Found 3 articles about '{query}': [Article 1, Article 2, Article 3]"

agent = Agent(
    name="Doc Assistant",
    instructions="Help users find information in documentation. Use the search tool when needed.",
    tools=[search_docs],
)

async def stream_agent_response(user_input: str):
    result = Runner.run_streamed(agent, user_input)

    current_text = ""

    async for event in result.stream_events():
        if event.type == "raw_response_event":
            if isinstance(event.data, ResponseTextDeltaEvent):
                chunk = event.data.delta
                current_text += chunk
                print(chunk, end="", flush=True)

        elif event.type == "run_item_stream_event":
            item = event.item
            if item.type == "tool_call_item":
                print(f"\n  >> Calling {item.raw_item.name}...", flush=True)
            elif item.type == "tool_call_output_item":
                print(f"  >> Results received", flush=True)

        elif event.type == "agent_updated_stream_event":
            print(f"\n  >> Transferring to {event.new_agent.name}...", flush=True)

    print()

    # Access the complete result after streaming
    final_output = result.final_output
    return final_output

asyncio.run(stream_agent_response("How do I configure authentication?"))

WebSocket Transport

By default, the SDK uses HTTP with Server-Sent Events (SSE) for streaming. For lower latency and bidirectional communication, you can switch to WebSocket transport:

from agents import set_default_openai_responses_transport

# Switch to WebSocket transport globally
set_default_openai_responses_transport("websocket")

WebSocket transport benefits:

  • Lower latency: No HTTP overhead per message
  • Persistent connection: Reuses the same connection across multiple requests
  • Bidirectional: Foundation for real-time interactive agents

Persistent WebSocket Sessions

For applications that make many sequential LLM calls (like agents with multiple tool-calling turns), persistent WebSocket sessions avoid the connection setup overhead:

from agents import Agent, Runner
from openai import AsyncOpenAI
from openai.types.responses import ResponseTextDeltaEvent

client = AsyncOpenAI()

async def run_with_persistent_websocket():
    # tool_a, tool_b, tool_c are assumed to be defined elsewhere
    agent = Agent(
        name="Fast Agent",
        instructions="Respond quickly using tools.",
        tools=[tool_a, tool_b, tool_c],
    )

    # Use a persistent WebSocket session
    async with client.responses.websocket_session() as session:
        result = Runner.run_streamed(
            agent,
            "Process this complex request",
        )

        async for event in result.stream_events():
            if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                print(event.data.delta, end="", flush=True)

This is especially valuable when an agent makes 5-10 LLM calls in a single run (due to tool calls). Each subsequent call reuses the WebSocket connection instead of establishing a new HTTP connection.

Streaming with FastAPI

Here is how to integrate streaming into a FastAPI endpoint using Server-Sent Events:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from agents import Agent, Runner
from openai.types.responses import ResponseTextDeltaEvent
import json

app = FastAPI()

agent = Agent(
    name="Chat Agent",
    instructions="You are a helpful chat assistant.",
)

async def event_generator(user_input: str):
    result = Runner.run_streamed(agent, user_input)

    async for event in result.stream_events():
        if event.type == "raw_response_event":
            if isinstance(event.data, ResponseTextDeltaEvent):
                chunk = event.data.delta
                yield f"data: {json.dumps({'type': 'text', 'content': chunk})}\n\n"

        elif event.type == "run_item_stream_event":
            if event.item.type == "tool_call_item":
                yield f"data: {json.dumps({'type': 'tool_call', 'name': event.item.raw_item.name})}\n\n"

    yield f"data: {json.dumps({'type': 'done', 'final': result.final_output})}\n\n"

@app.post("/chat/stream")
async def chat_stream(request: dict):
    return StreamingResponse(
        event_generator(request["message"]),
        media_type="text/event-stream",
    )

The frontend consumes this with the EventSource API or a fetch-based SSE reader:

// Frontend JavaScript
const response = await fetch('/chat/stream', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ message: userInput }),
});

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  // Buffer partial lines: a network chunk can end mid-message
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split('\n');
  buffer = lines.pop(); // keep any incomplete line for the next chunk

  for (const line of lines) {
    if (line.startsWith('data: ')) {
      const data = JSON.parse(line.slice(6));
      if (data.type === 'text') {
        appendToChat(data.content);
      } else if (data.type === 'tool_call') {
        showToolIndicator(data.name);
      }
    }
  }
}
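
The same line-buffering logic is easy to unit-test server-side. Here is a minimal Python sketch of it — the parse_sse_chunks helper is illustrative, not part of any SDK — which extracts "data:" payloads from raw byte chunks and tolerates messages split across chunk boundaries:

```python
import json

def parse_sse_chunks(chunks):
    """Parse 'data: ...' lines from a stream of byte chunks, buffering
    partial lines that are split across chunk boundaries."""
    events, buffer = [], ""
    for chunk in chunks:
        buffer += chunk.decode()
        # Everything before the last newline is complete; the tail is buffered
        *lines, buffer = buffer.split("\n")
        for line in lines:
            if line.startswith("data: "):
                events.append(json.loads(line[len("data: "):]))
    return events

# A message split awkwardly across two chunks still parses correctly:
chunks = [b'data: {"type": "text", "con', b'tent": "Hi"}\n\n']
print(parse_sse_chunks(chunks))  # [{'type': 'text', 'content': 'Hi'}]
```

The same helper can feed pytest assertions against the FastAPI endpoint's response body.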

Streaming with Multi-Agent Handoffs

When agents hand off to each other, streaming shows the transition in real-time:

support_agent = Agent(
    name="Support Agent",
    instructions="Handle general questions.",
)

billing_agent = Agent(
    name="Billing Agent",
    instructions="Handle billing questions.",
)

triage_agent = Agent(
    name="Triage Agent",
    instructions="Route to the appropriate agent.",
    handoffs=[support_agent, billing_agent],
)

from openai.types.responses import ResponseTextDeltaEvent

async def stream_with_handoffs(user_input: str):
    result = Runner.run_streamed(triage_agent, user_input)
    current_agent = triage_agent.name

    async for event in result.stream_events():
        if event.type == "agent_updated_stream_event":
            current_agent = event.new_agent.name
            print(f"\n--- Transferred to {current_agent} ---\n")

        elif event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
            print(event.data.delta, end="", flush=True)

    print(f"\n\nFinal agent: {result.last_agent.name}")

Performance Considerations

  1. Use WebSocket transport for high-frequency applications. If your agent makes many LLM calls per request, the connection reuse significantly reduces latency.

  2. Buffer small chunks. In a web UI, updating the DOM for every single token can cause performance issues. Buffer chunks and update on a timer (every 50-100ms).

  3. Handle backpressure. If your event consumer is slower than the stream producer, events can queue up in memory. Monitor memory usage in high-throughput scenarios.

  4. Set timeouts on the stream. A stalled stream can hold connections open indefinitely. Implement a timeout that closes the stream if no events arrive within a reasonable window.

  5. Test with slow connections. Streaming UIs behave differently on 3G vs fiber. Test with network throttling to ensure a good experience across connection speeds.
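
The timeout advice (point 4) can be sketched with asyncio.wait_for around each event pull. This is a generic helper for any async iterator, so it applies unchanged to result.stream_events(); consume_with_idle_timeout and fake_events below are illustrative names, and fake_events merely stands in for a real event stream:

```python
import asyncio

async def consume_with_idle_timeout(events, on_event, idle_timeout=30.0):
    """Pull events from an async iterator, raising TimeoutError if the
    producer stalls for longer than idle_timeout between events."""
    it = events.__aiter__()
    while True:
        try:
            event = await asyncio.wait_for(it.__anext__(), timeout=idle_timeout)
        except StopAsyncIteration:
            break  # stream finished normally
        on_event(event)

# Demo with a fake event source standing in for result.stream_events().
async def fake_events():
    for i in range(3):
        await asyncio.sleep(0.01)
        yield f"event-{i}"

received = []
asyncio.run(consume_with_idle_timeout(fake_events(), received.append, idle_timeout=1.0))
print(received)  # ['event-0', 'event-1', 'event-2']
```

If the producer stalls, the TimeoutError propagates to the caller, which can close the connection and surface an error to the user instead of holding it open indefinitely.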

Best Practices

  1. Always handle all event types. Even if you only display text, log tool calls and handoffs for debugging.

  2. Show progress indicators during tool calls. Users should know the agent is working, not stalled.

  3. Provide a fallback for non-streaming clients. Not all clients support SSE or WebSocket. Offer a non-streaming endpoint as well.

  4. Clean up resources. If the user disconnects mid-stream, ensure the streaming context is properly closed.
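
The clean-up point can be handled with try/finally inside the event generator: when a client disconnects mid-stream, FastAPI closes the async generator, which runs the finally block. A minimal dependency-free sketch, where gen.aclose() simulates the disconnect that the server would trigger:

```python
import asyncio

closed = []

async def event_stream():
    try:
        while True:
            await asyncio.sleep(0.01)
            yield "data: chunk\n\n"
    finally:
        # Runs even when the consumer disconnects mid-stream:
        # close connections, cancel the run, flush logs here.
        closed.append(True)

async def simulate_disconnect():
    gen = event_stream()
    await gen.__anext__()  # client receives one chunk...
    await gen.aclose()     # ...then disconnects; the finally block runs

asyncio.run(simulate_disconnect())
print(closed)  # [True]
```

The same pattern applies to the event_generator in the FastAPI example above: put any per-request teardown in the finally block rather than after the loop.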


Source: OpenAI Agents SDK — Streaming
