
Claude API Batching: Processing Thousands of Requests Cost-Effectively

Master the Claude Message Batches API for high-volume, cost-effective processing. Learn how to submit batch jobs, poll for results, handle errors, and save 50% on Claude API costs for non-real-time workloads.

What Is the Message Batches API?

The Claude Message Batches API allows you to submit up to 10,000 requests in a single batch and receive results asynchronously. Each request in the batch gets a 50% discount on both input and output tokens compared to the standard Messages API.

The tradeoff: batches can take up to 24 hours to complete (though most finish within 1-2 hours). This makes the Batch API ideal for workloads that do not require real-time responses.


Ideal Use Cases

  • Document classification across thousands of files
  • Bulk content moderation
  • Dataset annotation and labeling
  • Nightly report generation
  • Mass email personalization
  • Code analysis across a large codebase
  • Evaluation and testing of prompts at scale

Submitting a Batch

from anthropic import Anthropic

client = Anthropic()

# Each request in the batch follows the standard Messages API format
requests = []
for i, document in enumerate(documents):
    requests.append({
        "custom_id": f"doc-{i}",  # Your identifier for tracking
        "params": {
            "model": "claude-sonnet-4-5-20250514",
            "max_tokens": 1024,
            "messages": [{
                "role": "user",
                "content": f"Classify this document into one of: [legal, financial, technical, marketing].\n\nDocument:\n{document}"
            }]
        }
    })

# Submit the batch
batch = client.messages.batches.create(requests=requests)

print(f"Batch ID: {batch.id}")
print(f"Status: {batch.processing_status}")
print(f"Total requests: {batch.request_counts.total}")

Polling for Results

import time

def wait_for_batch(batch_id: str, poll_interval: int = 30):
    """Poll until batch completes."""
    while True:
        batch = client.messages.batches.retrieve(batch_id)

        print(f"Status: {batch.processing_status}")
        print(f"  Succeeded: {batch.request_counts.succeeded}")
        print(f"  Errored: {batch.request_counts.errored}")
        print(f"  Processing: {batch.request_counts.processing}")

        if batch.processing_status == "ended":
            return batch

        time.sleep(poll_interval)

batch_result = wait_for_batch(batch.id)

Retrieving Results

def get_batch_results(batch_id: str) -> dict[str, dict]:
    """Retrieve all results from a completed batch."""
    results = {}

    for result in client.messages.batches.results(batch_id):
        custom_id = result.custom_id

        if result.result.type == "succeeded":
            message = result.result.message
            text = message.content[0].text
            results[custom_id] = {
                "status": "success",
                "text": text,
                "input_tokens": message.usage.input_tokens,
                "output_tokens": message.usage.output_tokens,
            }
        elif result.result.type == "errored":
            results[custom_id] = {
                "status": "error",
                "error": str(result.result.error),
            }
        elif result.result.type == "expired":
            results[custom_id] = {
                "status": "expired",
            }

    return results

results = get_batch_results(batch.id)
for custom_id, result in results.items():
    if result["status"] == "success":
        print(f"{custom_id}: {result['text'][:100]}...")

Production Batch Pipeline

Here is a complete pipeline for batch-processing a dataset:

flowchart LR
    PREP(["Prepare requests<br/>custom_id + params"])
    SUBMIT["messages.batches.create<br/>submit batch"]
    POLL{"processing_status<br/>ended?"}
    WAIT["sleep + retrieve<br/>batch status"]
    COLLECT["messages.batches.results<br/>stream per-request results"]
    DONE(["Save results<br/>retry failures"])
    PREP --> SUBMIT --> POLL
    POLL -->|No| WAIT --> POLL
    POLL -->|Yes| COLLECT --> DONE
    style SUBMIT fill:#4f46e5,stroke:#4338ca,color:#fff
    style DONE fill:#059669,stroke:#047857,color:#fff

import json
import time
from pathlib import Path
from datetime import datetime, timezone

class BatchPipeline:
    def __init__(self, client: Anthropic, output_dir: str = "./batch_results"):
        self.client = client
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)

    def prepare_requests(
        self,
        items: list[dict],
        system_prompt: str,
        user_template: str,
        model: str = "claude-sonnet-4-5-20250514",
        max_tokens: int = 1024,
    ) -> list[dict]:
        """Convert items into batch request format."""
        requests = []
        for item in items:
            user_content = user_template.format(**item)
            requests.append({
                "custom_id": str(item.get("id", len(requests))),
                "params": {
                    "model": model,
                    "max_tokens": max_tokens,
                    "system": system_prompt,
                    "messages": [{"role": "user", "content": user_content}],
                }
            })
        return requests

    def submit(self, requests: list[dict]) -> str:
        """Submit batch and return batch ID."""
        # Batch API supports up to 10,000 requests
        if len(requests) > 10_000:
            raise ValueError(f"Too many requests: {len(requests)} (max 10,000)")

        batch = self.client.messages.batches.create(requests=requests)

        # Save metadata
        metadata = {
            "batch_id": batch.id,
            "submitted_at": datetime.utcnow().isoformat(),
            "total_requests": len(requests),
        }
        with open(self.output_dir / f"{batch.id}_metadata.json", "w") as f:
            json.dump(metadata, f)

        return batch.id

    def collect_results(self, batch_id: str) -> list[dict]:
        """Wait for completion and collect all results."""
        batch = self._wait(batch_id)
        results = []

        for result in self.client.messages.batches.results(batch_id):
            entry = {"custom_id": result.custom_id}
            if result.result.type == "succeeded":
                msg = result.result.message
                entry["output"] = msg.content[0].text
                entry["usage"] = {
                    "input": msg.usage.input_tokens,
                    "output": msg.usage.output_tokens,
                }
            else:
                entry["error"] = result.result.type

            results.append(entry)

        # Save results
        with open(self.output_dir / f"{batch_id}_results.json", "w") as f:
            json.dump(results, f, indent=2)

        return results

    def _wait(self, batch_id: str):
        while True:
            batch = self.client.messages.batches.retrieve(batch_id)
            if batch.processing_status == "ended":
                return batch
            time.sleep(30)

Usage Example

pipeline = BatchPipeline(client)

# Prepare 5,000 classification requests
items = [{"id": f"doc-{i}", "text": doc} for i, doc in enumerate(documents)]

requests = pipeline.prepare_requests(
    items=items,
    system_prompt="Classify documents into categories. Return JSON with 'category' and 'confidence'.",
    user_template="Classify this document:\n\n{text}",
    model="claude-haiku-4-5-20250514",  # Use Haiku for simple classification
    max_tokens=256,
)

batch_id = pipeline.submit(requests)
results = pipeline.collect_results(batch_id)

# Analyze results
succeeded = [r for r in results if "output" in r]
failed = [r for r in results if "error" in r]
print(f"Success: {len(succeeded)}, Failed: {len(failed)}")

Cost Comparison

Processing 10,000 documents with an average of 500 input tokens and 100 output tokens each:

| Method | Input Cost | Output Cost | Total | Time |
|---|---|---|---|---|
| Standard API (Sonnet) | $15.00 | $15.00 | $30.00 | ~2 hours (rate limited) |
| Batch API (Sonnet) | $7.50 | $7.50 | $15.00 | 1-2 hours |
| Standard API (Haiku) | $5.00 | $5.00 | $10.00 | ~1 hour |
| Batch API (Haiku) | $2.50 | $2.50 | $5.00 | 1-2 hours |

The Batch API saves 50% on cost with comparable or better throughput for large workloads.


Error Handling and Retries

Batches can have partial failures. Always handle errors per-request:

def handle_batch_errors(batch_id: str) -> list[dict]:
    """Collect failed requests for retry."""
    failed = []
    for result in client.messages.batches.results(batch_id):
        if result.result.type == "errored":
            failed.append({
                "custom_id": result.custom_id,
                "error": str(result.result.error),
            })
        elif result.result.type == "expired":
            failed.append({
                "custom_id": result.custom_id,
                "error": "expired",
            })
    return failed

# Retry failed requests in a new batch. Index the original requests
# by custom_id so each failure can be looked up directly.
original_requests = {req["custom_id"]: req for req in requests}

failed = handle_batch_errors(batch_id)
if failed:
    retry_requests = [
        original_requests[r["custom_id"]]
        for r in failed
        if r["custom_id"] in original_requests
    ]
    if retry_requests:
        retry_batch = client.messages.batches.create(requests=retry_requests)

Canceling a Batch

If you need to stop a batch that is in progress:

# Cancel a running batch
client.messages.batches.cancel(batch_id)

# Results for already-completed requests are still available
# Only pending requests are canceled

Best Practices

  1. Use meaningful custom_ids that map back to your data source for easy result matching
  2. Save batch IDs immediately after submission -- you need them to retrieve results
  3. Monitor batch progress with periodic polling, especially for time-sensitive workflows
  4. Implement idempotency -- design your pipeline so resubmitting the same batch is safe
  5. Chunk large datasets into multiple batches of 10,000 if needed -- see the chunking sketch after this list
  6. Use the cheapest model that meets your quality requirements -- Haiku with Batch API is extremely cost-effective for classification and extraction tasks