# Running Agents: Runner.run(), run_sync(), and run_streamed() Explained
Master the three execution methods in the OpenAI Agents SDK. Learn when to use async run(), synchronous run_sync(), and streaming run_streamed() with practical code examples.
## Three Ways to Run an Agent
The OpenAI Agents SDK provides three methods on the Runner class for executing agents. Each serves a different use case:
| Method | Async | Streaming | Best For |
|---|---|---|---|
| `Runner.run()` | Yes | No | Production web servers, async applications |
| `Runner.run_sync()` | No | No | Scripts, CLI tools, notebooks, quick prototyping |
| `Runner.run_streamed()` | Yes | Yes | Chat UIs, real-time output, long responses |
All three methods execute the same underlying agent loop — the difference is in how they return results to your code.
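As a quick side-by-side, here is a minimal sketch of the three call shapes (each is covered in detail below):

```python
import asyncio
from agents import Agent, Runner

agent = Agent(name="Assistant", instructions="Be helpful.")

# 1. Synchronous: blocks the thread until the loop finishes
print(Runner.run_sync(agent, "Hi").final_output)

async def main():
    # 2. Async: an awaitable that resolves to a RunResult
    result = await Runner.run(agent, "Hi")
    print(result.final_output)

    # 3. Streaming: returns immediately; events are consumed asynchronously
    streamed = Runner.run_streamed(agent, "Hi")
    async for _event in streamed.stream_events():
        pass  # event handling is covered in the streaming section below
    print(streamed.final_output)

asyncio.run(main())
```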
## Runner.run() — The Async Workhorse
Runner.run() is the primary execution method. It is asynchronous, returning an awaitable that resolves to a RunResult when the agent loop completes:
```mermaid
flowchart LR
    INPUT(["User input"])
    AGENT["Agent<br/>name plus instructions"]
    HAND{"Handoff to<br/>another agent?"}
    SUB["Sub-agent<br/>specialist"]
    GUARD{"Guardrail<br/>passed?"}
    TOOL["Tool call"]
    SDK[("Tracing<br/>OpenAI dashboard")]
    OUT(["Final output"])
    INPUT --> AGENT --> HAND
    HAND -->|Yes| SUB --> GUARD
    HAND -->|No| GUARD
    GUARD -->|Yes| TOOL --> AGENT
    GUARD -->|Block| OUT
    AGENT --> OUT
    AGENT --> SDK
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style SDK fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
```
```python
import asyncio
from agents import Agent, Runner

agent = Agent(
    name="Assistant",
    instructions="You are a helpful assistant.",
)

async def main():
    result = await Runner.run(
        agent,
        "Explain the difference between threads and processes.",
    )
    print(result.final_output)
    print(f"Agent that responded: {result.last_agent.name}")

asyncio.run(main())
```
### When to Use run()
Use Runner.run() whenever you are in an async context:
- FastAPI / Starlette endpoints — These are natively async (see the sketch after this list)
- Background task workers — Celery with async support, arq, etc.
- Batch processing — Run multiple agents concurrently with `asyncio.gather()`
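As an example of the first case, here is a minimal sketch of `run()` inside a FastAPI endpoint; the app and route names are illustrative:

```python
from fastapi import FastAPI
from agents import Agent, Runner

app = FastAPI()
agent = Agent(name="Assistant", instructions="Answer user questions concisely.")

@app.post("/ask")
async def ask(question: str):
    # The endpoint coroutine is already on the event loop,
    # so awaiting Runner.run() is the natural fit
    result = await Runner.run(agent, question)
    return {"answer": result.final_output}
```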
### Concurrent Execution
Because run() is async, you can run multiple agents in parallel:
```python
import asyncio
from agents import Agent, Runner

summarizer = Agent(name="Summarizer", instructions="Summarize the given text in 2 sentences.")
translator = Agent(name="Translator", instructions="Translate the given text to French.")
critic = Agent(name="Critic", instructions="Identify logical flaws in the given text.")

async def process_text(text: str):
    # Create all three coroutines, then run them concurrently
    summarize_task = Runner.run(summarizer, text)
    translate_task = Runner.run(translator, text)
    critic_task = Runner.run(critic, text)
    results = await asyncio.gather(summarize_task, translate_task, critic_task)
    return {
        "summary": results[0].final_output,
        "french": results[1].final_output,
        "critique": results[2].final_output,
    }

print(asyncio.run(process_text("The quantum computer will solve all NP-hard problems by 2027.")))
```
This sends three independent LLM requests simultaneously, significantly reducing total latency compared to sequential execution.
## Runner.run_sync() — Synchronous Convenience
Runner.run_sync() is a synchronous wrapper around Runner.run(). It blocks the current thread until the agent loop completes:
```python
from agents import Agent, Runner

agent = Agent(
    name="Assistant",
    instructions="You are a helpful assistant.",
)

# No async/await needed
result = Runner.run_sync(agent, "What is the capital of Japan?")
print(result.final_output)
```
### When to Use run_sync()
- Scripts and CLI tools — No need to set up an async event loop
- Jupyter notebooks — Avoids event loop conflicts
- Quick prototyping — Fastest way to test an agent
- Django views — If you are not using Django's async views
**Important:** Do not use `run_sync()` inside an existing async event loop (like a FastAPI endpoint). It will raise an error or deadlock because it tries to create its own event loop.
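If you find yourself inside a coroutine, await `Runner.run()` instead. A minimal sketch:

```python
import asyncio
from agents import Agent, Runner

agent = Agent(name="Assistant", instructions="Be helpful.")

async def handler(question: str) -> str:
    # An event loop is already running inside a coroutine, so
    # Runner.run_sync(agent, question) would fail here; await instead:
    result = await Runner.run(agent, question)
    return result.final_output

print(asyncio.run(handler("What is the capital of Japan?")))
```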
## Runner.run_streamed() — Real-Time Output
Runner.run_streamed() returns a RunResultStreaming object immediately, then streams events as the agent processes:
```python
import asyncio
from agents import Agent, Runner
from openai.types.responses import ResponseTextDeltaEvent

agent = Agent(
    name="Storyteller",
    instructions="Write engaging short stories.",
)

async def main():
    result = Runner.run_streamed(agent, "Write a story about a robot learning to paint.")
    async for event in result.stream_events():
        # Text deltas arrive as raw response events; event.data.delta is a string
        if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
            print(event.data.delta, end="", flush=True)
    print()  # Newline after streaming completes

    # The final result is still available after streaming
    final = result.final_output
    print(f"\nFull response length: {len(final)} characters")

asyncio.run(main())
```
### Stream Event Types
The `stream_events()` async iterator yields events with a `type` field:

- `raw_response_event` — Raw chunks from the model response, including text deltas
- `agent_updated_stream_event` — Fired when the current agent changes (during handoffs)
- `run_item_stream_event` — Higher-level events for tool calls, messages, handoffs
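Here is a sketch of a handler that dispatches on all three event types, following the SDK's documented event shapes (`ResponseTextDeltaEvent` comes from the `openai` package; `new_agent` and `item` are attributes of the corresponding event classes):

```python
import asyncio
from agents import Agent, Runner
from openai.types.responses import ResponseTextDeltaEvent

agent = Agent(name="Assistant", instructions="Answer briefly.")

async def main():
    result = Runner.run_streamed(agent, "Tell me a joke.")
    async for event in result.stream_events():
        if event.type == "raw_response_event":
            # Raw model chunks; text deltas carry the streamed text
            if isinstance(event.data, ResponseTextDeltaEvent):
                print(event.data.delta, end="", flush=True)
        elif event.type == "agent_updated_stream_event":
            # The active agent changed, e.g. after a handoff
            print(f"\n[now running: {event.new_agent.name}]")
        elif event.type == "run_item_stream_event":
            # Higher-level items: messages, tool calls, tool outputs, handoffs
            print(f"\n[item: {event.item.type}]")

asyncio.run(main())
```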
### Building a Chat UI with Streaming
Here is a pattern for building an interactive chat loop with streaming:
```python
import asyncio
from agents import Agent, Runner
from openai.types.responses import ResponseTextDeltaEvent

agent = Agent(
    name="Chat Assistant",
    instructions="You are a friendly chat assistant. Keep responses concise.",
)

async def chat():
    conversation_history = []
    while True:
        user_input = input("\nYou: ")
        if user_input.lower() in ("quit", "exit"):
            break

        # Build input with conversation history
        conversation_history.append({
            "role": "user",
            "content": user_input,
        })

        print("Assistant: ", end="", flush=True)
        result = Runner.run_streamed(agent, conversation_history)
        async for event in result.stream_events():
            # event.data.delta is the streamed text chunk
            if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                print(event.data.delta, end="", flush=True)
        print()

        # Add assistant response to history
        conversation_history.append({
            "role": "assistant",
            "content": result.final_output,
        })

asyncio.run(chat())
```
## Input Types
All three runner methods accept flexible input types:
### String Input
The simplest form — a single user message:
result = await Runner.run(agent, "Hello, how are you?")
### Message List Input
For multi-turn conversations or providing context:
```python
result = await Runner.run(agent, [
    {"role": "user", "content": "My name is Alice."},
    {"role": "assistant", "content": "Hello Alice! How can I help you today?"},
    {"role": "user", "content": "What is my name?"},
])
```
### Continuing from a Previous Run
Call `to_input_list()` on a previous `RunResult` to continue the conversation with full context:
```python
result1 = await Runner.run(agent, "My favorite color is blue.")
# to_input_list() returns the prior input plus generated items as a message list
next_input = result1.to_input_list() + [{"role": "user", "content": "What is my favorite color?"}]
result2 = await Runner.run(agent, next_input)
# result2.final_output will reference "blue"
```
## RunConfig: Controlling Execution
The `run_config` parameter lets you customize execution behavior; note that `max_turns` is passed directly to the run method rather than through `RunConfig`:
```python
from agents import Agent, Runner, RunConfig

agent = Agent(name="Assistant", instructions="Be helpful.")

result = await Runner.run(
    agent,
    "Complex multi-step question here...",
    max_turns=10,  # Limit agent loop iterations (a direct Runner.run() argument)
    run_config=RunConfig(
        tracing_disabled=False,            # Enable tracing (default)
        workflow_name="customer-support",  # Name for tracing
        trace_id="unique-trace-id",        # Custom trace ID
    ),
)
```
### max_turns
The `max_turns` parameter, passed directly to any of the run methods, is a safety mechanism that limits how many iterations the agent loop can execute. Each "turn" is one LLM call. If the limit is reached, the SDK raises `MaxTurnsExceeded`:
```python
from agents import Agent, Runner, MaxTurnsExceeded

# search_tool and analyze_tool are placeholder tools defined elsewhere
agent = Agent(
    name="Research Agent",
    instructions="Research the topic thoroughly using all available tools.",
    tools=[search_tool, analyze_tool],
)

try:
    result = await Runner.run(agent, "Research quantum computing", max_turns=5)
except MaxTurnsExceeded:
    print("Agent exceeded the maximum number of turns. The task may be too complex.")
```
Set max_turns based on your use case:
- Simple Q&A: 2-3 turns
- Tool-using agents: 5-10 turns
- Complex research agents: 15-25 turns
- Never leave it unlimited in production
## The RunResult Object
Every run returns a RunResult (or RunResultStreaming for streamed runs) with these key properties:
result = await Runner.run(agent, "Hello")
# The final text or structured output
output = result.final_output
# The agent that produced the final output (may differ from the starting agent if handoffs occurred)
last_agent = result.last_agent
# All items generated during the run: messages, tool calls, tool outputs, handoffs
items = result.new_items
# The raw input that started the run
original_input = result.input
# For structured outputs, get the typed result
typed_output = result.final_output_as(MyPydanticModel)
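If the agent declares an `output_type`, `final_output_as()` returns the parsed model. A minimal sketch with a hypothetical `CityInfo` schema:

```python
from pydantic import BaseModel
from agents import Agent, Runner

# Hypothetical schema for illustration
class CityInfo(BaseModel):
    name: str
    country: str
    population: int

city_agent = Agent(
    name="City Expert",
    instructions="Extract structured facts about the requested city.",
    output_type=CityInfo,  # ask the SDK to produce structured output
)

result = Runner.run_sync(city_agent, "Tell me about Tokyo.")
info = result.final_output_as(CityInfo)
print(f"{info.name}, {info.country}: population {info.population}")
```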
## Best Practices
- Use `run()` in production, `run_sync()` only for scripts and testing.
- Always set `max_turns` to prevent runaway agent loops that burn through your API budget.
- Use streaming for user-facing applications. Waiting 10+ seconds for a response with no feedback is a poor user experience.
- Handle exceptions around all runner calls. Network errors, rate limits, and model errors can all occur (see the sketch after this list).
- Pass conversation history as message lists for multi-turn chat rather than concatenating strings.
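To illustrate the exception-handling practice, here is a sketch that combines `max_turns` with handlers for the SDK's documented exception types (assuming `agents.exceptions` exports these names):

```python
import asyncio
from agents import Agent, Runner
from agents.exceptions import (
    AgentsException,
    InputGuardrailTripwireTriggered,
    MaxTurnsExceeded,
)

agent = Agent(name="Assistant", instructions="Be helpful.")

async def safe_run(text: str) -> str:
    try:
        result = await Runner.run(agent, text, max_turns=5)
        return result.final_output
    except MaxTurnsExceeded:
        return "The task needed more turns than allowed."
    except InputGuardrailTripwireTriggered:
        return "The request was blocked by an input guardrail."
    except AgentsException as exc:
        # Base class for the SDK's own errors; network or provider
        # errors from the underlying client may still propagate
        return f"Agent run failed: {exc}"

print(asyncio.run(safe_run("Hello")))
```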