
Async OpenAI Client: Building High-Throughput AI Applications

Learn how to use AsyncOpenAI with Python's asyncio to make concurrent API calls, implement connection pooling, and build high-throughput AI pipelines.

Why Async Matters for AI Applications

Synchronous OpenAI API calls block your Python thread while waiting for the response — typically 1 to 10 seconds per request. If you need to process 100 items, that means 100 sequential waits. With async programming, you can fire off many requests concurrently and process them as they complete, reducing total wall-clock time dramatically.

The OpenAI Python SDK ships with a fully async client that integrates seamlessly with Python's asyncio event loop.

The AsyncOpenAI Client

The async client mirrors the synchronous API exactly, but every method is a coroutine:

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def main():
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello, async world!"}],
    )
    print(response.choices[0].message.content)

asyncio.run(main())

The AsyncOpenAI client uses httpx.AsyncClient under the hood, which provides connection pooling out of the box. (HTTP/2 is available too, but only if you supply an httpx client built with the optional h2 extra and http2 enabled.)


Concurrent Requests with asyncio.gather

The biggest win comes from running multiple requests at the same time:

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def summarize(text: str) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Summarize the following text in one sentence."},
            {"role": "user", "content": text},
        ],
    )
    return response.choices[0].message.content

async def main():
    articles = [
        "Python 3.13 introduces a new JIT compiler that improves performance...",
        "The European Union's AI Act requires transparency for high-risk systems...",
        "SpaceX successfully launched its 300th Falcon 9 mission this quarter...",
        "OpenAI released GPT-4o with native multimodal capabilities...",
        "Rust adoption in enterprise backends grew by 40% in 2025...",
    ]

    # Run all 5 summaries concurrently
    summaries = await asyncio.gather(*[summarize(article) for article in articles])

    for article, summary in zip(articles, summaries):
        print(f"Original: {article[:50]}...")
        print(f"Summary: {summary}")
        print()

asyncio.run(main())

With synchronous code, this takes 5x the time of a single request. With asyncio.gather, all five requests run concurrently and the total time is roughly equal to the slowest single request.
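You can see this effect without touching the API by substituting asyncio.sleep for the network wait. Five simulated 0.2-second "requests" finish in roughly 0.2 seconds total, not 1.0:

```python
import asyncio
import time

async def fake_request(delay: float) -> float:
    # Stand-in for an API call: just wait, then return.
    await asyncio.sleep(delay)
    return delay

async def main() -> float:
    start = time.perf_counter()
    # Five concurrent "requests" of 0.2s each
    await asyncio.gather(*(fake_request(0.2) for _ in range(5)))
    return time.perf_counter() - start

elapsed = asyncio.run(main())
print(f"5 concurrent 0.2s waits took {elapsed:.2f}s")
```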

Controlling Concurrency with Semaphores

Firing 1000 concurrent requests will hit rate limits. Use a semaphore to cap concurrency:

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()
semaphore = asyncio.Semaphore(10)  # max 10 concurrent requests

async def process_item(item: str) -> str:
    async with semaphore:
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": f"Classify this feedback: {item}"}],
        )
        return response.choices[0].message.content

async def main():
    feedback_items = [f"Feedback item {i}" for i in range(100)]

    tasks = [process_item(item) for item in feedback_items]
    results = await asyncio.gather(*tasks)

    print(f"Processed {len(results)} items")

asyncio.run(main())

The semaphore ensures no more than 10 requests are in-flight at any moment, preventing rate limit errors while still processing items much faster than sequential code.
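When a request does slip through and hit the limit, retrying with exponential backoff is the standard recovery. The helper below is a generic sketch: with_retries is our own name, not an SDK function, and in real code you would pass openai.RateLimitError as the retry_on type.

```python
import asyncio
import random

async def with_retries(make_call, *, retries=5, base_delay=1.0, retry_on=(Exception,)):
    """Retry an async call with exponential backoff plus jitter.

    make_call: zero-argument coroutine factory, e.g.
        lambda: client.chat.completions.create(...)
    retry_on: exception types that should trigger a retry,
        e.g. (openai.RateLimitError,).
    """
    for attempt in range(retries):
        try:
            return await make_call()
        except retry_on:
            if attempt == retries - 1:
                raise  # out of attempts; surface the error
            # Double the wait each attempt, plus jitter to avoid thundering herd
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
            await asyncio.sleep(delay)
```

Wrapping the API call inside process_item keeps the semaphore held during retries, which naturally slows the whole pipeline down while the API is pushing back.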

Async Streaming

Combine async with streaming for the best real-time experience:

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def stream_chat(prompt: str):
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )

    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            print(delta.content, end="", flush=True)
    print()

asyncio.run(stream_chat("Explain event loops in Python."))

Processing Results as They Complete

When tasks have variable completion times, asyncio.as_completed lets you handle results as they arrive:


import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def analyze(text: str, index: int) -> tuple[int, str]:
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Extract the sentiment: positive, negative, or neutral."},
            {"role": "user", "content": text},
        ],
    )
    return index, response.choices[0].message.content

async def main():
    texts = [
        "This product is amazing! Best purchase ever.",
        "Terrible experience. Will never buy again.",
        "It works fine. Nothing special.",
    ]

    tasks = [analyze(text, i) for i, text in enumerate(texts)]

    for coro in asyncio.as_completed(tasks):
        index, sentiment = await coro
        print(f"Item {index}: {sentiment}")

asyncio.run(main())

Integration with FastAPI

FastAPI is natively async, making it a natural fit:

from fastapi import FastAPI
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

@app.post("/analyze")
async def analyze_text(text: str):
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Analyze the sentiment of this text."},
            {"role": "user", "content": text},
        ],
    )
    return {"sentiment": response.choices[0].message.content}

FAQ

Should I create one AsyncOpenAI client or one per request?

Create one client and reuse it across all requests. The client manages an internal connection pool. Creating a new client per request wastes connections and adds overhead.

Can I mix sync and async OpenAI calls in the same application?

Yes, but keep them separate. Use OpenAI() for synchronous code and AsyncOpenAI() for async code. Do not call synchronous methods from within an async function — it blocks the event loop.
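If you are stuck calling legacy synchronous code from an async context, asyncio.to_thread moves it onto a worker thread so the event loop keeps running. A small sketch, where blocking_call stands in for a synchronous client.chat.completions.create call:

```python
import asyncio
import time

def blocking_call() -> str:
    # Stand-in for a synchronous SDK call that would block the event loop.
    time.sleep(0.1)
    return "done"

async def main() -> str:
    # Runs the blocking function in a worker thread; the loop stays free.
    return await asyncio.to_thread(blocking_call)

print(asyncio.run(main()))
```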

What is the ideal concurrency level for OpenAI API calls?

It depends on your rate limits. Check your plan's requests-per-minute (RPM) limit. A good starting point is a semaphore value of RPM divided by 6 (to account for variable request duration). Monitor 429 errors and adjust.


#OpenAI #AsyncPython #AsyncIO #Concurrency #Performance #AgenticAI #LearnAI #AIEngineering
