Skip to content
Learn Agentic AI
Learn Agentic AI14 min read22 views

Building an AI Agent with Tool-Use Chains: Sequential Tool Orchestration for Complex Tasks

Learn how to build AI agents that chain multiple tools together sequentially, passing intermediate results through dependency graphs while handling errors gracefully across the entire pipeline.

Why Tool Chaining Changes Everything

Most AI agent tutorials show a single tool call: the model decides to call a function, gets a result, and responds. Real-world tasks are rarely that simple. A user who asks "find the top 3 competitors for Acme Corp and draft an outreach email for each" requires your agent to chain together a web search tool, a data extraction tool, a company analysis tool, and an email drafting tool — each depending on results from the previous step.

Tool-use chains transform agents from single-step assistants into multi-step reasoning engines. The key challenge is managing the flow of intermediate results, handling partial failures, and keeping the entire chain observable.

The Architecture of a Tool Chain

A tool chain is a directed acyclic graph (DAG) where each node is a tool invocation and edges represent data dependencies. The simplest chain is linear — tool A feeds tool B feeds tool C. More complex chains fan out and converge.

flowchart TD
    USER(["User message"])
    LLM["LLM call<br/>with tools schema"]
    DECIDE{"Model wants<br/>to call a tool?"}
    EXEC["Execute tool<br/>sandboxed runtime"]
    RESULT["Append tool_result<br/>to messages"]
    GUARD{"Output passes<br/>guardrails?"}
    DONE(["Final reply"])
    BLOCK(["Refuse and log"])
    USER --> LLM --> DECIDE
    DECIDE -->|Yes| EXEC --> RESULT --> LLM
    DECIDE -->|No| GUARD
    GUARD -->|Yes| DONE
    GUARD -->|No| BLOCK
    style LLM fill:#4f46e5,stroke:#4338ca,color:#fff
    style EXEC fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style DONE fill:#059669,stroke:#047857,color:#fff
    style BLOCK fill:#dc2626,stroke:#b91c1c,color:#fff
from dataclasses import dataclass, field
from typing import Any, Callable, Awaitable
import asyncio

@dataclass
class ToolNode:
    name: str
    fn: Callable[..., Awaitable[Any]]
    depends_on: list[str] = field(default_factory=list)
    result: Any = None
    error: str | None = None

class ToolChain:
    def __init__(self):
        self.nodes: dict[str, ToolNode] = {}

    def add(self, name: str, fn: Callable, depends_on: list[str] = None):
        self.nodes[name] = ToolNode(
            name=name, fn=fn, depends_on=depends_on or []
        )

    async def execute(self) -> dict[str, Any]:
        completed: set[str] = set()
        results: dict[str, Any] = {}

        while len(completed) < len(self.nodes):
            ready = [
                n for n in self.nodes.values()
                if n.name not in completed
                and all(d in completed for d in n.depends_on)
            ]
            if not ready:
                raise RuntimeError("Circular dependency detected")

            tasks = []
            for node in ready:
                dep_results = {d: results[d] for d in node.depends_on}
                tasks.append(self._run_node(node, dep_results))

            await asyncio.gather(*tasks)

            for node in ready:
                completed.add(node.name)
                results[node.name] = node.result

        return results

    async def _run_node(self, node: ToolNode, deps: dict):
        try:
            node.result = await node.fn(deps)
        except Exception as e:
            node.error = str(e)
            node.result = None

This executor resolves dependencies automatically, runs independent nodes in parallel, and captures errors per-node without crashing the entire chain.

Hear it before you finish reading

Talk to a live CallSphere AI voice agent in your browser — 60 seconds, no signup.

Try Live Demo →

Defining Tools with Dependencies

Each tool is an async function that receives its upstream dependencies as a dictionary. Here is a practical example — researching a company and generating a report.

async def search_company(deps: dict) -> dict:
    """Step 1: Search for company information."""
    # In production, call a search API
    return {
        "name": "Acme Corp",
        "industry": "SaaS",
        "revenue": "$50M",
        "employees": 200,
    }

async def find_competitors(deps: dict) -> list[dict]:
    """Step 2: Find competitors based on company data."""
    company = deps["search_company"]
    # Use company industry and size to find competitors
    return [
        {"name": "Beta Inc", "overlap": "high"},
        {"name": "Gamma Ltd", "overlap": "medium"},
    ]

async def draft_emails(deps: dict) -> list[str]:
    """Step 3: Draft outreach emails for each competitor."""
    competitors = deps["find_competitors"]
    company = deps["search_company"]
    emails = []
    for comp in competitors:
        emails.append(
            f"Subject: Partnership with {company['name']}\n"
            f"Hi {comp['name']} team..."
        )
    return emails

Wiring the Chain and Running It

async def main():
    chain = ToolChain()
    chain.add("search_company", search_company)
    chain.add("find_competitors", find_competitors, depends_on=["search_company"])
    chain.add("draft_emails", draft_emails, depends_on=["find_competitors", "search_company"])

    results = await chain.execute()

    for email in results["draft_emails"]:
        print(email)

asyncio.run(main())

The chain executor sees that search_company has no dependencies and runs it first. Then find_competitors becomes ready. Finally draft_emails runs once both of its dependencies are satisfied.

Error Propagation Strategies

When a mid-chain tool fails, you have three options: fail the entire chain, skip downstream nodes, or substitute a fallback. A robust pattern is to mark failed nodes and let downstream tools decide.

async def _run_node(self, node: ToolNode, deps: dict):
    # Check if any dependency failed
    failed_deps = [d for d in node.depends_on if self.nodes[d].error]
    if failed_deps:
        node.error = f"Skipped: upstream failures in {failed_deps}"
        return

    try:
        node.result = await node.fn(deps)
    except Exception as e:
        node.error = str(e)
        node.result = None

This cascade-skip approach prevents wasted compute on tools that cannot succeed, while preserving partial results from branches that did complete.

Integrating with an LLM Agent Loop

The tool chain becomes powerful when the LLM itself decides which chain to invoke. You register the chain as a single meta-tool that the agent can call.

Still reading? Stop comparing — try CallSphere live.

CallSphere ships complete AI voice agents per industry — 14 tools for healthcare, 10 agents for real estate, 4 specialists for salons. See how it actually handles a call before you book a demo.

from agents import Agent, function_tool

@function_tool
async def competitor_research(company_name: str) -> str:
    """Research a company's competitors and draft outreach emails."""
    chain = ToolChain()
    chain.add("search", search_company)
    chain.add("competitors", find_competitors, depends_on=["search"])
    chain.add("emails", draft_emails, depends_on=["competitors", "search"])
    results = await chain.execute()
    return str(results["emails"])

agent = Agent(
    name="Research Agent",
    instructions="You help users research companies and their competitors.",
    tools=[competitor_research],
)

The agent sees one tool but behind it runs an entire dependency-resolved pipeline.

FAQ

How do tool chains differ from simple sequential tool calls?

Simple sequential calls execute tools one after another in a fixed order. Tool chains model explicit data dependencies, enabling parallel execution of independent branches, automatic error propagation, and dynamic reordering. A chain with five tools where two are independent can run those two simultaneously, cutting total latency.

How should I handle timeouts in long-running chains?

Wrap each node execution with asyncio.wait_for() and a per-tool timeout. When a tool times out, treat it the same as an error — mark the node as failed and let downstream skip or fallback logic handle it. Additionally, set a global timeout on the entire chain to enforce an upper bound on total execution time.

Can the LLM modify the chain dynamically at runtime?

Yes. You can give the agent a planning tool that returns a chain specification (list of tools and dependencies), then a second tool that executes that specification. This lets the LLM reason about which tools to include before committing to execution.


#ToolChaining #AgenticAI #AIAgents #PythonAsync #Orchestration #DependencyGraphs #ToolUse #LLMTools

Share

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.

Related Articles You May Like