Skip to content
Learn Agentic AI
Learn Agentic AI11 min read9 views

Parallel Agent Execution with asyncio.gather

Learn how to run multiple OpenAI agents concurrently using asyncio.gather for dramatic performance improvements, with error handling strategies and a complete market research example.

Why Parallel Execution Matters

When you run agents sequentially, total execution time is the sum of all agent runtimes. If Agent A takes 3 seconds, Agent B takes 4 seconds, and Agent C takes 2 seconds, you wait 9 seconds.

With parallel execution, total time equals the slowest agent. Those same three agents running concurrently finish in 4 seconds — a 56% reduction.

The OpenAI Agents SDK is built on async Python, making it a natural fit for parallel execution via asyncio.gather. This post covers the patterns, pitfalls, and production considerations for running agents in parallel.

Basic Parallel Execution

The simplest pattern runs multiple agents on the same input concurrently:

Hear it before you finish reading

Talk to a live CallSphere AI voice agent in your browser — 60 seconds, no signup.

Try Live Demo →
flowchart LR
    INPUT(["User input"])
    AGENT["Agent<br/>name plus instructions"]
    HAND{"Handoff to<br/>another agent?"}
    SUB["Sub-agent<br/>specialist"]
    GUARD{"Guardrail<br/>passed?"}
    TOOL["Tool call"]
    SDK[("Tracing<br/>OpenAI dashboard")]
    OUT(["Final output"])
    INPUT --> AGENT --> HAND
    HAND -->|Yes| SUB --> GUARD
    HAND -->|No| GUARD
    GUARD -->|Yes| TOOL --> AGENT
    GUARD -->|Block| OUT
    AGENT --> OUT
    AGENT --> SDK
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style SDK fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
from agents import Agent, Runner
import asyncio

sentiment_agent = Agent(
    name="SentimentAnalyzer",
    instructions="Analyze the sentiment of the given text. Return: positive, negative, or neutral with a confidence score 0-100.",
    model="gpt-4o-mini",
)

topic_agent = Agent(
    name="TopicExtractor",
    instructions="Extract the main topics from the given text. Return a JSON list of topics.",
    model="gpt-4o-mini",
)

summary_agent = Agent(
    name="Summarizer",
    instructions="Summarize the given text in exactly one sentence.",
    model="gpt-4o-mini",
)

async def analyze_text(text: str):
    # Run all three agents in parallel
    sentiment_result, topic_result, summary_result = await asyncio.gather(
        Runner.run(sentiment_agent, input=text),
        Runner.run(topic_agent, input=text),
        Runner.run(summary_agent, input=text),
    )

    return {
        "sentiment": sentiment_result.final_output,
        "topics": topic_result.final_output,
        "summary": summary_result.final_output,
    }

async def main():
    text = """
    The new AI regulations proposed by the European Commission have sparked
    intense debate among technology leaders. While some argue the rules will
    stifle innovation, others believe they provide necessary consumer protections.
    The legislation is expected to be finalized by Q3 2026.
    """
    results = await analyze_text(text)
    for key, value in results.items():
        print(f"{key}: {value}\n")

asyncio.run(main())

Parallel Execution with Different Inputs

A more common pattern is running the same agent on different inputs, or different agents on different inputs:

from agents import Agent, Runner
import asyncio

researcher = Agent(
    name="Researcher",
    instructions="""Research the given company and provide:
    - Industry and market position
    - Key products/services
    - Recent developments
    - Competitive advantages""",
    model="gpt-4o",
)

async def research_companies(companies: list[str]) -> dict:
    """Research multiple companies in parallel."""
    tasks = [
        Runner.run(researcher, input=f"Research this company: {company}")
        for company in companies
    ]

    results = await asyncio.gather(*tasks)

    return {
        company: result.final_output
        for company, result in zip(companies, results)
    }

async def main():
    companies = ["Stripe", "Datadog", "Cloudflare", "Vercel"]
    reports = await research_companies(companies)
    for company, report in reports.items():
        print(f"\n{'=' * 40}")
        print(f"Company: {company}")
        print(report)

asyncio.run(main())

Error Handling in Parallel Execution

The critical question with asyncio.gather is: what happens when one agent fails? By default, if any task raises an exception, gather cancels all remaining tasks and raises the first exception. This is often not what you want.

return_exceptions=True

The simplest error handling strategy uses the return_exceptions parameter:

from agents import Agent, Runner
import asyncio

agent_a = Agent(name="AgentA", instructions="Analyze market trends.", model="gpt-4o")
agent_b = Agent(name="AgentB", instructions="Analyze competitor positioning.", model="gpt-4o")
agent_c = Agent(name="AgentC", instructions="Analyze customer sentiment.", model="gpt-4o")

async def parallel_analysis(input_text: str) -> dict:
    results = await asyncio.gather(
        Runner.run(agent_a, input=input_text),
        Runner.run(agent_b, input=input_text),
        Runner.run(agent_c, input=input_text),
        return_exceptions=True,  # Don't cancel other tasks on failure
    )

    analysis = {}
    agents = ["market_trends", "competitor", "sentiment"]

    for name, result in zip(agents, results):
        if isinstance(result, Exception):
            analysis[name] = f"ERROR: {type(result).__name__}: {str(result)}"
        else:
            analysis[name] = result.final_output

    return analysis

async def main():
    analysis = await parallel_analysis("Analyze the AI agent framework market")
    for section, content in analysis.items():
        print(f"\n{section}:")
        print(content)

asyncio.run(main())

Retry Logic for Failed Agents

For production systems, add retry logic around individual agent calls:

from agents import Agent, Runner
import asyncio

async def run_with_retry(
    agent: Agent,
    input_text: str,
    max_retries: int = 3,
    delay: float = 1.0,
) -> str:
    """Run an agent with retry logic."""
    last_error = None
    for attempt in range(max_retries):
        try:
            result = await Runner.run(agent, input=input_text)
            return result.final_output
        except Exception as e:
            last_error = e
            if attempt < max_retries - 1:
                await asyncio.sleep(delay * (2 ** attempt))  # Exponential backoff
    return f"FAILED after {max_retries} attempts: {str(last_error)}"

async def parallel_with_retries(agents: list[Agent], input_text: str) -> list[str]:
    """Run multiple agents in parallel, each with retry logic."""
    tasks = [
        run_with_retry(agent, input_text)
        for agent in agents
    ]
    return await asyncio.gather(*tasks)

Timeout per Agent

Prevent a slow agent from holding up the entire pipeline:

from agents import Agent, Runner
import asyncio

async def run_with_timeout(
    agent: Agent,
    input_text: str,
    timeout_seconds: float = 30.0,
) -> str:
    """Run an agent with a timeout."""
    try:
        result = await asyncio.wait_for(
            Runner.run(agent, input=input_text),
            timeout=timeout_seconds,
        )
        return result.final_output
    except asyncio.TimeoutError:
        return f"TIMEOUT: {agent.name} did not complete within {timeout_seconds}s"

async def parallel_with_timeouts(
    agents: list[Agent],
    input_text: str,
    timeout: float = 30.0,
) -> list[str]:
    """Run multiple agents in parallel with individual timeouts."""
    tasks = [
        run_with_timeout(agent, input_text, timeout)
        for agent in agents
    ]
    return await asyncio.gather(*tasks)

Combining Results from Parallel Agents

After running agents in parallel, you often need a synthesis step. Use a dedicated synthesis agent:

Still reading? Stop comparing — try CallSphere live.

CallSphere ships complete AI voice agents per industry — 14 tools for healthcare, 10 agents for real estate, 4 specialists for salons. See how it actually handles a call before you book a demo.

from agents import Agent, Runner
import asyncio

# Parallel analysis agents
market_agent = Agent(
    name="MarketAnalyst",
    instructions="Analyze market size, growth rate, and trends for the given industry.",
    model="gpt-4o",
)

competitor_agent = Agent(
    name="CompetitorAnalyst",
    instructions="Identify top 5 competitors, their market share, and key differentiators.",
    model="gpt-4o",
)

customer_agent = Agent(
    name="CustomerAnalyst",
    instructions="Analyze target customer segments, pain points, and buying patterns.",
    model="gpt-4o",
)

# Synthesis agent
synthesizer = Agent(
    name="ReportSynthesizer",
    instructions="""You receive three separate analysis reports: market analysis,
    competitor analysis, and customer analysis. Synthesize them into a single
    coherent executive report with these sections:
    1. Executive Summary
    2. Market Opportunity
    3. Competitive Landscape
    4. Target Customer Profile
    5. Strategic Recommendations

    Be concise but data-driven. Reference specific findings from each report.""",
    model="gpt-4o",
)

async def generate_market_report(industry: str) -> str:
    """Generate a comprehensive market report using parallel agents."""

    # Phase 1: Run analysis agents in parallel
    market_result, competitor_result, customer_result = await asyncio.gather(
        Runner.run(market_agent, input=f"Analyze the {industry} industry"),
        Runner.run(competitor_agent, input=f"Analyze competitors in {industry}"),
        Runner.run(customer_agent, input=f"Analyze customers in {industry}"),
    )

    # Phase 2: Synthesize results
    combined_input = f"""
    MARKET ANALYSIS:
    {market_result.final_output}

    COMPETITOR ANALYSIS:
    {competitor_result.final_output}

    CUSTOMER ANALYSIS:
    {customer_result.final_output}
    """

    synthesis = await Runner.run(synthesizer, input=combined_input)
    return synthesis.final_output

async def main():
    report = await generate_market_report("AI-powered customer service platforms")
    print(report)

asyncio.run(main())

Performance Benchmarking

Here is a utility to measure the performance difference between sequential and parallel execution:

from agents import Agent, Runner
import asyncio
import time

async def benchmark_sequential(agents: list[Agent], input_text: str) -> float:
    """Run agents sequentially and return total time."""
    start = time.monotonic()
    for agent in agents:
        await Runner.run(agent, input=input_text)
    elapsed = time.monotonic() - start
    return elapsed

async def benchmark_parallel(agents: list[Agent], input_text: str) -> float:
    """Run agents in parallel and return total time."""
    start = time.monotonic()
    await asyncio.gather(*[
        Runner.run(agent, input=input_text)
        for agent in agents
    ])
    elapsed = time.monotonic() - start
    return elapsed

async def main():
    agents = [
        Agent(name=f"Agent{i}", instructions=f"Analyze aspect {i} of the input.", model="gpt-4o-mini")
        for i in range(5)
    ]
    input_text = "Analyze the AI agent framework market"

    seq_time = await benchmark_sequential(agents, input_text)
    par_time = await benchmark_parallel(agents, input_text)

    print(f"Sequential: {seq_time:.2f}s")
    print(f"Parallel:   {par_time:.2f}s")
    print(f"Speedup:    {seq_time / par_time:.1f}x")

asyncio.run(main())

Typical results with 5 agents: sequential takes 12-15 seconds, parallel takes 3-4 seconds, yielding a 3-4x speedup.

Building a Complete Market Research System

Here is a full market research system that demonstrates all parallel execution patterns:

from agents import Agent, Runner
from pydantic import BaseModel
import asyncio
import json

# ─── Structured Output Models ───

class MarketData(BaseModel):
    market_size_usd: str
    growth_rate: str
    key_trends: list[str]
    risks: list[str]

class CompetitorProfile(BaseModel):
    name: str
    market_share: str
    strengths: list[str]
    weaknesses: list[str]

class CompetitorReport(BaseModel):
    competitors: list[CompetitorProfile]

class CustomerSegment(BaseModel):
    name: str
    size: str
    pain_points: list[str]
    willingness_to_pay: str

class CustomerReport(BaseModel):
    segments: list[CustomerSegment]

# ─── Specialized Agents with Structured Output ───

market_agent = Agent(
    name="MarketResearcher",
    instructions="Provide detailed market analysis with specific numbers and data points.",
    model="gpt-4o",
    output_type=MarketData,
)

competitor_agent = Agent(
    name="CompetitorResearcher",
    instructions="Profile the top 3-5 competitors with specific market share estimates.",
    model="gpt-4o",
    output_type=CompetitorReport,
)

customer_agent = Agent(
    name="CustomerResearcher",
    instructions="Identify 3-4 distinct customer segments with specific characteristics.",
    model="gpt-4o",
    output_type=CustomerReport,
)

# ─── Synthesis Agent ───

synthesis_agent = Agent(
    name="ReportWriter",
    instructions="""Write an executive market research report from the provided data.
    Structure it as: Executive Summary, Market Overview, Competitive Landscape,
    Customer Segments, and Strategic Recommendations. Use specific data points.""",
    model="gpt-4o",
)

# ─── Orchestration ───

async def run_with_timeout_and_retry(
    agent: Agent,
    input_text: str,
    timeout: float = 45.0,
    retries: int = 2,
):
    """Run agent with timeout and retry logic."""
    for attempt in range(retries):
        try:
            result = await asyncio.wait_for(
                Runner.run(agent, input=input_text),
                timeout=timeout,
            )
            return result
        except asyncio.TimeoutError:
            if attempt == retries - 1:
                raise
            await asyncio.sleep(1)
        except Exception:
            if attempt == retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)

async def generate_research_report(topic: str) -> str:
    """Generate a full market research report using parallel agents."""

    print(f"Starting parallel research on: {topic}")

    # Phase 1: Parallel data gathering
    results = await asyncio.gather(
        run_with_timeout_and_retry(market_agent, f"Market analysis: {topic}"),
        run_with_timeout_and_retry(competitor_agent, f"Competitor analysis: {topic}"),
        run_with_timeout_and_retry(customer_agent, f"Customer analysis: {topic}"),
        return_exceptions=True,
    )

    # Phase 2: Collect results, handling any failures
    sections = []
    labels = ["MARKET DATA", "COMPETITOR DATA", "CUSTOMER DATA"]

    for label, result in zip(labels, results):
        if isinstance(result, Exception):
            sections.append(f"{label}: Data unavailable due to error: {str(result)}")
        else:
            output = result.final_output
            if hasattr(output, 'model_dump'):
                sections.append(f"{label}:\n{json.dumps(output.model_dump(), indent=2)}")
            else:
                sections.append(f"{label}:\n{output}")

    combined = "\n\n".join(sections)

    # Phase 3: Synthesize into final report
    report_result = await Runner.run(
        synthesis_agent,
        input=f"Write a market research report from this data:\n\n{combined}",
    )

    return report_result.final_output

async def main():
    report = await generate_research_report(
        "AI-powered voice agents for customer service in 2026"
    )
    print("\n" + "=" * 60)
    print("FINAL REPORT")
    print("=" * 60)
    print(report)

asyncio.run(main())

When NOT to Parallelize

Parallel execution is not always the right choice:

  • When agents depend on each other's output: If Agent B needs Agent A's result, they must run sequentially
  • When you are rate-limited: Running 10 agents in parallel might hit API rate limits. Use asyncio.Semaphore to limit concurrency
  • When context is shared and mutable: If agents modify the same context object, parallel execution creates race conditions

Using Semaphore for Rate Limiting

import asyncio
from agents import Agent, Runner

# Limit to 3 concurrent agent runs
semaphore = asyncio.Semaphore(3)

async def run_with_semaphore(agent: Agent, input_text: str):
    async with semaphore:
        return await Runner.run(agent, input=input_text)

async def main():
    agents = [
        Agent(name=f"Agent{i}", instructions=f"Task {i}", model="gpt-4o-mini")
        for i in range(10)
    ]

    # Only 3 will run at a time despite 10 being queued
    results = await asyncio.gather(*[
        run_with_semaphore(agent, "Analyze this market")
        for agent in agents
    ])

Summary

Parallel agent execution with asyncio.gather is one of the highest-impact performance optimizations for multi-agent systems. Use it whenever you have independent tasks that can run concurrently. Add return_exceptions=True to prevent one failure from canceling everything. Add timeouts to prevent slow agents from blocking the pipeline. Add retries for resilience. And use a synthesis agent to combine parallel results into coherent output.

Share

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.

Related Articles You May Like