Building a Custom Agent Orchestrator: When Off-the-Shelf Tools Are Not Enough

Learn when and how to build a custom agent orchestrator. Covers state machine design, queue integration, monitoring hooks, and the architectural patterns that make custom orchestrators maintainable.

When to Build Custom

Off-the-shelf orchestration platforms like Temporal, Prefect, and Airflow cover the large majority of workflow needs. But AI agent systems sometimes have requirements these tools handle poorly:

  • Sub-second latency requirements that batch-oriented schedulers cannot meet
  • Custom LLM routing logic that needs to inspect token counts, model availability, and cost in real time
  • Tight integration with existing infrastructure that would require fighting an orchestration framework's opinions
  • Specialized retry semantics — for example, retrying with a different model when one returns low-confidence results
  • Multi-tenant isolation requirements that off-the-shelf tools do not support natively

If your agent system has any of these constraints, a custom orchestrator may be the right choice. The key is to build it with clear boundaries so it remains maintainable.
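
As an illustration of the specialized retry semantics mentioned above, here is a minimal sketch of retrying with a different model when a reply looks low-confidence. The names `ModelReply`, `ModelClient`, and `call_with_fallback` are hypothetical, and the sketch assumes each model client can report a confidence score between 0 and 1, which many real APIs do not provide directly.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class ModelReply:
    text: str
    confidence: float  # Assumed: a score in [0, 1] supplied by the client


# Assumed shape: a model client is any callable mapping a prompt to a reply.
ModelClient = Callable[[str], ModelReply]


def call_with_fallback(
    prompt: str,
    models: list[tuple[str, ModelClient]],
    min_confidence: float = 0.7,
) -> tuple[str, ModelReply]:
    """Try each model in order and return the first confident reply.

    If no reply clears the threshold, return the most confident one seen.
    """
    if not models:
        raise ValueError("at least one model is required")
    best: tuple[str, ModelReply] | None = None
    for name, client in models:
        reply = client(prompt)
        if reply.confidence >= min_confidence:
            return name, reply
        if best is None or reply.confidence > best[1].confidence:
            best = (name, reply)
    assert best is not None  # Guaranteed because models is non-empty
    return best
```

Ordering the list from cheapest to most capable model keeps cost low in the common case; whether that ordering is appropriate depends on your latency and budget constraints.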

Core Architecture

A custom orchestrator has four components: a state machine that tracks workflow progress, a task queue that distributes work, a worker pool that executes tasks, and a persistence layer that stores state.

flowchart TD
    INPUT(["Task input"])
    SUPER["Supervisor agent<br/>plans plus monitors"]
    W1["Worker 1<br/>research"]
    W2["Worker 2<br/>code"]
    W3["Worker 3<br/>writing"]
    CRITIC{"Output meets<br/>rubric?"}
    REWORK["Rework or<br/>retry path"]
    SHARED[("Shared scratchpad<br/>and memory")]
    OUT(["Final result"])
    INPUT --> SUPER
    SUPER --> W1 --> CRITIC
    SUPER --> W2 --> CRITIC
    SUPER --> W3 --> CRITIC
    W1 --> SHARED
    W2 --> SHARED
    W3 --> SHARED
    SHARED --> SUPER
    CRITIC -->|Pass| OUT
    CRITIC -->|Fail| REWORK --> SUPER
    style SUPER fill:#4f46e5,stroke:#4338ca,color:#fff
    style CRITIC fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OUT fill:#059669,stroke:#047857,color:#fff
    style SHARED fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import uuid

class StepStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"

class WorkflowStatus(Enum):
    CREATED = "created"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    PAUSED = "paused"

@dataclass
class Step:
    name: str
    handler: str  # Dotted path to the handler function
    status: StepStatus = StepStatus.PENDING
    result: Any = None
    error: str | None = None
    attempts: int = 0
    max_retries: int = 3
    started_at: datetime | None = None
    completed_at: datetime | None = None

@dataclass
class Workflow:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    status: WorkflowStatus = WorkflowStatus.CREATED
    steps: list[Step] = field(default_factory=list)
    context: dict = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)
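
The `Step.handler` field above is described as a dotted path. One possible way to turn such a path into a callable is sketched below; `resolve_handler` is a hypothetical helper, not something the orchestrator code in this article defines, and it assumes the path has the form `package.module.function`.

```python
import importlib
from typing import Callable


def resolve_handler(dotted_path: str) -> Callable:
    """Import the module part of a dotted path and return the named attribute."""
    module_path, _, attr = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"Expected 'module.function', got {dotted_path!r}")
    module = importlib.import_module(module_path)
    handler = getattr(module, attr)
    if not callable(handler):
        raise TypeError(f"{dotted_path!r} does not refer to a callable")
    return handler
```

Storing a path string instead of a function object keeps the workflow serializable, at the cost of resolving (and validating) the handler at run time.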

The State Machine

The state machine enforces valid transitions and prevents workflows from entering inconsistent states.

class WorkflowStateMachine:
    VALID_TRANSITIONS = {
        WorkflowStatus.CREATED: {WorkflowStatus.RUNNING},
        WorkflowStatus.RUNNING: {
            WorkflowStatus.COMPLETED,
            WorkflowStatus.FAILED,
            WorkflowStatus.PAUSED,
        },
        WorkflowStatus.PAUSED: {WorkflowStatus.RUNNING, WorkflowStatus.FAILED},
        WorkflowStatus.FAILED: {WorkflowStatus.RUNNING},  # Allow restart
    }

    def transition(self, workflow: Workflow, new_status: WorkflowStatus):
        allowed = self.VALID_TRANSITIONS.get(workflow.status, set())
        if new_status not in allowed:
            raise ValueError(
                f"Cannot transition from {workflow.status} to {new_status}"
            )
        workflow.status = new_status
        workflow.updated_at = datetime.utcnow()
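
The same transition-table idea can be applied to individual steps. The sketch below is an assumption about how step-level transitions and the `attempts` / `max_retries` fields might interact; the article's code does not define this behavior. It redeclares `StepStatus` only so the example runs on its own, and `STEP_TRANSITIONS`, `transition_step`, and `status_after_failure` are hypothetical names.

```python
from enum import Enum


class StepStatus(Enum):  # Mirrors the enum defined earlier
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"


# Assumed transition table: COMPLETED and FAILED are terminal for a step.
STEP_TRANSITIONS = {
    StepStatus.PENDING: {StepStatus.RUNNING},
    StepStatus.RUNNING: {
        StepStatus.COMPLETED,
        StepStatus.FAILED,
        StepStatus.RETRYING,
    },
    StepStatus.RETRYING: {StepStatus.RUNNING},
}


def transition_step(current: StepStatus, new: StepStatus) -> StepStatus:
    """Return the new status, or raise if the transition is not allowed."""
    if new not in STEP_TRANSITIONS.get(current, set()):
        raise ValueError(f"Cannot transition step from {current} to {new}")
    return new


def status_after_failure(attempts: int, max_retries: int) -> StepStatus:
    """Decide what a failed attempt leads to.

    `attempts` counts attempts made so far, including the one that just
    failed, so max_retries=3 allows four attempts in total.
    """
    if attempts <= max_retries:
        return StepStatus.RETRYING
    return StepStatus.FAILED
```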

Queue Integration

Use Redis Streams or a similar lightweight queue to distribute work to workers.

import redis.asyncio as redis
import json

class TaskQueue:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.stream = "agent:tasks"
        self.group = "agent-workers"

    async def initialize(self):
        try:
            await self.redis.xgroup_create(
                self.stream, self.group, mkstream=True
            )
        except redis.ResponseError as exc:
            # Only ignore "group already exists"; re-raise anything else
            if "BUSYGROUP" not in str(exc):
                raise

    async def enqueue(self, workflow_id: str, step_name: str, payload: dict):
        await self.redis.xadd(
            self.stream,
            {
                "workflow_id": workflow_id,
                "step_name": step_name,
                "payload": json.dumps(payload),
            },
        )

    async def dequeue(self, consumer: str, count: int = 1, block_ms: int = 5000):
        messages = await self.redis.xreadgroup(
            self.group,
            consumer,
            {self.stream: ">"},
            count=count,
            block=block_ms,
        )
        results = []
        for stream_name, entries in messages:
            for msg_id, fields in entries:
                results.append({
                    "id": msg_id,
                    "workflow_id": fields[b"workflow_id"].decode(),
                    "step_name": fields[b"step_name"].decode(),
                    "payload": json.loads(fields[b"payload"]),
                })
        return results

    async def acknowledge(self, message_id: str):
        await self.redis.xack(self.stream, self.group, message_id)
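
The worker pool is the one component the code in this article does not show. Below is a minimal sketch of a single worker loop, written against any queue object that exposes the same `dequeue` and `acknowledge` methods as `TaskQueue`. `InMemoryQueue`, `run_worker`, and the `max_idle_polls` parameter are hypothetical and exist so the example runs without Redis; error handling for failing handlers is deliberately left out.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[dict], Awaitable[Any]]
ResultCallback = Callable[[str, str, Any], Awaitable[None]]


class InMemoryQueue:
    """Hypothetical stand-in with the same dequeue/acknowledge shape as TaskQueue."""

    def __init__(self, messages: list[dict]):
        self._pending = list(messages)
        self.acked: list[str] = []

    async def dequeue(self, consumer: str, count: int = 1, block_ms: int = 0):
        batch, self._pending = self._pending[:count], self._pending[count:]
        return batch

    async def acknowledge(self, message_id: str):
        self.acked.append(message_id)


async def run_worker(
    queue: Any,
    handlers: dict[str, Handler],
    on_result: ResultCallback,
    consumer: str = "worker-1",
    max_idle_polls: int = 1,
) -> None:
    """Poll the queue, run the matching handler, report the result, then ack."""
    idle = 0
    while idle < max_idle_polls:
        messages = await queue.dequeue(consumer)
        if not messages:
            idle += 1
            continue
        idle = 0
        for msg in messages:
            handler = handlers.get(msg["step_name"])
            if handler is None:
                # Unknown step: skip without acknowledging so it can be inspected
                continue
            result = await handler(msg["payload"])
            await on_result(msg["workflow_id"], msg["step_name"], result)
            # Acknowledge only after the result is recorded, so a crash before
            # this point leaves the message unacknowledged.
            await queue.acknowledge(msg["id"])
```

With Redis Streams, messages that are read but never acknowledged stay in the consumer group's pending entries list, where another worker can claim them later. In a real deployment `on_result` would typically be the orchestrator's `process_step_result`, and the loop would run until shutdown instead of stopping after idle polls.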

The Orchestrator Engine

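from typing import Callable

class Orchestrator:
    # "WorkflowStore" is the persistence layer; it is quoted because the class
    # is not defined in this listing. It must provide async save() and load().
    def __init__(self, queue: TaskQueue, store: "WorkflowStore"):
        self.queue = queue
        self.store = store
        self.state_machine = WorkflowStateMachine()
        # Maps a step name to the function that executes it
        self.handlers: dict[str, Callable[..., Any]] = {}

    def register_handler(self, name: str, handler: Callable[..., Any]):
        self.handlers[name] = handler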

    async def start_workflow(self, workflow: Workflow) -> str:
        self.state_machine.transition(workflow, WorkflowStatus.RUNNING)
        await self.store.save(workflow)

        # Enqueue the first pending step
        for step in workflow.steps:
            if step.status == StepStatus.PENDING:
                await self.queue.enqueue(
                    workflow.id, step.name, workflow.context
                )
                break
        return workflow.id

    async def process_step_result(
        self, workflow_id: str, step_name: str, result: Any
    ):
        workflow = await self.store.load(workflow_id)
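        current_step = next(
            (s for s in workflow.steps if s.name == step_name), None
        )
        if current_step is None:
            # A bare next() would raise StopIteration, which surfaces as a
            # confusing RuntimeError inside a coroutine.
            raise KeyError(
                f"Unknown step {step_name!r} in workflow {workflow_id}"
            )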
        current_step.status = StepStatus.COMPLETED
        current_step.result = result
        current_step.completed_at = datetime.utcnow()

        # Add result to context for downstream steps
        workflow.context[f"{step_name}_result"] = result

        # Find and enqueue next pending step
        next_step = next(
            (s for s in workflow.steps if s.status == StepStatus.PENDING),
            None,
        )
        if next_step:
            await self.queue.enqueue(
                workflow.id, next_step.name, workflow.context
            )
        else:
            self.state_machine.transition(
                workflow, WorkflowStatus.COMPLETED
            )

        await self.store.save(workflow)
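
The orchestrator depends on a `WorkflowStore` with async `save` and `load` methods, but the article does not define one. The following is a minimal sketch of what that interface might look like, with an in-memory implementation suitable for tests. The `Protocol` definition and `InMemoryWorkflowStore` are assumptions; a production store would write to a database and would need to handle concurrent updates.

```python
import copy
from typing import Any, Protocol


class WorkflowStore(Protocol):
    """Assumed interface, inferred from how the orchestrator calls the store."""

    async def save(self, workflow: Any) -> None: ...

    async def load(self, workflow_id: str) -> Any: ...


class InMemoryWorkflowStore:
    """Keeps deep copies in a dict so callers cannot mutate stored state."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}

    async def save(self, workflow: Any) -> None:
        # Copy on write: later changes to the caller's object are not persisted
        self._data[workflow.id] = copy.deepcopy(workflow)

    async def load(self, workflow_id: str) -> Any:
        try:
            # Copy on read: the caller gets its own object to modify
            return copy.deepcopy(self._data[workflow_id])
        except KeyError:
            raise KeyError(f"Unknown workflow: {workflow_id}") from None
```

Copying on both save and load mimics the isolation a real database gives you, which helps tests catch code that forgets to call `save` after changing a workflow.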

Monitoring Hooks

Add observability from day one. Emit structured events that can feed dashboards and alerts.

import logging
import time

logger = logging.getLogger("orchestrator")

class MonitoredOrchestrator(Orchestrator):
    async def process_step_result(self, workflow_id, step_name, result):
        start = time.monotonic()
        await super().process_step_result(workflow_id, step_name, result)
        duration = time.monotonic() - start

        logger.info(
            "step_completed",
            extra={
                "workflow_id": workflow_id,
                "step_name": step_name,
                "duration_ms": round(duration * 1000, 2),
                "result_size": len(str(result)),
            },
        )
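
One caveat with the `extra` dictionary: Python's default log formatter does not print those fields, so they are easy to lose. A minimal sketch of a formatter that emits them as JSON follows. `JsonFormatter` is a hypothetical name, and the output field names (`event`, `level`, `logger`) are assumptions, not a standard.

```python
import json
import logging

# Attribute names present on every LogRecord; anything else came from `extra`.
_STANDARD_ATTRS = set(
    logging.LogRecord("", 0, "", 0, "", (), None).__dict__
) | {"message", "asctime"}


class JsonFormatter(logging.Formatter):
    """Render a log record, including its `extra` fields, as one JSON line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "event": record.getMessage(),
            "level": record.levelname,
            "logger": record.name,
        }
        for key, value in record.__dict__.items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        # default=str keeps non-serializable values from raising
        return json.dumps(payload, default=str)
```

To use it, attach the formatter to a handler, for example `handler = logging.StreamHandler(); handler.setFormatter(JsonFormatter()); logger.addHandler(handler)`.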

FAQ

How do I decide between building custom and using Temporal?

Start with Temporal or another off-the-shelf tool. Build custom only if you have confirmed that the existing tool cannot meet a specific requirement — latency, routing logic, multi-tenancy, or integration constraints. Most teams overestimate the need for custom orchestration and underestimate the maintenance cost.

What is the biggest risk with custom orchestrators?

Incomplete failure handling. Production orchestrators must handle worker crashes, partial failures, poison messages, timeout recovery, and state corruption. Off-the-shelf tools have years of hardening around these edge cases. Budget significant testing effort for failure scenarios if you build custom.

How do I migrate from a custom orchestrator to Temporal later?

Design your step handlers as pure functions that take inputs and return outputs without referencing the orchestrator directly. This makes them portable. The orchestration logic (step sequencing, retry policies, state transitions) is what changes when you migrate — the actual work functions stay the same.
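
A small sketch of that separation, using hypothetical names (`summarize`, `summarize_step`) and an assumed context layout with `document` and `max_words` keys: the work function knows nothing about the orchestrator, and a thin adapter translates the workflow context into its arguments.

```python
import asyncio
from typing import Any


def summarize(text: str, max_words: int = 5) -> str:
    """Pure work function: no queue, store, or orchestrator imports."""
    return " ".join(text.split()[:max_words])


async def summarize_step(context: dict[str, Any]) -> str:
    """Thin adapter: unpack the workflow context and call the pure function."""
    return summarize(context["document"], context.get("max_words", 5))
```

When migrating, only the adapter would need to be rewritten for the new platform; `summarize` can be reused unchanged.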

