Implementing Agent Checkpoints: Save and Resume Long-Running Agent Tasks
Learn how to implement checkpointing for AI agents — serializing agent state, persisting progress to disk or database, and resuming interrupted tasks with idempotent operations and recovery strategies.
Why Agents Need Checkpoints
Long-running agents face real-world interruptions: server restarts, network timeouts, API rate limits, or simple crashes. Without checkpoints, a research agent that spent 30 minutes gathering and analyzing data loses all progress and must start from scratch. Checkpointing saves the agent's state at regular intervals so it can resume from the last saved point rather than the beginning.
Checkpointing is especially critical for agents that perform expensive operations — LLM calls, API requests, or database writes — because resuming from a checkpoint avoids repeating those costs.
Designing the Checkpoint System
A checkpoint captures everything needed to resume the agent: the current task state, completed steps, intermediate results, and any accumulated context.
import json
import hashlib
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional


@dataclass
class Checkpoint:
    task_id: str
    step_index: int
    state: Dict[str, Any]
    completed_steps: List[str]
    intermediate_results: Dict[str, Any]
    created_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    checksum: str = ""

    def compute_checksum(self) -> str:
        """Create a checksum to detect corruption."""
        data = json.dumps(
            {k: v for k, v in asdict(self).items() if k != "checksum"},
            sort_keys=True,
        )
        return hashlib.sha256(data.encode()).hexdigest()[:16]
class CheckpointStore:
    def __init__(self, storage_dir: str = "./checkpoints"):
        self.storage_dir = Path(storage_dir)
        self.storage_dir.mkdir(parents=True, exist_ok=True)

    def save(self, checkpoint: Checkpoint) -> None:
        checkpoint.checksum = checkpoint.compute_checksum()
        path = self.storage_dir / f"{checkpoint.task_id}.json"
        # Write to a temp file first, then swap it into place for atomicity.
        # Path.replace overwrites an existing target atomically, even on Windows,
        # where a plain rename fails if the destination exists.
        tmp_path = path.with_suffix(".tmp")
        tmp_path.write_text(json.dumps(asdict(checkpoint), indent=2))
        tmp_path.replace(path)

    def load(self, task_id: str) -> Optional[Checkpoint]:
        path = self.storage_dir / f"{task_id}.json"
        if not path.exists():
            return None
        data = json.loads(path.read_text())
        checkpoint = Checkpoint(**data)
        # Verify integrity
        expected = checkpoint.compute_checksum()
        if checkpoint.checksum != expected:
            raise ValueError(
                f"Checkpoint corrupted: expected {expected}, got {checkpoint.checksum}"
            )
        return checkpoint

    def delete(self, task_id: str) -> None:
        path = self.storage_dir / f"{task_id}.json"
        if path.exists():
            path.unlink()
The atomic write pattern (write to a temp file, then rename) prevents data corruption if the process is killed mid-write.
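The same pattern can be hardened a little further: create the temp file in the same directory as the target (so the final rename stays on one filesystem) and fsync before swapping. A minimal standalone sketch, with the `atomic_write_json` helper name being illustrative:

```python
import json
import os
import tempfile
from pathlib import Path

def atomic_write_json(path: Path, data: dict) -> None:
    # Temp file in the SAME directory, so os.replace is a
    # same-filesystem, atomic rename rather than a copy.
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f)
            f.flush()
            os.fsync(f.fileno())  # push bytes to disk before the swap
        os.replace(tmp, path)  # atomic on POSIX; overwrites on Windows too
    except BaseException:
        os.unlink(tmp)  # don't leave a half-written temp file behind
        raise

checkpoint_path = Path("./checkpoints/demo.json")
checkpoint_path.parent.mkdir(parents=True, exist_ok=True)
atomic_write_json(checkpoint_path, {"task_id": "demo", "step_index": 2})
print(json.loads(checkpoint_path.read_text())["step_index"])  # 2
```

A reader on the same box either sees the old checkpoint or the new one, never a truncated file.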
Building a Checkpointed Agent Runner
The runner wraps your agent logic, automatically saving checkpoints after each step and resuming from the last checkpoint on restart.
from typing import Awaitable, Callable


@dataclass
class AgentStep:
    name: str
    execute: Callable[..., Awaitable[Any]]
    idempotent: bool = True  # safe to re-execute without side effects?


class CheckpointedRunner:
    def __init__(
        self,
        task_id: str,
        steps: List[AgentStep],
        store: Optional[CheckpointStore] = None,
    ):
        self.task_id = task_id
        self.steps = steps
        self.store = store or CheckpointStore()
        self.results: Dict[str, Any] = {}
        self.state: Dict[str, Any] = {}

    async def run(self) -> Dict[str, Any]:
        # Try to resume from a previous checkpoint
        checkpoint = self.store.load(self.task_id)
        start_index = 0
        if checkpoint:
            start_index = checkpoint.step_index
            self.results = checkpoint.intermediate_results
            self.state = checkpoint.state
            print(
                f"Resuming task {self.task_id} from step {start_index} "
                f"({checkpoint.completed_steps[-1] if checkpoint.completed_steps else 'start'})"
            )

        completed = list(self.results.keys())
        for i in range(start_index, len(self.steps)):
            step = self.steps[i]
            print(f"Executing step {i}: {step.name}")
            try:
                result = await step.execute(self.state, self.results)
                self.results[step.name] = result
                completed.append(step.name)
                # Save a checkpoint after each successful step
                self.store.save(Checkpoint(
                    task_id=self.task_id,
                    step_index=i + 1,
                    state=self.state,
                    completed_steps=completed,
                    intermediate_results=self.results,
                ))
            except Exception as e:
                # Save a checkpoint at the failed step so it is retried on resume
                self.store.save(Checkpoint(
                    task_id=self.task_id,
                    step_index=i,  # retry this step on resume
                    state=self.state,
                    completed_steps=completed,
                    intermediate_results=self.results,
                ))
                raise RuntimeError(
                    f"Step '{step.name}' failed: {e}. "
                    f"Checkpoint saved at step {i}."
                ) from e

        # Clean up the checkpoint on success
        self.store.delete(self.task_id)
        return self.results
Idempotency Considerations
When an agent resumes from a checkpoint, it may re-execute the step that failed. If that step sends an email or charges a credit card, you get duplicate side effects. Idempotent operations produce the same result regardless of how many times they run.
class IdempotencyGuard:
    """Track completed operations to prevent duplicate execution."""

    def __init__(self, store_path: str = "./idempotency_keys.json"):
        self.path = Path(store_path)
        self.keys: Dict[str, Any] = {}
        if self.path.exists():
            self.keys = json.loads(self.path.read_text())

    def execute_once(self, key: str, func: Callable, *args, **kwargs) -> Any:
        """Execute func only if this key has not been executed before."""
        if key in self.keys:
            print(f"Skipping already-executed operation: {key}")
            return self.keys[key]
        result = func(*args, **kwargs)
        self.keys[key] = result
        self.path.write_text(json.dumps(self.keys, indent=2, default=str))
        return result


# Usage in an agent step
guard = IdempotencyGuard()

async def send_notification(state, results):
    user_email = results["collect_info"]["email"]
    guard.execute_once(
        f"notify_{user_email}_{state.get('task_id')}",
        lambda: email_service.send(user_email, "Your report is ready"),
    )
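The execute-once behavior is easy to verify in isolation. A quick standalone check, using a minimal in-memory variant of the guard (file persistence omitted; `InMemoryGuard` and `charge` are illustrative names):

```python
from typing import Any, Callable, Dict

class InMemoryGuard:
    """Minimal in-memory variant of an idempotency guard (no file persistence)."""
    def __init__(self) -> None:
        self.keys: Dict[str, Any] = {}

    def execute_once(self, key: str, func: Callable[[], Any]) -> Any:
        if key in self.keys:
            return self.keys[key]  # skip: already executed, return cached result
        result = func()
        self.keys[key] = result
        return result

calls = []
def charge():
    calls.append("charged")
    return "receipt-1"

guard = InMemoryGuard()
first = guard.execute_once("charge_user_42", charge)
second = guard.execute_once("charge_user_42", charge)  # skipped, cached
print(first, second, len(calls))  # receipt-1 receipt-1 1
```

The second call returns the cached result without re-running the side effect, which is exactly what a resumed agent needs when it replays a step.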
Practical Example: Research Agent with Checkpoints
import asyncio


async def gather_sources(state, results):
    # Simulate expensive web research
    state["query"] = "AI agent memory architectures"
    return {"sources": ["paper_A", "paper_B", "paper_C"]}


async def analyze_sources(state, results):
    sources = results["gather_sources"]["sources"]
    return {"analysis": f"Analyzed {len(sources)} sources on {state['query']}"}


async def generate_report(state, results):
    analysis = results["analyze_sources"]["analysis"]
    return {"report": f"Final report based on: {analysis}"}


# Define the pipeline
steps = [
    AgentStep(name="gather_sources", execute=gather_sources),
    AgentStep(name="analyze_sources", execute=analyze_sources),
    AgentStep(name="generate_report", execute=generate_report),
]

runner = CheckpointedRunner(task_id="research_001", steps=steps)
result = asyncio.run(runner.run())
If the process crashes during analyze_sources, restarting runner.run() picks up from the checkpoint: it skips gather_sources (already completed) and retries analyze_sources.
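The crash-and-resume cycle can be simulated end to end in a few lines. This compact sketch is independent of the classes above: checkpointing is reduced to a step index on disk, and `run_pipeline` with its injected `fail_on` failure is purely illustrative:

```python
import json
from pathlib import Path

CKPT = Path("./demo_ckpt.json")
if CKPT.exists():
    CKPT.unlink()  # fresh start for the demo
executions = []  # records every step execution across "restarts"

def run_pipeline(fail_on=None):
    steps = ["gather_sources", "analyze_sources", "generate_report"]
    start = json.loads(CKPT.read_text())["step_index"] if CKPT.exists() else 0
    for i in range(start, len(steps)):
        name = steps[i]
        if name == fail_on:
            CKPT.write_text(json.dumps({"step_index": i}))  # retry this step
            raise RuntimeError(f"crash during {name}")
        executions.append(name)
        CKPT.write_text(json.dumps({"step_index": i + 1}))
    CKPT.unlink()  # success: checkpoint no longer needed

try:
    run_pipeline(fail_on="analyze_sources")  # first run crashes mid-pipeline
except RuntimeError:
    pass
run_pipeline()  # "restart": skips gather_sources, retries analyze_sources
print(executions)  # ['gather_sources', 'analyze_sources', 'generate_report']
```

Note that gather_sources appears only once in the log: the restart resumed at the failed step rather than replaying the whole pipeline.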
FAQ
How often should I save checkpoints?
After every significant step or expensive operation. The goal is to minimize rework on failure. If a step takes 10 seconds, checkpointing after it saves 10 seconds on restart. If checkpointing itself is expensive (e.g., writing large embeddings), batch multiple lightweight steps between checkpoints.
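One way to batch, sketched under the assumption of a list of cheap synchronous steps: persist only every N steps (and always at the end), trading a little rework on crash for fewer writes. The `run_with_batched_checkpoints` helper is illustrative:

```python
import json
from pathlib import Path

def run_with_batched_checkpoints(steps, ckpt_path: Path, every: int = 3) -> int:
    """Execute steps, checkpointing every `every` steps instead of after each one."""
    start = json.loads(ckpt_path.read_text())["step_index"] if ckpt_path.exists() else 0
    saves = 0
    for i in range(start, len(steps)):
        steps[i]()  # execute the (cheap) step
        at_batch_boundary = (i + 1) % every == 0
        if at_batch_boundary or i + 1 == len(steps):  # always persist the final position
            ckpt_path.write_text(json.dumps({"step_index": i + 1}))
            saves += 1
    return saves

path = Path("./batched_ckpt.json")
if path.exists():
    path.unlink()
executed = []
steps = [lambda i=i: executed.append(i) for i in range(7)]
saves = run_with_batched_checkpoints(steps, path, every=3)
print(len(executed), saves)  # 7 3  (checkpoints after steps 3, 6, and 7)
```

With `every=3`, a crash costs at most two steps of rework while cutting checkpoint writes by roughly two thirds.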
Should I store checkpoints in files or a database?
Files work well for single-machine agents. Use a database (PostgreSQL, Redis) for distributed agents or when multiple processes need to read checkpoint state. Database-backed checkpoints also make it easier to monitor and manage long-running tasks via a dashboard.
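A database-backed store can keep the same save/load interface. The sketch below uses stdlib sqlite3 purely as a stand-in for PostgreSQL or Redis (the `SqliteCheckpointStore` name and schema are assumptions, not part of the article's code):

```python
import json
import sqlite3
from typing import Any, Dict, Optional

class SqliteCheckpointStore:
    """Database-backed store; sqlite3 stands in for PostgreSQL/Redis here."""
    def __init__(self, db_path: str = ":memory:"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS checkpoints ("
            "task_id TEXT PRIMARY KEY, payload TEXT, "
            "updated_at TEXT DEFAULT CURRENT_TIMESTAMP)"
        )

    def save(self, task_id: str, payload: Dict[str, Any]) -> None:
        # Upsert: one row per task, overwritten on every checkpoint
        self.conn.execute(
            "INSERT INTO checkpoints (task_id, payload) VALUES (?, ?) "
            "ON CONFLICT(task_id) DO UPDATE SET payload = excluded.payload, "
            "updated_at = CURRENT_TIMESTAMP",
            (task_id, json.dumps(payload)),
        )
        self.conn.commit()

    def load(self, task_id: str) -> Optional[Dict[str, Any]]:
        row = self.conn.execute(
            "SELECT payload FROM checkpoints WHERE task_id = ?", (task_id,)
        ).fetchone()
        return json.loads(row[0]) if row else None

store = SqliteCheckpointStore()
store.save("research_001", {"step_index": 2, "results": {"gather_sources": ["a", "b"]}})
print(store.load("research_001")["step_index"])  # 2
```

The one-row-per-task upsert keeps reads trivial, and the `updated_at` column is what a monitoring dashboard would query to spot stalled tasks.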
How do I handle non-serializable state like database connections?
Separate your state into serializable data (stored in the checkpoint) and runtime resources (recreated on resume). The checkpoint should contain only the data needed to reconstruct the agent's position — IDs, results, counters — not live connections or file handles.
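A minimal sketch of that separation, using a sqlite3 connection as the stand-in runtime resource (the `AgentContext` name and its methods are illustrative): serialize only the position data, and rebuild the connection inside the resume path.

```python
import json
import sqlite3
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

@dataclass
class AgentContext:
    # Serializable position data: this is what goes into the checkpoint
    state: Dict[str, Any] = field(default_factory=dict)
    # Runtime resource: never serialized, rebuilt on resume
    conn: Optional[sqlite3.Connection] = None

    def to_checkpoint(self) -> str:
        return json.dumps({"state": self.state})  # conn deliberately excluded

    @classmethod
    def from_checkpoint(cls, raw: str) -> "AgentContext":
        ctx = cls(state=json.loads(raw)["state"])
        ctx.conn = sqlite3.connect(":memory:")  # recreate the live resource
        return ctx

ctx = AgentContext(state={"step_index": 3, "doc_ids": [11, 12]})
ctx.conn = sqlite3.connect(":memory:")
saved = ctx.to_checkpoint()

restored = AgentContext.from_checkpoint(saved)
print(restored.state["step_index"], restored.conn is not None)  # 3 True
```

The checkpoint survives any crash because it is plain JSON; the connection is cheap to recreate, so nothing is lost by excluding it.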
#Checkpointing #AgentPersistence #FaultTolerance #Idempotency #AgenticAI #LearnAI #AIEngineering