Skip to content
Learn Agentic AI
Learn Agentic AI12 min read9 views

Multiprocessing vs Asyncio for AI Workloads: When to Use Each Approach

Understand when to use multiprocessing versus asyncio for AI agent workloads. Learn CPU-bound vs I/O-bound trade-offs, ProcessPoolExecutor, and hybrid patterns.

The Fundamental Decision

Python's GIL (Global Interpreter Lock) means that only one thread executes Python bytecode at a time within a single process. This creates a clear decision tree for AI workloads:

  • I/O-bound work (LLM API calls, database queries, file reads) — use asyncio. The GIL is released during I/O operations, so asyncio's single-threaded event loop efficiently multiplexes thousands of concurrent I/O operations.
  • CPU-bound work (embedding computation, text preprocessing, local model inference) — use multiprocessing. Each process has its own GIL, so CPU work truly runs in parallel across cores.

Most AI agent systems involve both. The key is choosing the right tool for each part of the pipeline.

I/O-Bound: asyncio Dominates

API calls to LLM providers are pure I/O. The agent sends a request and waits for the response. asyncio handles this efficiently because the event loop switches to other tasks during the wait.

flowchart TD
    Q{"What matters most<br/>for your team?"}
    DIM1["Time to first<br/>production deploy"]
    DIM2["Total cost of<br/>ownership at scale"]
    DIM3["Debuggability and<br/>observability"]
    DIM4["Ecosystem and<br/>community support"]
    PICK{Score the<br/>four axes}
    A(["Pick<br/>Multiprocessing"])
    B(["Pick<br/>Asyncio for AI<br/>Workloads"])
    Q --> DIM1 --> PICK
    Q --> DIM2 --> PICK
    Q --> DIM3 --> PICK
    Q --> DIM4 --> PICK
    PICK -->|Speed and ecosystem| A
    PICK -->|Control and TCO| B
    style Q fill:#4f46e5,stroke:#4338ca,color:#fff
    style PICK fill:#f59e0b,stroke:#d97706,color:#1f2937
    style A fill:#0ea5e9,stroke:#0369a1,color:#fff
    style B fill:#059669,stroke:#047857,color:#fff
import asyncio
import httpx
import time

async def benchmark_io_bound():
    """Benchmark concurrent LLM API calls with asyncio."""
    prompts = [f"Question {i}: Explain concept {i}" for i in range(20)]

    async with httpx.AsyncClient(timeout=30.0) as client:
        start = time.monotonic()
        tasks = [
            simulate_llm_call(client, prompt)
            for prompt in prompts
        ]
        results = await asyncio.gather(*tasks)
        elapsed = time.monotonic() - start

    print(f"20 I/O-bound calls: {elapsed:.2f}s with asyncio")
    # ~2s (limited by slowest call, not sum of all calls)

async def simulate_llm_call(client, prompt):
    await asyncio.sleep(1.5)  # Simulate API latency
    return f"Response to {prompt}"

asyncio.run(benchmark_io_bound())

CPU-Bound: Multiprocessing Is Required

Embedding generation, text chunking, and local model inference are CPU-intensive. asyncio provides zero speedup for CPU-bound work because the GIL prevents parallel execution within a single process.

Hear it before you finish reading

Talk to a live CallSphere AI voice agent in your browser — 60 seconds, no signup.

Try Live Demo →
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
import time

def compute_embeddings_batch(texts: list[str]) -> list[list[float]]:
    """CPU-intensive embedding computation (runs in worker process)."""
    # Simulating CPU-heavy work
    embeddings = []
    for text in texts:
        # In reality, this would be a local model inference
        embedding = [hash(text + str(i)) % 1000 / 1000.0
                     for i in range(384)]
        embeddings.append(embedding)
    return embeddings

def benchmark_cpu_bound():
    """Benchmark CPU-bound work with multiprocessing."""
    all_texts = [f"Document {i} content..." for i in range(1000)]
    chunk_size = 100
    chunks = [
        all_texts[i:i + chunk_size]
        for i in range(0, len(all_texts), chunk_size)
    ]

    # Sequential
    start = time.monotonic()
    for chunk in chunks:
        compute_embeddings_batch(chunk)
    seq_time = time.monotonic() - start

    # Parallel with multiprocessing
    start = time.monotonic()
    with ProcessPoolExecutor(max_workers=mp.cpu_count()) as executor:
        results = list(executor.map(compute_embeddings_batch, chunks))
    par_time = time.monotonic() - start

    print(f"Sequential: {seq_time:.2f}s")
    print(f"Parallel ({mp.cpu_count()} workers): {par_time:.2f}s")
    print(f"Speedup: {seq_time / par_time:.1f}x")

benchmark_cpu_bound()

The Hybrid Pattern: asyncio + ProcessPoolExecutor

Real AI agents combine I/O-bound and CPU-bound work. The hybrid pattern uses asyncio for the main event loop and offloads CPU-heavy work to a process pool.

import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import partial

# Module-level process pool (shared across requests)
_process_pool = ProcessPoolExecutor(max_workers=4)

def cpu_heavy_preprocess(text: str) -> dict:
    """CPU-bound text preprocessing (runs in separate process)."""
    # Tokenization, NER, chunking — CPU intensive
    tokens = text.split()
    chunks = [
        " ".join(tokens[i:i+256])
        for i in range(0, len(tokens), 256)
    ]
    return {"chunks": chunks, "token_count": len(tokens)}

async def agent_pipeline(document: str) -> dict:
    """Agent pipeline mixing I/O and CPU work."""
    loop = asyncio.get_running_loop()

    # Step 1: CPU-bound preprocessing (offload to process pool)
    preprocessed = await loop.run_in_executor(
        _process_pool,
        cpu_heavy_preprocess,
        document,
    )

    # Step 2: I/O-bound LLM calls (run concurrently with asyncio)
    async with httpx.AsyncClient(timeout=60.0) as client:
        summaries = await asyncio.gather(*[
            call_llm(client, f"Summarize: {chunk}")
            for chunk in preprocessed["chunks"]
        ])

    # Step 3: CPU-bound post-processing
    final = await loop.run_in_executor(
        _process_pool,
        merge_summaries,
        summaries,
    )
    return final

The key method is loop.run_in_executor(). It runs a synchronous function in a thread pool or process pool without blocking the event loop.

When to Use asyncio.to_thread

For lighter CPU work or blocking library calls, asyncio.to_thread() offloads to a thread instead of a process. This avoids the serialization overhead of multiprocessing but is limited by the GIL.

import asyncio

async def process_with_blocking_library(data: str) -> dict:
    """Use asyncio.to_thread for blocking library calls."""
    # This runs in a thread — GIL limits parallelism but
    # it does not block the event loop
    result = await asyncio.to_thread(
        blocking_library_call, data
    )
    return result

Use to_thread for: blocking file I/O, synchronous database drivers, third-party libraries without async support. Use run_in_executor with a process pool for: heavy computation, numpy operations, local model inference.

Still reading? Stop comparing — try CallSphere live.

CallSphere ships complete AI voice agents per industry — 14 tools for healthcare, 10 agents for real estate, 4 specialists for salons. See how it actually handles a call before you book a demo.

Decision Matrix

Workload Type       | Best Tool              | Example
--------------------+------------------------+-----------------------------
LLM API calls       | asyncio                | OpenAI, Anthropic API calls
Database queries    | asyncio (async driver)  | asyncpg, motor
File I/O            | asyncio.to_thread      | Reading large documents
Text preprocessing  | ProcessPoolExecutor    | Tokenization, chunking
Local model infer.  | ProcessPoolExecutor    | sentence-transformers
Embedding compute   | ProcessPoolExecutor    | numpy-heavy operations
Mixed pipeline      | Hybrid (asyncio + PPE) | Full agent workflow

FAQ

Does the GIL affect LLM API calls?

No. The GIL is released during I/O operations (network calls, file reads, etc.). When your code is waiting for an API response from OpenAI, the GIL is free and other Python threads or asyncio tasks can run. The GIL only matters for CPU-bound Python bytecode execution.

What is the overhead of ProcessPoolExecutor?

Each task submission serializes the function arguments with pickle, sends them to a worker process, and deserializes the results back. For small inputs this adds 1-5ms overhead. For large data (megabytes of text), serialization can take 10-100ms. Batch your work to amortize this cost — send 100 documents per process call, not one.

Can I use multiprocessing.Pool inside an asyncio event loop?

Not directly. multiprocessing.Pool's methods are blocking and will freeze your event loop. Always use loop.run_in_executor(ProcessPoolExecutor(...)) to integrate multiprocessing with asyncio. The executor handles the inter-process communication without blocking the event loop.


#Python #Multiprocessing #Asyncio #Performance #AIAgents #AgenticAI #LearnAI #AIEngineering

Share

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.

Related Articles You May Like

Agentic AI

Multi-Agent Handoffs with the OpenAI Agents SDK: The Pattern That Actually Scales (2026)

Handoffs done right — when one agent should hand control to another, how to preserve context, and how to evaluate the handoff decision itself.

AI Strategy

AI Agent M&A Activity 2026: Aircall–Vogent, Meta–PlayAI, OpenAI's Six Deals

Q1 2026 saw a record acquisition wave: Aircall bought Vogent (May), Meta acquired Manus and PlayAI, OpenAI closed six deals. The voice AI consolidation phase has begun.

Agentic AI

Building Your First Agent with the OpenAI Agents SDK in 2026: A Hands-On Walkthrough

Step-by-step build of a working agent with the OpenAI Agents SDK — Agent class, tools, handoffs, tracing — plus an eval pipeline that catches regressions before merge.

Agentic AI

LangGraph Checkpointers in Production: Durable, Resumable Agents with Eval Replay

Use LangGraph's checkpointer to make agents resumable across crashes and human-in-the-loop pauses, then replay any checkpoint into your eval pipeline.

Agentic AI

LangGraph State-Machine Architecture: A Principal-Engineer Deep Dive (2026)

How LangGraph's StateGraph, channels, and reducers actually work — with a working multi-step agent, eval hooks at every node, and the patterns that survive production.

Agentic AI

LangGraph Supervisor Pattern: Orchestrating Multi-Agent Teams in 2026

The supervisor pattern in LangGraph for coordinating specialist agents, with full code, an eval pipeline that scores routing accuracy, and the failure modes to watch for.