Parallel Fan-Out Fan-In Patterns: Processing Multiple Sub-Tasks Simultaneously
Implement fan-out fan-in patterns for AI agents to distribute work across parallel sub-tasks, aggregate results, handle partial failures gracefully, and enforce timeouts on straggler tasks.
The Fan-Out Fan-In Pattern
Many agent tasks naturally decompose into independent sub-tasks. A research agent might need to search five databases simultaneously. A code review agent might analyze ten files in parallel. A customer support agent might check order status, payment history, and shipping details all at once.
The fan-out fan-in pattern distributes work across multiple concurrent sub-tasks (fan-out) and then collects and merges the results (fan-in). This pattern dramatically reduces total execution time — instead of running N tasks sequentially in N * T seconds, you run them in parallel in roughly T seconds.
Basic Fan-Out with asyncio.gather
The simplest implementation uses asyncio.gather:
import asyncio
from typing import Any, Callable, Awaitable

async def fan_out_gather(
    tasks: list[Callable[[], Awaitable[Any]]],
) -> list[Any]:
    """Run all tasks in parallel and collect results."""
    return await asyncio.gather(*[task() for task in tasks])

# Example: search multiple sources in parallel
async def search_arxiv(query: str) -> dict:
    await asyncio.sleep(1)  # Simulate API call
    return {"source": "arxiv", "results": ["paper1", "paper2"]}

async def search_scholar(query: str) -> dict:
    await asyncio.sleep(1.5)
    return {"source": "scholar", "results": ["paper3"]}

async def search_semantic(query: str) -> dict:
    await asyncio.sleep(0.8)
    return {"source": "semantic_scholar", "results": ["paper4", "paper5"]}

query = "agentic AI workflows"
results = await fan_out_gather([
    lambda: search_arxiv(query),
    lambda: search_scholar(query),
    lambda: search_semantic(query),
])
# All three searches complete in ~1.5s (the slowest) instead of ~3.3s
The problem with plain gather is that the first failed task raises its exception out of the await, and the results of the remaining tasks are lost: they keep running in the background, but you never see their return values. Production systems need better error handling.
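To make the failure mode concrete, here is a minimal self-contained sketch (ok and boom are toy stand-ins for real sub-tasks): with default settings the RuntimeError from boom would propagate and the two good results would be discarded, while return_exceptions=True returns failures in-band as values.

```python
import asyncio

async def ok() -> str:
    await asyncio.sleep(0.01)
    return "fine"

async def boom() -> str:
    raise RuntimeError("backend down")

async def main() -> list:
    # With the default return_exceptions=False, boom's RuntimeError would
    # propagate here and the two "fine" results would never be seen.
    # return_exceptions=True returns failures in-band as values instead.
    return await asyncio.gather(ok(), boom(), ok(), return_exceptions=True)

results = asyncio.run(main())
print(results)  # ['fine', RuntimeError('backend down'), 'fine']
```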
Robust Fan-Out with Partial Failure Handling
Use asyncio.gather(return_exceptions=True) or a custom wrapper to handle individual task failures without aborting the entire batch:
import time
from dataclasses import dataclass
from typing import TypeVar, Generic

T = TypeVar("T")

@dataclass
class TaskResult(Generic[T]):
    task_id: str
    success: bool
    result: T | None = None
    error: str | None = None
    duration_ms: float = 0.0

async def robust_fan_out(
    tasks: dict[str, Callable[[], Awaitable[Any]]],
    timeout: float | None = None,
) -> dict[str, TaskResult]:
    """Fan-out with per-task error handling and optional timeout."""

    async def wrapped(task_id: str, fn: Callable) -> TaskResult:
        start = time.monotonic()
        try:
            result = await fn()
            elapsed = (time.monotonic() - start) * 1000
            return TaskResult(task_id, True, result, duration_ms=elapsed)
        except Exception as e:
            elapsed = (time.monotonic() - start) * 1000
            return TaskResult(task_id, False, error=str(e), duration_ms=elapsed)

    if timeout:
        # Keep a handle on each task by id so timed-out stragglers
        # stay identifiable in the result map
        running = {
            tid: asyncio.create_task(wrapped(tid, fn))
            for tid, fn in tasks.items()
        }
        done, _pending = await asyncio.wait(running.values(), timeout=timeout)
        results: dict[str, TaskResult] = {}
        for tid, task in running.items():
            if task in done:
                results[tid] = task.result()
            else:
                # Cancel and mark tasks that missed the deadline
                task.cancel()
                results[tid] = TaskResult(tid, False, error="Task timed out")
        return results
    else:
        raw_results = await asyncio.gather(
            *(wrapped(tid, fn) for tid, fn in tasks.items())
        )
        return {r.task_id: r for r in raw_results}
Now individual task failures are captured in the TaskResult without crashing the entire operation.
The Fan-In Aggregator
After fan-out completes, the fan-in stage merges partial results into a coherent output:
class InsufficientResultsError(Exception):
    """Raised when too few sub-tasks succeeded to aggregate."""

class ResultAggregator:
    """Merge results from parallel sub-tasks."""

    def __init__(self, min_success_ratio: float = 0.5):
        self.min_success_ratio = min_success_ratio

    def aggregate(
        self, results: dict[str, TaskResult]
    ) -> dict[str, Any]:
        successful = {
            tid: r for tid, r in results.items() if r.success
        }
        failed = {
            tid: r for tid, r in results.items() if not r.success
        }
        total = len(results)
        success_count = len(successful)
        success_ratio = success_count / total if total else 0
        if success_ratio < self.min_success_ratio:
            raise InsufficientResultsError(
                f"Only {success_count}/{total} tasks succeeded "
                f"({success_ratio:.0%}), minimum is {self.min_success_ratio:.0%}"
            )
        return {
            "merged_results": [r.result for r in successful.values()],
            "success_count": success_count,
            "failure_count": len(failed),
            "failed_tasks": {
                tid: r.error for tid, r in failed.items()
            },
            "total_duration_ms": max(
                r.duration_ms for r in results.values()
            ),
        }
The min_success_ratio parameter controls how many tasks must succeed before the aggregated result is considered valid. For a research agent querying five sources, maybe three out of five is acceptable. For a financial reconciliation, you might need all tasks to succeed.
Bounded Concurrency with Semaphores
Unbounded fan-out can overwhelm downstream services. Use a semaphore to limit concurrency:
import time

async def bounded_fan_out(
    tasks: dict[str, Callable[[], Awaitable[Any]]],
    max_concurrency: int = 5,
) -> dict[str, TaskResult]:
    """Fan-out with bounded concurrency."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def limited(task_id: str, fn: Callable) -> TaskResult:
        async with semaphore:
            start = time.monotonic()
            try:
                result = await fn()
                elapsed = (time.monotonic() - start) * 1000
                return TaskResult(task_id, True, result, duration_ms=elapsed)
            except Exception as e:
                elapsed = (time.monotonic() - start) * 1000
                return TaskResult(task_id, False, error=str(e), duration_ms=elapsed)

    coros = [limited(tid, fn) for tid, fn in tasks.items()]
    raw_results = await asyncio.gather(*coros)
    return {r.task_id: r for r in raw_results}
With max_concurrency=5, at most five tasks run simultaneously even if you fan out to fifty sub-tasks.
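A quick self-contained way to convince yourself the cap holds is to count how many workers are inside the semaphore at once (the demo names here are illustrative, not part of the article's API):

```python
import asyncio

async def demo_peak_concurrency(n_tasks: int = 20, limit: int = 5) -> int:
    """Track the highest number of workers inside the semaphore at once."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def worker() -> None:
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # Simulated I/O
            active -= 1

    await asyncio.gather(*(worker() for _ in range(n_tasks)))
    return peak

peak = asyncio.run(demo_peak_concurrency())
print(peak)  # 5: the cap holds even with 20 queued tasks
```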
Complete Agent Example: Multi-Source Research
Putting the pattern together for a research agent that queries multiple sources, handles failures, and synthesizes results:
import json

class ResearchAgent:
    def __init__(self, llm_client, sources: dict):
        self.llm = llm_client
        self.sources = sources
        self.aggregator = ResultAggregator(min_success_ratio=0.4)

    async def research(self, query: str) -> str:
        # Fan-out: query all sources in parallel
        # (n=name binds each source name at definition time)
        tasks = {
            name: lambda n=name: self._search_source(n, query)
            for name in self.sources
        }
        results = await bounded_fan_out(tasks, max_concurrency=3)
        # Fan-in: aggregate results
        aggregated = self.aggregator.aggregate(results)
        # Synthesize with LLM
        return await self._synthesize(query, aggregated)

    async def _search_source(self, source_name: str, query: str) -> list:
        search_fn = self.sources[source_name]
        return await search_fn(query)

    async def _synthesize(self, query: str, aggregated: dict) -> str:
        all_results = []
        for source_results in aggregated["merged_results"]:
            all_results.extend(source_results)
        prompt = f"""Synthesize these research results for the query: {query}

Results from {aggregated['success_count']} sources:
{json.dumps(all_results, indent=2)}

Note: {aggregated['failure_count']} sources failed and are excluded.

Provide a comprehensive summary."""
        response = await self.llm.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
        )
        return response.choices[0].message.content
FAQ
How do I decide the right max_concurrency value?
Start with the most restrictive downstream limit. If you are calling an API with a rate limit of 10 requests per second, set max_concurrency to 10 or lower. If you are calling multiple different APIs, each with its own limit, use separate semaphores per API. For LLM APIs, check your tier's rate limit (requests per minute) and set concurrency accordingly. Monitor for 429 (rate limit) errors and adjust down if they appear.
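One way to sketch the per-API-semaphore idea mentioned above (the API names and limits here are hypothetical, and this assumes Python 3.10+, where asyncio primitives no longer bind to an event loop at construction time):

```python
import asyncio

# Hypothetical per-API concurrency limits: each API gets its own semaphore
# so one strict backend does not throttle calls to a generous one.
API_LIMITS = {"openai": 8, "arxiv": 3, "crossref": 5}
semaphores = {api: asyncio.Semaphore(n) for api, n in API_LIMITS.items()}

async def call_api(api: str, payload: str) -> str:
    async with semaphores[api]:  # Bound concurrency per API, not globally
        await asyncio.sleep(0.01)  # Stand-in for the real HTTP request
        return f"{api}:{payload}"

async def main() -> list[str]:
    return await asyncio.gather(
        call_api("openai", "q1"),
        call_api("arxiv", "q2"),
        call_api("openai", "q3"),
    )

out = asyncio.run(main())
```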
Should I use asyncio.gather, asyncio.TaskGroup, or asyncio.wait?
Use gather with return_exceptions=True for the simplest case where you want all results including errors. Use TaskGroup (Python 3.11 and later) when you want structured concurrency with automatic cleanup — if one task fails, all others are cancelled. Use wait when you need fine-grained control over timeouts or want to process results as they complete rather than waiting for all tasks.
What happens if the slowest sub-task takes much longer than the others?
This is the "straggler" problem. Set a timeout on the entire fan-out operation. When the timeout fires, cancel the straggler and proceed with the results you have. The aggregator checks whether enough tasks succeeded to produce a meaningful result. For research tasks, getting four out of five sources in two seconds is often better than waiting 30 seconds for the fifth source.
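A minimal sketch of the drop-the-straggler approach using asyncio.wait with a timeout (source names and delays are illustrative):

```python
import asyncio

async def source(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name

async def main() -> list[str]:
    # Four fast sources plus one straggler; give the whole batch 0.1s
    tasks = [asyncio.create_task(source(f"s{i}", 0.01)) for i in range(4)]
    tasks.append(asyncio.create_task(source("slow", 10.0)))
    done, pending = await asyncio.wait(tasks, timeout=0.1)
    for t in pending:
        t.cancel()  # Abandon the straggler and proceed with what we have
    return sorted(t.result() for t in done)

out = asyncio.run(main())
print(out)  # ['s0', 's1', 's2', 's3']
```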
#FanOutFanIn #ParallelProcessing #Concurrency #Asyncio #Python #AgenticAI #LearnAI #AIEngineering