Learn Agentic AI

Playwright with Async Python: Concurrent Browser Automation for AI Agents

Learn how to use Playwright's async API with Python asyncio to run concurrent browser sessions, parallelize page interactions, and build high-throughput AI agent automation pipelines.

Why Async Matters for Browser Automation

Browser automation is inherently I/O-bound — most of the time is spent waiting for pages to load, elements to appear, and network requests to complete. Synchronous Playwright wastes this idle time by blocking the Python thread. Async Playwright, using Python's asyncio, lets your AI agent do useful work while waiting: processing data from a previous page, launching another browser tab, or calling an LLM API.

For agents that need to scrape multiple sites, interact with multiple accounts, or run parallel browser sessions, async Playwright can deliver 5-10x throughput improvements over synchronous code.

Async Playwright Basics

The async API mirrors the sync API exactly, but every method that performs I/O becomes a coroutine:

import asyncio
from playwright.async_api import async_playwright

async def main():
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()
        await page.goto("https://example.com")

        title = await page.title()
        print(f"Title: {title}")

        content = await page.locator("h1").text_content()
        print(f"Heading: {content}")

        await browser.close()

asyncio.run(main())

Notice the pattern: sync_playwright() becomes async_playwright(), and every Playwright method gets an await prefix. The import changes from playwright.sync_api to playwright.async_api.
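A common pitfall during this migration is forgetting an `await`: the call then returns a coroutine object instead of a value. A minimal pure-asyncio sketch (no browser needed; the hypothetical `fetch_title` stands in for an async Playwright call like `page.title()`):

```python
import asyncio
import inspect

async def fetch_title() -> str:
    """Stand-in for an async Playwright call such as page.title()."""
    await asyncio.sleep(0)
    return "Example Domain"

async def main() -> tuple[bool, str]:
    pending = fetch_title()        # missing await: this is a coroutine object
    is_coro = inspect.iscoroutine(pending)
    title = await pending          # awaiting it produces the actual string
    return is_coro, title

is_coro, title = asyncio.run(main())
print(is_coro, title)  # True Example Domain
```

Static type checkers and the `RuntimeWarning: coroutine ... was never awaited` message both catch this class of bug, so run with warnings enabled while porting sync code.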


Running Multiple Pages Concurrently

The real power of async Playwright is running multiple pages at the same time:

import asyncio
from playwright.async_api import async_playwright

async def scrape_page(browser, url: str) -> dict:
    """Scrape a single page in its own context."""
    context = await browser.new_context()
    page = await context.new_page()

    try:
        await page.goto(url, wait_until="networkidle", timeout=15000)
        return {
            "url": url,
            "title": await page.title(),
            "heading": (
                await page.locator("h1").text_content()
                if await page.locator("h1").count() > 0
                else None
            ),
        }
    except Exception as e:
        return {"url": url, "error": str(e)}
    finally:
        await context.close()

async def main():
    urls = [
        "https://example.com",
        "https://httpbin.org",
        "https://jsonplaceholder.typicode.com",
        "https://reqres.in",
        "https://dummyjson.com",
    ]

    async with async_playwright() as p:
        browser = await p.chromium.launch()

        # Scrape all pages concurrently
        tasks = [scrape_page(browser, url) for url in urls]
        results = await asyncio.gather(*tasks)

        for result in results:
            if "error" in result:
                print(f"FAILED: {result['url']} - {result['error']}")
            else:
                print(f"OK: {result['title']} ({result['url']})")

        await browser.close()

asyncio.run(main())

This scrapes all five pages concurrently rather than sequentially. On a fast connection, the total run completes in roughly the time of the slowest single page load, not the sum of all five.
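That timing claim can be verified without a browser. A minimal asyncio sketch where `asyncio.sleep` stands in for page loads:

```python
import asyncio
import time

async def fake_page_load(delay: float) -> float:
    # Stand-in for page.goto(): an awaitable that yields to the event loop.
    await asyncio.sleep(delay)
    return delay

async def main() -> tuple[float, float]:
    delays = [0.05, 0.10, 0.15]

    start = time.perf_counter()
    for d in delays:
        await fake_page_load(d)                      # one at a time: ~0.30s
    sequential = time.perf_counter() - start

    start = time.perf_counter()
    await asyncio.gather(*(fake_page_load(d) for d in delays))  # ~0.15s
    concurrent = time.perf_counter() - start
    return sequential, concurrent

sequential, concurrent = asyncio.run(main())
print(f"sequential: {sequential:.2f}s  concurrent: {concurrent:.2f}s")
```

The concurrent run takes roughly the longest single delay, while the sequential run takes the sum; real page loads follow the same arithmetic.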

Controlling Concurrency with Semaphores

Unlimited concurrency can overwhelm the browser or trigger rate limiting. Use an asyncio.Semaphore to cap parallel sessions:

import asyncio
from playwright.async_api import async_playwright

async def scrape_with_limit(browser, url: str, semaphore: asyncio.Semaphore):
    async with semaphore:
        context = await browser.new_context()
        page = await context.new_page()
        try:
            await page.goto(url, wait_until="networkidle")
            title = await page.title()
            return {"url": url, "title": title}
        except Exception as e:
            return {"url": url, "error": str(e)}
        finally:
            await context.close()

async def main():
    urls = [f"https://example.com/page/{i}" for i in range(20)]

    # Allow at most 5 concurrent browser contexts
    semaphore = asyncio.Semaphore(5)

    async with async_playwright() as p:
        browser = await p.chromium.launch()

        tasks = [scrape_with_limit(browser, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)

        success = sum(1 for r in results if "error" not in r)
        print(f"Completed: {success}/{len(urls)} pages")

        await browser.close()

asyncio.run(main())

The semaphore ensures that no more than 5 contexts are active at any time, preventing memory exhaustion while still maintaining significant parallelism.
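The cap is easy to verify in isolation. A pure-asyncio sketch that tracks peak concurrency under a `Semaphore(3)` (the `sleep` stands in for browser work):

```python
import asyncio

async def main() -> int:
    sem = asyncio.Semaphore(3)
    active = 0
    peak = 0

    async def task(i: int) -> None:
        nonlocal active, peak
        async with sem:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)   # simulated page work
            active -= 1

    # 10 tasks compete for 3 permits; peak concurrency never exceeds 3.
    await asyncio.gather(*(task(i) for i in range(10)))
    return peak

peak = asyncio.run(main())
print(f"peak concurrency: {peak}")
```

The same accounting trick (an `active` counter inside the semaphore block) is useful during development to confirm your real scraper respects its limit.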

Async Event Handling

Handle network events and page events asynchronously:

import asyncio
from playwright.async_api import async_playwright

async def main():
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()

        api_responses = []

        async def on_response(response):
            if "/api/" in response.url and response.status == 200:
                try:
                    data = await response.json()
                    api_responses.append({
                        "url": response.url,
                        "data": data,
                    })
                except Exception:
                    pass

        page.on("response", on_response)
        await page.goto("https://example.com")
        await page.wait_for_load_state("networkidle")

        print(f"Captured {len(api_responses)} API responses")
        await browser.close()

asyncio.run(main())

Combining Playwright with Other Async Operations

The real power of async comes from combining browser automation with other I/O operations — API calls, database queries, and LLM requests:


import asyncio
from openai import AsyncOpenAI
from playwright.async_api import async_playwright

client = AsyncOpenAI()

async def scrape_and_analyze(browser, url: str) -> dict:
    """Scrape a page and analyze its content with an LLM."""
    context = await browser.new_context()
    page = await context.new_page()

    try:
        await page.goto(url, wait_until="networkidle")
        title = await page.title()
        body_text = await page.locator("body").text_content()

        # Truncate to avoid token limits
        body_text = body_text[:3000] if body_text else ""

        # Analyze with LLM while we have the page data
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": "Summarize the following web page content "
                               "in 2-3 sentences.",
                },
                {"role": "user", "content": f"Title: {title}\n{body_text}"},
            ],
            max_tokens=200,
        )

        summary = response.choices[0].message.content
        return {"url": url, "title": title, "summary": summary}

    except Exception as e:
        return {"url": url, "error": str(e)}
    finally:
        await context.close()

async def main():
    urls = [
        "https://example.com",
        "https://httpbin.org",
    ]

    async with async_playwright() as p:
        browser = await p.chromium.launch()
        tasks = [scrape_and_analyze(browser, url) for url in urls]
        results = await asyncio.gather(*tasks)

        for r in results:
            if "summary" in r:
                print(f"\n{r['title']}:")
                print(f"  {r['summary']}")

        await browser.close()

asyncio.run(main())

Async Producer-Consumer Pattern

For high-throughput scraping, use a queue-based producer-consumer pattern:

import asyncio
from playwright.async_api import async_playwright

async def worker(name: str, browser, queue: asyncio.Queue, results: list):
    """Worker that processes URLs from a shared queue."""
    while True:
        url = await queue.get()
        if url is None:
            queue.task_done()
            break

        context = await browser.new_context()
        page = await context.new_page()
        try:
            await page.goto(url, wait_until="networkidle", timeout=10000)
            results.append({
                "url": url,
                "title": await page.title(),
                "worker": name,
            })
            print(f"[{name}] Scraped: {url}")
        except Exception as e:
            print(f"[{name}] Failed: {url} ({e})")
        finally:
            await context.close()
            queue.task_done()

async def main():
    urls = [f"https://example.com/item/{i}" for i in range(15)]
    num_workers = 3

    queue = asyncio.Queue()
    results = []

    for url in urls:
        await queue.put(url)

    # Add poison pills to stop workers
    for _ in range(num_workers):
        await queue.put(None)

    async with async_playwright() as p:
        browser = await p.chromium.launch()

        workers = [
            asyncio.create_task(
                worker(f"W{i}", browser, queue, results)
            )
            for i in range(num_workers)
        ]

        await asyncio.gather(*workers)
        print(f"\nTotal scraped: {len(results)}")

        await browser.close()

asyncio.run(main())
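Poison pills are one way to stop workers; `Queue.join()` plus task cancellation is a common alternative that avoids sentinel values. A pure-asyncio sketch of that variant (the `item * 2` stands in for the scraping work):

```python
import asyncio

async def worker(queue: asyncio.Queue, results: list) -> None:
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0)      # simulated scraping work
            results.append(item * 2)
        finally:
            queue.task_done()

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    for i in range(9):
        queue.put_nowait(i)

    workers = [asyncio.create_task(worker(queue, results)) for _ in range(3)]
    await queue.join()                  # blocks until every item is task_done
    for w in workers:
        w.cancel()                      # workers are idle in queue.get(); stop them
    await asyncio.gather(*workers, return_exceptions=True)
    return results

results = asyncio.run(main())
print(sorted(results))  # [0, 2, 4, 6, 8, 10, 12, 14, 16]
```

`return_exceptions=True` swallows the `CancelledError` each worker raises on shutdown; with poison pills you trade that for the extra `None` bookkeeping shown above.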

FAQ

When should I use async vs sync Playwright?

Use sync Playwright for simple scripts, debugging, and prototyping — it is easier to read and write. Switch to async when you need concurrent page operations, integration with other async libraries (FastAPI, aiohttp, OpenAI async client), or high-throughput automation with many pages. If your AI agent framework is already async (most modern ones are), use async Playwright to avoid blocking the event loop.

Does asyncio.gather run tasks in separate threads?

No. asyncio.gather runs coroutines concurrently within a single thread using cooperative multitasking. When one coroutine hits an await (waiting for a page to load, for example), the event loop switches to another coroutine that is ready to run. This works well for I/O-bound tasks like browser automation. For CPU-bound work, you would need asyncio.to_thread() or ProcessPoolExecutor.
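That distinction can be sketched with `asyncio.to_thread` (Python 3.9+). The hypothetical `cpu_bound` helper below would stall the event loop if called inline; moving it to a worker thread keeps browser coroutines responsive:

```python
import asyncio

def cpu_bound(n: int) -> int:
    # Blocking, CPU-bound work: no await points, so calling it inline
    # would freeze the event loop for its full duration.
    return sum(i * i for i in range(n))

async def main() -> int:
    # to_thread runs the blocking call in a worker thread, so Playwright
    # coroutines on the event loop keep making progress meanwhile.
    return await asyncio.to_thread(cpu_bound, 1_000)

result = asyncio.run(main())
print(result)
```

For heavier parallel CPU work (e.g. parsing many large documents), `loop.run_in_executor` with a `ProcessPoolExecutor` sidesteps the GIL entirely.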

How many concurrent browser pages can async Playwright handle?

The practical limit depends on RAM and the complexity of the pages being loaded. Each page/context uses roughly 20-50 MB. On a 16 GB machine, you can comfortably run 50-100 concurrent lightweight pages. Use a semaphore to cap concurrency at a level your machine can handle, and monitor memory usage during development to find the right number.


#AsyncPython #Playwright #Asyncio #ConcurrentAutomation #AIAgents #ParallelScraping #EventLoop
