
DAG-Based Agent Workflows: Directed Acyclic Graphs for Complex Task Orchestration

Learn how to model complex agent workflows as directed acyclic graphs with dependency resolution, parallel execution of independent tasks, and topological sorting for correct execution order.

Why DAGs Matter for Agent Workflows

When an AI agent must complete a complex task — say, generating a market analysis report — it faces dozens of sub-tasks with intricate dependencies. Fetching competitor data must happen before the comparison analysis. Sentiment analysis can run in parallel with financial data retrieval. The final summary depends on all preceding steps.

A directed acyclic graph (DAG) is the natural data structure for this problem. Each node represents a task, each edge represents a dependency, and the acyclic constraint guarantees no circular dependencies that would cause infinite loops.

Modeling Tasks as a DAG

The diagram below (Mermaid source) sketches the surrounding agent loop that a workflow like this runs inside:

flowchart LR
    INPUT(["User intent"])
    PARSE["Parse plus<br/>classify"]
    PLAN["Plan and tool<br/>selection"]
    AGENT["Agent loop<br/>LLM plus tools"]
    GUARD{"Guardrails<br/>and policy"}
    EXEC["Execute and<br/>verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome plus<br/>next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff

Start by defining tasks with explicit dependencies:

from dataclasses import dataclass, field
from typing import Any, Callable, Awaitable

@dataclass
class AgentTask:
    """A single unit of work in the agent workflow."""
    task_id: str
    name: str
    execute: Callable[..., Awaitable[Any]]
    dependencies: list[str] = field(default_factory=list)
    result: Any = None
    status: str = "pending"  # pending, running, completed, failed

class WorkflowDAG:
    """DAG-based workflow engine for agent tasks."""

    def __init__(self):
        self.tasks: dict[str, AgentTask] = {}

    def add_task(self, task: AgentTask):
        self.tasks[task.task_id] = task

    def validate(self) -> bool:
        """Ensure the graph is acyclic using DFS cycle detection."""
        visited = set()
        in_stack = set()

        def dfs(task_id: str) -> bool:
            visited.add(task_id)
            in_stack.add(task_id)
            for dep_id in self.tasks[task_id].dependencies:
                if dep_id not in visited:
                    if not dfs(dep_id):
                        return False
                elif dep_id in in_stack:
                    return False  # Cycle detected
            in_stack.discard(task_id)
            return True

        for tid in self.tasks:
            if tid not in visited:
                if not dfs(tid):
                    return False
        return True

The validate method uses depth-first search to detect cycles. If any back edge is found during traversal, the graph is invalid.
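As a standalone sketch, the same back-edge check can be run over a plain dict mapping each task to its prerequisite list, independent of the WorkflowDAG class above (the task names here are illustrative):

```python
def has_cycle(deps: dict[str, list[str]]) -> bool:
    """DFS cycle detection; every node must appear as a key in deps."""
    visited: set[str] = set()
    in_stack: set[str] = set()

    def dfs(node: str) -> bool:
        visited.add(node)
        in_stack.add(node)
        for dep in deps[node]:
            if dep not in visited:
                if dfs(dep):
                    return True
            elif dep in in_stack:
                # Back edge: dep is an ancestor on the current DFS path
                return True
        in_stack.discard(node)
        return False

    return any(dfs(n) for n in deps if n not in visited)

print(has_cycle({"a": ["b"], "b": ["a"]}))  # True: a and b depend on each other
print(has_cycle({"a": [], "b": ["a"]}))     # False: a valid DAG
```

Note the in_stack set is what distinguishes a back edge (cycle) from a harmless cross edge to an already-finished node.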


Topological Sort for Execution Order

Topological sorting produces a linear ordering of tasks where every dependency appears before its dependent. This is essential for determining which tasks can run at each stage:

from collections import deque

def topological_sort(dag: WorkflowDAG) -> list[list[str]]:
    """Return tasks grouped into levels for parallel execution."""
    # Each task's in-degree equals its number of dependencies
    in_degree = {
        tid: len(task.dependencies) for tid, task in dag.tasks.items()
    }

    # Level 0: tasks with no dependencies
    queue = deque(
        [tid for tid, degree in in_degree.items() if degree == 0]
    )
    levels = []

    while queue:
        current_level = list(queue)
        levels.append(current_level)
        next_queue = deque()
        for tid in current_level:
            # Reduce in-degree for dependents
            for other_id, other_task in dag.tasks.items():
                if tid in other_task.dependencies:
                    in_degree[other_id] -= 1
                    if in_degree[other_id] == 0:
                        next_queue.append(other_id)
        queue = next_queue

    return levels

Each level in the output contains tasks that can execute simultaneously because all their dependencies are satisfied by prior levels.
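The level-grouping logic can be exercised on its own with a plain dependency dict. This is a minimal sketch with hypothetical tasks a through d, not the WorkflowDAG engine itself:

```python
from collections import deque

def levels_from_deps(deps: dict[str, list[str]]) -> list[list[str]]:
    """Group tasks into levels; tasks in a level share no dependencies."""
    in_degree = {t: len(d) for t, d in deps.items()}
    queue = deque(t for t, degree in in_degree.items() if degree == 0)
    levels = []
    while queue:
        level = sorted(queue)  # sorted only to make output deterministic
        levels.append(level)
        nxt: deque[str] = deque()
        for done in level:
            for t, d in deps.items():
                if done in d:
                    in_degree[t] -= 1
                    if in_degree[t] == 0:
                        nxt.append(t)
        queue = nxt
    return levels

deps = {"a": [], "b": [], "c": ["a"], "d": ["c", "b"]}
print(levels_from_deps(deps))  # [['a', 'b'], ['c'], ['d']]
```

Here a and b have no prerequisites and land in level 0, c waits for a, and d waits for both c and b.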

Parallel Execution Engine

With levels computed, executing the DAG becomes straightforward with asyncio:

import asyncio

async def execute_dag(dag: WorkflowDAG):
    """Execute all tasks respecting dependencies, parallelizing where possible."""
    if not dag.validate():
        raise ValueError("Workflow contains cycles")

    levels = topological_sort(dag)
    results = {}

    for level in levels:
        # Run all tasks in this level concurrently
        coros = []
        for task_id in level:
            task = dag.tasks[task_id]
            dep_results = {
                d: results[d] for d in task.dependencies
            }
            coros.append(run_task(task, dep_results, results))
        await asyncio.gather(*coros)
    return results

async def run_task(
    task: AgentTask,
    dep_results: dict,
    all_results: dict,
):
    task.status = "running"
    try:
        task.result = await task.execute(dep_results)
        task.status = "completed"
        all_results[task.task_id] = task.result
    except Exception as e:
        task.status = "failed"
        raise RuntimeError(f"Task {task.task_id} failed: {e}") from e

Practical Example: Research Report Pipeline

Here is a complete pipeline that uses the DAG engine to generate a research report:


async def fetch_market_data(deps):
    return {"revenue": 1_200_000, "growth": 0.15}

async def fetch_competitor_data(deps):
    return [{"name": "CompA", "share": 0.3}]

async def analyze_trends(deps):
    market = deps["market_data"]
    return f"Growth rate: {market['growth'] * 100}%"

async def generate_report(deps):
    trends = deps["trend_analysis"]
    competitors = deps["competitor_data"]
    return f"Report based on {trends} and {len(competitors)} competitors"

# Build the DAG
dag = WorkflowDAG()
dag.add_task(AgentTask("market_data", "Fetch Market Data", fetch_market_data))
dag.add_task(AgentTask("competitor_data", "Fetch Competitors", fetch_competitor_data))
dag.add_task(AgentTask(
    "trend_analysis", "Analyze Trends", analyze_trends,
    dependencies=["market_data"],
))
dag.add_task(AgentTask(
    "report", "Generate Report", generate_report,
    dependencies=["trend_analysis", "competitor_data"],
))

asyncio.run(execute_dag(dag))
print(dag.tasks["report"].result)

In this example, market_data and competitor_data run in parallel (level 0). trend_analysis runs next (level 1), and the final report runs last (level 2).

FAQ

When should I use a DAG workflow instead of a simple sequential pipeline?

Use a DAG when your workflow has tasks with independent branches that can benefit from parallel execution, or when the dependency structure is complex enough that a linear sequence would either waste time waiting or execute things in the wrong order. For simple three-step pipelines, sequential execution is fine.

How do I handle failures in a DAG where downstream tasks depend on the failed task?

Implement a failure propagation strategy. When a task fails, mark all its transitive dependents as "skipped" rather than attempting them. You can also add retry logic at the task level, and only propagate the failure after retries are exhausted. The key is to never run a task whose dependencies have not all completed successfully.
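One way to sketch that propagation over a plain dependency dict: invert the edges to find dependents, then walk the transitive closure from the failed task. The helper name and task names here are hypothetical, not part of the engine above:

```python
def transitive_dependents(deps: dict[str, list[str]], failed: str) -> set[str]:
    """Return every task that directly or indirectly depends on `failed`."""
    # Invert the map: prerequisite -> tasks that depend on it
    dependents: dict[str, list[str]] = {t: [] for t in deps}
    for task, prereqs in deps.items():
        for p in prereqs:
            dependents[p].append(task)
    # Breadth of the failure: walk dependents transitively
    skipped: set[str] = set()
    stack = [failed]
    while stack:
        current = stack.pop()
        for d in dependents[current]:
            if d not in skipped:
                skipped.add(d)
                stack.append(d)
    return skipped

deps = {"a": [], "b": ["a"], "c": ["b"], "d": []}
print(transitive_dependents(deps, "a"))  # {'b', 'c'}: d is unaffected
```

In the engine, these tasks would be marked "skipped" before the next level runs, so no task ever executes with a failed prerequisite.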

Can I dynamically add tasks to a DAG during execution?

Yes, but it requires careful design. After a task completes, it can register new tasks into the DAG as long as they do not create cycles. Re-run the topological sort for remaining tasks and continue execution. This pattern is common when an agent discovers sub-tasks it could not predict upfront.


#DAG #WorkflowOrchestration #ParallelExecution #AgenticAI #Python #LearnAI #AIEngineering
