Claude API Batching: Processing Thousands of Requests Cost-Effectively
Master the Claude Message Batches API for high-volume, cost-effective processing. Learn how to submit batch jobs, poll for results, handle errors, and save 50% on Claude API costs for non-real-time workloads.
What Is the Message Batches API?
The Claude Message Batches API allows you to submit up to 100,000 requests (or 256 MB of data) in a single batch and receive results asynchronously. Each request in the batch gets a 50% discount on both input and output tokens compared to the standard Messages API.
The tradeoff: batches can take up to 24 hours to complete (though most finish within 1-2 hours). This makes the Batch API ideal for workloads that do not require real-time responses.
Ideal Use Cases
- Document classification across thousands of files
- Bulk content moderation
- Dataset annotation and labeling
- Nightly report generation
- Mass email personalization
- Code analysis across a large codebase
- Evaluation and testing of prompts at scale (see the sketch after this list)
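For example, prompt evaluation maps naturally onto a batch: each (variant, test question) pair becomes one request. Here is a minimal sketch, assuming you already have a list eval_questions of test inputs; the variant names and custom_id scheme are illustrative, not part of the API:
# Hypothetical prompt variants to compare; each (variant, question) pair
# becomes one batch request with a traceable custom_id.
prompt_variants = {
    "terse": "Answer in one sentence: {question}",
    "cot": "Think step by step, then answer: {question}",
}
eval_requests = [
    {
        "custom_id": f"{name}-q{i}",
        "params": {
            "model": "claude-haiku-4-5-20251001",
            "max_tokens": 512,
            "messages": [{"role": "user", "content": template.format(question=q)}],
        },
    }
    for name, template in prompt_variants.items()
    for i, q in enumerate(eval_questions)
]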
Submitting a Batch
from anthropic import Anthropic

client = Anthropic()

# Each request in the batch follows the standard Messages API format
requests = []
for i, document in enumerate(documents):
    requests.append({
        "custom_id": f"doc-{i}",  # Your identifier for tracking
        "params": {
            "model": "claude-sonnet-4-5-20250929",
            "max_tokens": 1024,
            "messages": [{
                "role": "user",
                "content": f"Classify this document into one of: [legal, financial, technical, marketing].\n\nDocument:\n{document}"
            }]
        }
    })

# Submit the batch
batch = client.messages.batches.create(requests=requests)

print(f"Batch ID: {batch.id}")
print(f"Status: {batch.processing_status}")
print(f"Total requests: {batch.request_counts.total}")
Polling for Results
import time

def wait_for_batch(batch_id: str, poll_interval: int = 30):
    """Poll until the batch finishes processing, then return the batch object."""
    while True:
        batch = client.messages.batches.retrieve(batch_id)
        print(f"Status: {batch.processing_status}")
        print(f"  Succeeded: {batch.request_counts.succeeded}")
        print(f"  Errored: {batch.request_counts.errored}")
        print(f"  Processing: {batch.request_counts.processing}")
        if batch.processing_status == "ended":
            return batch
        time.sleep(poll_interval)

batch_result = wait_for_batch(batch.id)
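The helper above polls forever at a fixed interval. For time-sensitive workflows, a variant with a timeout and exponential backoff is safer. This is a sketch; the max_wait and interval values are arbitrary choices, not API requirements:
def wait_for_batch_with_timeout(
    batch_id: str,
    max_wait: int = 24 * 60 * 60,  # matches the Batch API's 24-hour processing window
    initial_interval: int = 15,
    max_interval: int = 300,
):
    """Poll with exponential backoff; raise if the batch outlives max_wait."""
    deadline = time.monotonic() + max_wait
    interval = initial_interval
    while time.monotonic() < deadline:
        batch = client.messages.batches.retrieve(batch_id)
        if batch.processing_status == "ended":
            return batch
        time.sleep(interval)
        interval = min(interval * 2, max_interval)  # cap the backoff at 5 minutes
    raise TimeoutError(f"Batch {batch_id} did not finish within {max_wait} seconds")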
Retrieving Results
def get_batch_results(batch_id: str) -> dict[str, dict]:
    """Retrieve all results from a completed batch."""
    results = {}
    for result in client.messages.batches.results(batch_id):
        custom_id = result.custom_id
        if result.result.type == "succeeded":
            message = result.result.message
            text = message.content[0].text
            results[custom_id] = {
                "status": "success",
                "text": text,
                "input_tokens": message.usage.input_tokens,
                "output_tokens": message.usage.output_tokens,
            }
        elif result.result.type == "errored":
            results[custom_id] = {
                "status": "error",
                "error": str(result.result.error),
            }
        else:
            # "expired" or "canceled" -- the request never completed
            results[custom_id] = {"status": result.result.type}
    return results

results = get_batch_results(batch.id)
for custom_id, result in results.items():
    if result["status"] == "success":
        print(f"{custom_id}: {result['text'][:100]}...")
Production Batch Pipeline
Here is a complete pipeline for batch-processing a dataset:
flowchart LR
    ITEMS(["Dataset items"])
    PREP["prepare_requests"]
    SUBMIT["batches.create"]
    POLL{"processing_status<br/>ended?"}
    COLLECT["batches.results"]
    SAVE(["Results on disk"])
    ITEMS --> PREP --> SUBMIT --> POLL
    POLL -->|No, sleep| POLL
    POLL -->|Yes| COLLECT --> SAVE
    style SUBMIT fill:#4f46e5,stroke:#4338ca,color:#fff
    style POLL fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style SAVE fill:#059669,stroke:#047857,color:#fff
import json
import time
from pathlib import Path
from datetime import datetime, timezone

class BatchPipeline:
    def __init__(self, client: Anthropic, output_dir: str = "./batch_results"):
        self.client = client
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)

    def prepare_requests(
        self,
        items: list[dict],
        system_prompt: str,
        user_template: str,
        model: str = "claude-sonnet-4-5-20250929",
        max_tokens: int = 1024,
    ) -> list[dict]:
        """Convert items into batch request format."""
        requests = []
        for item in items:
            user_content = user_template.format(**item)
            requests.append({
                "custom_id": str(item.get("id", len(requests))),
                "params": {
                    "model": model,
                    "max_tokens": max_tokens,
                    "system": system_prompt,
                    "messages": [{"role": "user", "content": user_content}],
                }
            })
        return requests

    def submit(self, requests: list[dict]) -> str:
        """Submit batch and return batch ID."""
        # The Batch API accepts up to 100,000 requests (or 256 MB) per batch
        if len(requests) > 100_000:
            raise ValueError(f"Too many requests: {len(requests)} (max 100,000)")
        batch = self.client.messages.batches.create(requests=requests)
        # Save metadata
        metadata = {
            "batch_id": batch.id,
            "submitted_at": datetime.now(timezone.utc).isoformat(),
            "total_requests": len(requests),
        }
        with open(self.output_dir / f"{batch.id}_metadata.json", "w") as f:
            json.dump(metadata, f)
        return batch.id

    def collect_results(self, batch_id: str) -> list[dict]:
        """Wait for completion and collect all results."""
        self._wait(batch_id)  # block until processing ends
        results = []
        for result in self.client.messages.batches.results(batch_id):
            entry = {"custom_id": result.custom_id}
            if result.result.type == "succeeded":
                msg = result.result.message
                entry["output"] = msg.content[0].text
                entry["usage"] = {
                    "input": msg.usage.input_tokens,
                    "output": msg.usage.output_tokens,
                }
            else:
                entry["error"] = result.result.type
            results.append(entry)
        # Save results
        with open(self.output_dir / f"{batch_id}_results.json", "w") as f:
            json.dump(results, f, indent=2)
        return results

    def _wait(self, batch_id: str):
        while True:
            batch = self.client.messages.batches.retrieve(batch_id)
            if batch.processing_status == "ended":
                return batch
            time.sleep(30)
Usage Example
pipeline = BatchPipeline(client)
# Prepare 5,000 classification requests
items = [{"id": f"doc-{i}", "text": doc} for i, doc in enumerate(documents)]
requests = pipeline.prepare_requests(
    items=items,
    system_prompt="Classify documents into categories. Return JSON with 'category' and 'confidence'.",
    user_template="Classify this document:\n\n{text}",
    model="claude-haiku-4-5-20251001",  # Use Haiku for simple classification
    max_tokens=256,
)
batch_id = pipeline.submit(requests)
results = pipeline.collect_results(batch_id)
# Analyze results
succeeded = [r for r in results if "output" in r]
failed = [r for r in results if "error" in r]
print(f"Success: {len(succeeded)}, Failed: {len(failed)}")
Cost Comparison
Processing 10,000 documents with an average of 500 input tokens and 100 output tokens each:
| Method | Input Cost | Output Cost | Total | Time |
|---|---|---|---|---|
| Standard API (Sonnet) | $15.00 | $15.00 | $30.00 | ~2 hours (rate limited) |
| Batch API (Sonnet) | $7.50 | $7.50 | $15.00 | 1-2 hours |
| Standard API (Haiku) | $5.00 | $5.00 | $10.00 | ~1 hour |
| Batch API (Haiku) | $2.50 | $2.50 | $5.00 | 1-2 hours |
The Batch API saves 50% on cost with comparable or better throughput for large workloads.
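These figures are straightforward to reproduce. Here is a quick sanity check, assuming list prices of $3/$15 per million input/output tokens for Sonnet 4.5 and $1/$5 for Haiku 4.5:
def estimate_cost(n_requests: int, in_tok: int, out_tok: int,
                  in_price: float, out_price: float, batch: bool = True) -> float:
    """Estimated dollar cost; prices are per million tokens."""
    discount = 0.5 if batch else 1.0  # the Batch API halves both sides
    input_cost = n_requests * in_tok / 1_000_000 * in_price
    output_cost = n_requests * out_tok / 1_000_000 * out_price
    return (input_cost + output_cost) * discount

# 10,000 docs x 500 input / 100 output tokens each
print(estimate_cost(10_000, 500, 100, 3, 15, batch=False))  # 30.0 -- standard Sonnet
print(estimate_cost(10_000, 500, 100, 3, 15))               # 15.0 -- batch Sonnet
print(estimate_cost(10_000, 500, 100, 1, 5))                #  5.0 -- batch Haiku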
Error Handling and Retries
Batches can have partial failures. Always handle errors per-request:
def handle_batch_errors(batch_id: str) -> list[dict]:
    """Collect failed requests for retry."""
    failed = []
    for result in client.messages.batches.results(batch_id):
        if result.result.type == "errored":
            failed.append({
                "custom_id": result.custom_id,
                "error": str(result.result.error),
            })
        elif result.result.type == "expired":
            failed.append({
                "custom_id": result.custom_id,
                "error": "expired",
            })
    return failed
# Retry failed requests in a new batch.
# Index the original submissions by custom_id so failures map back to their requests.
original_requests = {req["custom_id"]: req for req in requests}
failed = handle_batch_errors(batch_id)
if failed:
    retry_requests = [
        original_requests[r["custom_id"]]
        for r in failed
        if r["custom_id"] in original_requests
    ]
    if retry_requests:
        retry_batch = client.messages.batches.create(requests=retry_requests)
Canceling a Batch
If you need to stop a batch that is in progress:
# Cancel a running batch
client.messages.batches.cancel(batch_id)
# Results for already-completed requests are still available
# Only pending requests are canceled
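Cancellation is asynchronous: the batch passes through a canceling status before it ends. Here is a small sketch that cancels and then salvages whatever already completed, reusing the get_batch_results helper from earlier:
batch = client.messages.batches.cancel(batch_id)
# Wait for the cancellation to settle before reading results
while batch.processing_status != "ended":
    time.sleep(5)
    batch = client.messages.batches.retrieve(batch_id)
partial = get_batch_results(batch_id)
salvaged = sum(1 for r in partial.values() if r["status"] == "success")
print(f"Salvaged {salvaged} completed results")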
Best Practices
- Use meaningful custom_ids that map back to your data source for easy result matching
- Save batch IDs immediately after submission -- you need them to retrieve results
- Monitor batch progress with periodic polling, especially for time-sensitive workflows
- Implement idempotency -- design your pipeline so resubmitting the same batch is safe
- Chunk datasets larger than the per-batch limit into multiple smaller batches (see the sketch after this list)
- Use the cheapest model that meets your quality requirements -- Haiku with Batch API is extremely cost-effective for classification and extraction tasks
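For the chunking point above, a thin wrapper around BatchPipeline.submit keeps each submission under the limit and records every batch ID as it goes. The chunk size here is a conservative choice, not an API constant:
def submit_in_chunks(pipeline: BatchPipeline, requests: list[dict],
                     chunk_size: int = 10_000) -> list[str]:
    """Split a large request list into separate batches and submit each one."""
    batch_ids = []
    for start in range(0, len(requests), chunk_size):
        chunk = requests[start:start + chunk_size]
        batch_id = pipeline.submit(chunk)  # submit() also persists metadata to disk
        batch_ids.append(batch_id)
        print(f"Submitted {len(chunk)} requests as batch {batch_id}")
    return batch_ids

batch_ids = submit_in_chunks(pipeline, requests)
all_results = [r for bid in batch_ids for r in pipeline.collect_results(bid)]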