
OpenAI Agents SDK with FastAPI: Production Web Server Integration Patterns

Learn how to mount OpenAI Agents SDK agents inside a FastAPI web server with session management, concurrent user handling, streaming responses, and production-ready error handling.

Why FastAPI and Agents SDK Work Well Together

FastAPI is async-native. The OpenAI Agents SDK is async-native. This alignment means you can run agent loops inside request handlers without blocking other users. No thread pools, no workarounds — just native async/await throughout the stack.

This guide shows you how to build a production web API that exposes agent capabilities to multiple concurrent users with proper session isolation.

Basic Integration: Agent as an Endpoint

The simplest pattern wraps a Runner.run call inside a FastAPI route. The diagram below shows what a single request kicks off inside the SDK: the agent loop, optional handoffs, guardrails, tool calls, and tracing.

flowchart LR
    INPUT(["User input"])
    AGENT["Agent<br/>name plus instructions"]
    HAND{"Handoff to<br/>another agent?"}
    SUB["Sub-agent<br/>specialist"]
    GUARD{"Guardrail<br/>passed?"}
    TOOL["Tool call"]
    SDK[("Tracing<br/>OpenAI dashboard")]
    OUT(["Final output"])
    INPUT --> AGENT --> HAND
    HAND -->|Yes| SUB --> GUARD
    HAND -->|No| GUARD
    GUARD -->|Yes| TOOL --> AGENT
    GUARD -->|Block| OUT
    AGENT --> OUT
    AGENT --> SDK
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style SDK fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from agents import Agent, Runner

app = FastAPI(title="Agent API")

support_agent = Agent(
    name="support",
    instructions="You are a customer support agent for a SaaS product.",
)

class ChatRequest(BaseModel):
    message: str
    user_id: str

class ChatResponse(BaseModel):
    reply: str
    agent_name: str

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    try:
        result = await Runner.run(
            support_agent,
            input=request.message,
        )
        return ChatResponse(
            reply=result.final_output,
            agent_name=result.last_agent.name,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

Session Management: Multi-Turn Conversations

Real conversations span multiple requests. You need to persist the conversation state between calls. Here is a session manager that stores history per user.

from datetime import datetime, timedelta
from typing import Any
import uuid

class SessionManager:
    def __init__(self, ttl_minutes: int = 60):
        self._sessions: dict[str, dict[str, Any]] = {}
        self.ttl = timedelta(minutes=ttl_minutes)

    def get_or_create(self, session_id: str) -> dict[str, Any]:
        if session_id not in self._sessions:
            self._sessions[session_id] = {
                "id": session_id,
                "history": [],
                "created_at": datetime.utcnow(),
                "last_active": datetime.utcnow(),
            }
        session = self._sessions[session_id]
        session["last_active"] = datetime.utcnow()
        return session

    def cleanup_expired(self):
        now = datetime.utcnow()
        expired = [
            sid for sid, s in self._sessions.items()
            if now - s["last_active"] > self.ttl
        ]
        for sid in expired:
            del self._sessions[sid]

sessions = SessionManager(ttl_minutes=30)
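
Note that nothing calls cleanup_expired on its own; expired sessions linger until something triggers the sweep. Here is a minimal sketch of a periodic background task (the interval and the task wiring are assumptions, not something the SDK provides):

import asyncio

async def session_cleanup_loop(interval_seconds: int = 300):
    # Hypothetical background sweep: drop expired sessions every few minutes
    while True:
        await asyncio.sleep(interval_seconds)
        sessions.cleanup_expired()

# Start it from a startup hook, e.g. cleanup_task = asyncio.create_task(session_cleanup_loop()),
# and cancel the task on shutdown.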

Multi-Turn Endpoint with History

Now wire the session manager into your endpoint so each request carries forward the conversation.

from agents.items import TResponseInputItem

class MultiTurnRequest(BaseModel):
    message: str
    session_id: str | None = None

class MultiTurnResponse(BaseModel):
    reply: str
    session_id: str
    turn_count: int

@app.post("/chat/session", response_model=MultiTurnResponse)
async def chat_session(request: MultiTurnRequest):
    session_id = request.session_id or str(uuid.uuid4())
    session = sessions.get_or_create(session_id)

    # Build input from history plus new message
    input_items: list[TResponseInputItem] = list(session["history"])
    input_items.append({"role": "user", "content": request.message})

    result = await Runner.run(support_agent, input=input_items)

    # Persist the new turn in session history
    session["history"] = result.to_input_list()

    return MultiTurnResponse(
        reply=result.final_output,
        session_id=session_id,
        turn_count=len([
            item for item in session["history"]
            if isinstance(item, dict) and item.get("role") == "user"
        ]),
    )

Streaming Responses with Server-Sent Events

For long agent responses, streaming gives users immediate feedback.

from fastapi.responses import StreamingResponse
from openai.types.responses import ResponseTextDeltaEvent
from agents import Runner

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    async def event_generator():
        result = Runner.run_streamed(support_agent, input=request.message)

        async for event in result.stream_events():
            # Forward only text deltas from the raw model response stream
            if event.type == "raw_response_event" and isinstance(
                event.data, ResponseTextDeltaEvent
            ):
                yield f"data: {event.data.delta}\n\n"

        yield "data: [DONE]\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
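
To sanity-check the stream from the client side, here is a short sketch using httpx (the client library and the localhost URL are assumptions; any SSE-capable client works):

import asyncio
import httpx

async def consume_stream():
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/chat/stream",
            json={"message": "Explain my latest invoice", "user_id": "u-123"},
        ) as response:
            async for line in response.aiter_lines():
                # Each SSE frame arrives as a "data: ..." line
                if line.startswith("data: "):
                    print(line.removeprefix("data: "), flush=True)

asyncio.run(consume_stream())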

Handling Concurrent Users

FastAPI handles concurrency naturally with async, but you need to ensure agent state is isolated per request. Never share mutable agent state across requests.

import asyncio

# Rate limiting per user
user_semaphores: dict[str, asyncio.Semaphore] = {}

def get_user_semaphore(user_id: str, max_concurrent: int = 3) -> asyncio.Semaphore:
    if user_id not in user_semaphores:
        user_semaphores[user_id] = asyncio.Semaphore(max_concurrent)
    return user_semaphores[user_id]

@app.post("/chat/limited")
async def chat_with_limit(request: ChatRequest):
    semaphore = get_user_semaphore(request.user_id)

    # Reject immediately instead of queueing when the user is already at their limit
    if semaphore.locked():
        raise HTTPException(
            status_code=429,
            detail="Too many concurrent requests. Please wait.",
        )

    async with semaphore:
        result = await Runner.run(support_agent, input=request.message)
        return {"reply": result.final_output}

Startup and Shutdown Lifecycle

Use FastAPI's lifespan events to manage resources.


from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: validate agent configuration
    print("Agent API starting, validating agents...")
    test_result = await Runner.run(support_agent, input="ping")
    print(f"Agent validated: {test_result.last_agent.name}")
    yield
    # Shutdown: cleanup
    sessions.cleanup_expired()
    print("Agent API shutdown complete")

# Create the app with the lifespan handler (in place of the earlier plain FastAPI() call)
app = FastAPI(title="Agent API", lifespan=lifespan)

FAQ

How do I handle agent timeouts in a web server context?

Wrap your Runner.run call with asyncio.wait_for(Runner.run(...), timeout=30.0). This raises asyncio.TimeoutError after 30 seconds, which you catch and return as a 504 Gateway Timeout. Set the timeout based on your load balancer and client expectations.
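
A minimal sketch of that pattern (the route name and the 30-second budget are illustrative):

import asyncio

@app.post("/chat/timeout")
async def chat_with_timeout(request: ChatRequest):
    try:
        # Abort the agent run once it exceeds the deadline
        result = await asyncio.wait_for(
            Runner.run(support_agent, input=request.message),
            timeout=30.0,
        )
    except asyncio.TimeoutError:
        raise HTTPException(status_code=504, detail="Agent run timed out")
    return {"reply": result.final_output}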

Should I create a new Agent instance per request?

No. Agent instances are lightweight configuration objects — they hold instructions, tool definitions, and handoff lists. They do not store conversation state. Create agents once at module level and reuse them across requests. The Runner manages per-request state internally.

How do I scale this beyond a single server?

Move session storage from in-memory dictionaries to Redis. Use Redis as your session backend so any server instance can resume any conversation. Deploy multiple FastAPI instances behind a load balancer. The agents are stateless, so horizontal scaling is straightforward.
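
A minimal sketch of a Redis-backed session store, assuming the redis-py async client and JSON-serializable history items (the key names and TTL are illustrative):

import json
import redis.asyncio as redis

r = redis.Redis.from_url("redis://localhost:6379/0")

async def load_history(session_id: str) -> list:
    # History lives as a JSON blob under a per-session key
    raw = await r.get(f"session:{session_id}")
    return json.loads(raw) if raw else []

async def save_history(session_id: str, history: list, ttl_seconds: int = 1800) -> None:
    # Overwrite the blob and refresh the TTL on every turn
    await r.set(f"session:{session_id}", json.dumps(history), ex=ttl_seconds)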


#OpenAIAgentsSDK #FastAPI #Production #WebServer #Python #SessionManagement #AgenticAI #LearnAI #AIEngineering

