Skip to content
Learn Agentic AI
Learn Agentic AI12 min read17 views

The Saga Pattern: Managing Long-Running Multi-Step Agent Transactions

Implement the Saga pattern for AI agent systems to manage multi-step transactions with compensating actions, rollback support, and saga orchestration for reliable distributed workflows.

The Transaction Problem in Multi-Agent Systems

When an AI agent workflow spans multiple steps — booking a flight, reserving a hotel, and renting a car — each step may call a different external service. If the car rental fails after the flight and hotel are already booked, you need to cancel the hotel reservation and the flight booking. Traditional database transactions cannot span these external services. The Saga pattern solves this by defining a compensating action for each step that undoes its effect if a later step fails.

A saga is a sequence of steps where each step has both an action (the forward operation) and a compensation (the rollback operation). If any step fails, the saga executes compensations for all previously completed steps, in reverse order.

Core Saga Framework

from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Any
from datetime import datetime

class StepStatus(Enum):
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATED = "compensated"

@dataclass
class SagaStep:
    name: str
    action: Callable[[dict], Any]
    compensation: Callable[[dict, Any], None]
    status: StepStatus = StepStatus.PENDING
    result: Any = None
    error: str | None = None

class SagaStatus(Enum):
    RUNNING = "running"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    ROLLED_BACK = "rolled_back"
    FAILED = "failed"

@dataclass
class SagaLog:
    saga_id: str
    status: SagaStatus
    steps: list[dict]
    started_at: datetime
    completed_at: datetime | None = None

class SagaOrchestrator:
    def __init__(self, saga_id: str):
        self.saga_id = saga_id
        self.steps: list[SagaStep] = []
        self.context: dict = {}
        self.status = SagaStatus.RUNNING

    def add_step(
        self,
        name: str,
        action: Callable[[dict], Any],
        compensation: Callable[[dict, Any], None],
    ) -> "SagaOrchestrator":
        self.steps.append(SagaStep(
            name=name, action=action, compensation=compensation
        ))
        return self

    def execute(self, initial_context: dict | None = None) -> SagaLog:
        if initial_context:
            self.context.update(initial_context)

        started = datetime.now()
        completed_steps: list[SagaStep] = []

        for step in self.steps:
            print(f"[Saga {self.saga_id}] Executing: {step.name}")
            try:
                result = step.action(self.context)
                step.result = result
                step.status = StepStatus.COMPLETED
                completed_steps.append(step)

                # Store result in context for subsequent steps
                self.context[f"{step.name}_result"] = result
                print(f"[Saga {self.saga_id}] "
                      f"Completed: {step.name}")

            except Exception as e:
                step.status = StepStatus.FAILED
                step.error = str(e)
                print(f"[Saga {self.saga_id}] "
                      f"Failed at {step.name}: {e}")

                # Compensate in reverse order
                self._compensate(completed_steps)
                return self._build_log(started)

        self.status = SagaStatus.COMPLETED
        return self._build_log(started)

    def _compensate(self, completed_steps: list[SagaStep]):
        self.status = SagaStatus.COMPENSATING
        print(f"[Saga {self.saga_id}] Starting compensation "
              f"for {len(completed_steps)} steps")

        for step in reversed(completed_steps):
            try:
                print(f"[Saga {self.saga_id}] "
                      f"Compensating: {step.name}")
                step.compensation(self.context, step.result)
                step.status = StepStatus.COMPENSATED
            except Exception as e:
                print(f"[Saga {self.saga_id}] Compensation "
                      f"FAILED for {step.name}: {e}")
                self.status = SagaStatus.FAILED
                return

        self.status = SagaStatus.ROLLED_BACK

    def _build_log(self, started: datetime) -> SagaLog:
        return SagaLog(
            saga_id=self.saga_id,
            status=self.status,
            steps=[
                {
                    "name": s.name,
                    "status": s.status.value,
                    "error": s.error,
                }
                for s in self.steps
            ],
            started_at=started,
            completed_at=datetime.now(),
        )

Applying the Saga to a Travel Booking

import uuid

# Simulated external service calls
def book_flight(ctx: dict) -> dict:
    print(f"  Booking flight to {ctx['destination']}")
    booking_id = str(uuid.uuid4())[:8]
    # Simulate API call to airline
    return {"booking_id": booking_id, "airline": "SkyAir",
            "price": 450.00}

def cancel_flight(ctx: dict, result: dict) -> None:
    print(f"  Cancelling flight {result['booking_id']}")
    # Simulate cancellation API call

def reserve_hotel(ctx: dict) -> dict:
    print(f"  Reserving hotel in {ctx['destination']}")
    reservation_id = str(uuid.uuid4())[:8]
    return {"reservation_id": reservation_id,
            "hotel": "Grand Plaza", "price": 200.00}

def cancel_hotel(ctx: dict, result: dict) -> None:
    print(f"  Cancelling hotel {result['reservation_id']}")

def rent_car(ctx: dict) -> dict:
    print(f"  Renting car in {ctx['destination']}")
    # Simulate a failure
    if ctx.get("simulate_failure"):
        raise Exception("No cars available at destination")
    rental_id = str(uuid.uuid4())[:8]
    return {"rental_id": rental_id, "price": 75.00}

def cancel_car(ctx: dict, result: dict) -> None:
    print(f"  Cancelling car rental {result['rental_id']}")

# Build and execute the saga
saga = (
    SagaOrchestrator("travel-001")
    .add_step("book_flight", book_flight, cancel_flight)
    .add_step("reserve_hotel", reserve_hotel, cancel_hotel)
    .add_step("rent_car", rent_car, cancel_car)
)

# This will fail at rent_car and roll back hotel + flight
log = saga.execute({
    "destination": "Tokyo",
    "simulate_failure": True,
})

print(f"\nSaga status: {log.status.value}")
for step in log.steps:
    print(f"  {step['name']}: {step['status']}")

Running this produces:

Hear it before you finish reading

Talk to a live CallSphere AI voice agent in your browser — 60 seconds, no signup.

Try Live Demo →
flowchart LR
    INPUT(["User intent"])
    PARSE["Parse plus<br/>classify"]
    PLAN["Plan and tool<br/>selection"]
    AGENT["Agent loop<br/>LLM plus tools"]
    GUARD{"Guardrails<br/>and policy"}
    EXEC["Execute and<br/>verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome plus<br/>next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
[Saga travel-001] Executing: book_flight
  Booking flight to Tokyo
[Saga travel-001] Completed: book_flight
[Saga travel-001] Executing: reserve_hotel
  Reserving hotel in Tokyo
[Saga travel-001] Completed: reserve_hotel
[Saga travel-001] Executing: rent_car
  Renting car in Tokyo
[Saga travel-001] Failed at rent_car: No cars available
[Saga travel-001] Starting compensation for 2 steps
[Saga travel-001] Compensating: reserve_hotel
  Cancelling hotel abc123
[Saga travel-001] Compensating: book_flight
  Cancelling flight def456

Saga status: rolled_back
  book_flight: compensated
  reserve_hotel: compensated
  rent_car: failed

Handling Compensation Failures

The hardest part of the Saga pattern is when a compensation itself fails. If you cannot cancel the flight, the system is in an inconsistent state. Common strategies include: retrying the compensation with exponential backoff, logging the failure for manual intervention, or using an idempotent compensation design so retries are safe.

FAQ

What is the difference between the Saga pattern and the Pipeline pattern?

The Pipeline pattern focuses on data transformation through sequential stages — if a stage fails, you stop or retry that stage. The Saga pattern focuses on distributed transactions — if a step fails, you must undo the side effects of all previous steps. Use Pipeline for data processing and Saga for operations that create external state that needs cleanup on failure.

Still reading? Stop comparing — try CallSphere live.

CallSphere ships complete AI voice agents per industry — 14 tools for healthcare, 10 agents for real estate, 4 specialists for salons. See how it actually handles a call before you book a demo.

How do I make compensations idempotent?

Store the result of each step (booking IDs, reservation IDs) and check whether the resource has already been cancelled before attempting cancellation. If the resource no longer exists, the compensation is a no-op rather than an error. This makes it safe to retry compensations multiple times.

Can I run saga steps in parallel instead of sequentially?

Yes, but parallel sagas are significantly more complex. You need to track which parallel branches completed, compensate only the completed branches on failure, and handle the case where a compensation races with a still-running step. Start with sequential sagas and only introduce parallelism when the performance gain justifies the added complexity.


#AgentDesignPatterns #SagaPattern #Python #DistributedSystems #AgenticAI #LearnAI #AIEngineering

Share

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.