Building an AI Agent with Tool-Use Chains: Sequential Tool Orchestration for Complex Tasks
Learn how to build AI agents that chain multiple tools together sequentially, passing intermediate results through dependency graphs while handling errors gracefully across the entire pipeline.
Why Tool Chaining Changes Everything
Most AI agent tutorials show a single tool call: the model decides to call a function, gets a result, and responds. Real-world tasks are rarely that simple. A user who asks "find the top 3 competitors for Acme Corp and draft an outreach email for each" requires your agent to chain together a web search tool, a data extraction tool, a company analysis tool, and an email drafting tool — each depending on results from the previous step.
Tool-use chains transform agents from single-step assistants into multi-step reasoning engines. The key challenge is managing the flow of intermediate results, handling partial failures, and keeping the entire chain observable.
The Architecture of a Tool Chain
A tool chain is a directed acyclic graph (DAG) where each node is a tool invocation and edges represent data dependencies. The simplest chain is linear — tool A feeds tool B feeds tool C. More complex chains fan out and converge.
flowchart TD
USER(["User message"])
LLM["LLM call<br/>with tools schema"]
DECIDE{"Model wants<br/>to call a tool?"}
EXEC["Execute tool<br/>sandboxed runtime"]
RESULT["Append tool_result<br/>to messages"]
GUARD{"Output passes<br/>guardrails?"}
DONE(["Final reply"])
BLOCK(["Refuse and log"])
USER --> LLM --> DECIDE
DECIDE -->|Yes| EXEC --> RESULT --> LLM
DECIDE -->|No| GUARD
GUARD -->|Yes| DONE
GUARD -->|No| BLOCK
style LLM fill:#4f46e5,stroke:#4338ca,color:#fff
style EXEC fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
style DONE fill:#059669,stroke:#047857,color:#fff
style BLOCK fill:#dc2626,stroke:#b91c1c,color:#fff
from dataclasses import dataclass, field
from typing import Any, Callable, Awaitable
import asyncio

@dataclass
class ToolNode:
    name: str
    fn: Callable[..., Awaitable[Any]]
    depends_on: list[str] = field(default_factory=list)
    result: Any = None
    error: str | None = None

class ToolChain:
    def __init__(self):
        self.nodes: dict[str, ToolNode] = {}

    def add(self, name: str, fn: Callable[..., Awaitable[Any]], depends_on: list[str] | None = None):
        self.nodes[name] = ToolNode(
            name=name, fn=fn, depends_on=depends_on or []
        )

    async def execute(self) -> dict[str, Any]:
        completed: set[str] = set()
        results: dict[str, Any] = {}
        while len(completed) < len(self.nodes):
            # A node is ready when all of its dependencies have completed
            ready = [
                n for n in self.nodes.values()
                if n.name not in completed
                and all(d in completed for d in n.depends_on)
            ]
            if not ready:
                raise RuntimeError("Circular dependency detected")
            tasks = []
            for node in ready:
                dep_results = {d: results[d] for d in node.depends_on}
                tasks.append(self._run_node(node, dep_results))
            # Independent ready nodes run concurrently
            await asyncio.gather(*tasks)
            for node in ready:
                completed.add(node.name)
                results[node.name] = node.result
        return results

    async def _run_node(self, node: ToolNode, deps: dict):
        try:
            node.result = await node.fn(deps)
        except Exception as e:
            node.error = str(e)
            node.result = None
This executor resolves dependencies automatically, runs independent nodes in parallel, and captures errors per-node without crashing the entire chain.
Defining Tools with Dependencies
Each tool is an async function that receives its upstream dependencies as a dictionary. Here is a practical example: researching a company, finding its competitors, and drafting outreach emails.
async def search_company(deps: dict) -> dict:
    """Step 1: Search for company information."""
    # In production, call a search API
    return {
        "name": "Acme Corp",
        "industry": "SaaS",
        "revenue": "$50M",
        "employees": 200,
    }

async def find_competitors(deps: dict) -> list[dict]:
    """Step 2: Find competitors based on company data."""
    company = deps["search_company"]
    # Use company industry and size to find competitors
    return [
        {"name": "Beta Inc", "overlap": "high"},
        {"name": "Gamma Ltd", "overlap": "medium"},
    ]

async def draft_emails(deps: dict) -> list[str]:
    """Step 3: Draft outreach emails for each competitor."""
    competitors = deps["find_competitors"]
    company = deps["search_company"]
    emails = []
    for comp in competitors:
        emails.append(
            f"Subject: Partnership with {company['name']}\n"
            f"Hi {comp['name']} team..."
        )
    return emails
Wiring the Chain and Running It
async def main():
    chain = ToolChain()
    chain.add("search_company", search_company)
    chain.add("find_competitors", find_competitors, depends_on=["search_company"])
    chain.add("draft_emails", draft_emails, depends_on=["find_competitors", "search_company"])
    results = await chain.execute()
    for email in results["draft_emails"]:
        print(email)

asyncio.run(main())
The chain executor sees that search_company has no dependencies and runs it first. Then find_competitors becomes ready. Finally draft_emails runs once both of its dependencies are satisfied.
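Observability, mentioned at the outset, does not require changes to the executor: each tool can be wrapped in a tracing decorator before it is added to the chain. The `traced` helper below is an illustrative sketch, not part of any library:

```python
import asyncio
import functools
import logging
import time

logging.basicConfig(level=logging.INFO, format="%(message)s")

def traced(fn):
    """Log entry, exit, and duration for a single tool call."""
    @functools.wraps(fn)
    async def wrapper(deps: dict):
        start = time.perf_counter()
        logging.info("-> %s", fn.__name__)
        try:
            return await fn(deps)
        finally:
            # Runs whether the tool succeeded or raised
            logging.info("<- %s (%.3fs)", fn.__name__, time.perf_counter() - start)
    return wrapper

@traced
async def search_company(deps: dict) -> dict:
    await asyncio.sleep(0.01)  # simulate a search API call
    return {"name": "Acme Corp"}

result = asyncio.run(search_company({}))
```

You would then register the wrapped tool as usual, e.g. `chain.add("search_company", traced(search_company))`, and every node in the chain emits structured timing logs.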
Error Propagation Strategies
When a mid-chain tool fails, you have three options: fail the entire chain, skip downstream nodes, or substitute a fallback. A robust pattern is to mark failed nodes and let downstream tools decide.
async def _run_node(self, node: ToolNode, deps: dict):
    # Check if any dependency failed
    failed_deps = [d for d in node.depends_on if self.nodes[d].error]
    if failed_deps:
        node.error = f"Skipped: upstream failures in {failed_deps}"
        return
    try:
        node.result = await node.fn(deps)
    except Exception as e:
        node.error = str(e)
        node.result = None
This cascade-skip approach prevents wasted compute on tools that cannot succeed, while preserving partial results from branches that did complete.
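The third strategy, substituting a fallback, also fits in a wrapper rather than the executor. A minimal sketch, with `with_fallback` as an illustrative name:

```python
import asyncio
from typing import Any, Awaitable, Callable

def with_fallback(fn: Callable[[dict], Awaitable[Any]], fallback: Any):
    """Wrap a tool so failures yield a default value instead of an error."""
    async def wrapper(deps: dict) -> Any:
        try:
            return await fn(deps)
        except Exception:
            # Swallow the failure and substitute the fallback so
            # downstream nodes run instead of being cascade-skipped.
            return fallback
    return wrapper

async def flaky_search(deps: dict) -> list[dict]:
    raise ConnectionError("search API unavailable")

# Downstream tools see an empty competitor list rather than a skipped node.
safe_search = with_fallback(flaky_search, [])
result = asyncio.run(safe_search({}))
print(result)  # []
```

Registering `with_fallback(find_competitors, [])` instead of the raw tool turns a hard failure into a degraded-but-complete chain run, which is often the right trade-off for non-critical branches.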
Integrating with an LLM Agent Loop
The tool chain becomes powerful when the LLM itself decides which chain to invoke. You register the chain as a single meta-tool that the agent can call.
from agents import Agent, function_tool

@function_tool
async def competitor_research(company_name: str) -> str:
    """Research a company's competitors and draft outreach emails."""
    chain = ToolChain()
    # Node names must match the deps keys the tool functions read
    chain.add("search_company", search_company)
    chain.add("find_competitors", find_competitors, depends_on=["search_company"])
    chain.add("draft_emails", draft_emails, depends_on=["find_competitors", "search_company"])
    results = await chain.execute()
    return str(results["draft_emails"])

agent = Agent(
    name="Research Agent",
    instructions="You help users research companies and their competitors.",
    tools=[competitor_research],
)
The agent sees one tool but behind it runs an entire dependency-resolved pipeline.
FAQ
How do tool chains differ from simple sequential tool calls?
Simple sequential calls execute tools one after another in a fixed order. Tool chains model explicit data dependencies, enabling parallel execution of independent branches, automatic error propagation, and scheduling driven by readiness rather than a hard-coded order. A chain with five tools where two are independent can run those two simultaneously, cutting total latency.
How should I handle timeouts in long-running chains?
Wrap each node execution with asyncio.wait_for() and a per-tool timeout. When a tool times out, treat it the same as an error — mark the node as failed and let downstream skip or fallback logic handle it. Additionally, set a global timeout on the entire chain to enforce an upper bound on total execution time.
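A minimal sketch of the per-tool wrapper described above (`with_timeout` is an illustrative name):

```python
import asyncio

def with_timeout(fn, seconds: float):
    """Wrap a tool so a hang surfaces as an error after `seconds`."""
    async def wrapper(deps: dict):
        # wait_for cancels the tool and raises TimeoutError on expiry,
        # which the executor's except clause records as node.error
        return await asyncio.wait_for(fn(deps), timeout=seconds)
    return wrapper

async def slow_tool(deps: dict) -> str:
    await asyncio.sleep(1.0)  # pretend this is a hung API call
    return "never reached"

async def demo() -> bool:
    try:
        await with_timeout(slow_tool, 0.1)({})
        return False
    except asyncio.TimeoutError:
        return True

timed_out = asyncio.run(demo())
print("timed out:", timed_out)  # timed out: True
```

For the global bound, the same idea applies one level up: wrap `chain.execute()` itself in `asyncio.wait_for()`.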
Can the LLM modify the chain dynamically at runtime?
Yes. You can give the agent a planning tool that returns a chain specification (list of tools and dependencies), then a second tool that executes that specification. This lets the LLM reason about which tools to include before committing to execution.
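If you go this route, validate the model's plan before building a chain from it. A sketch, assuming a simple list-of-steps spec; the plan format and names here are illustrations, not a fixed schema:

```python
from typing import Any, Awaitable, Callable

def validate_plan(
    plan: list[dict], registry: dict[str, Callable[[dict], Awaitable[Any]]]
) -> list[str]:
    """Return problems in an LLM-produced chain spec (empty list = safe to build)."""
    problems: list[str] = []
    step_names = {step["name"] for step in plan}
    for step in plan:
        # Every referenced tool must exist in the registry
        if step["tool"] not in registry:
            problems.append(f"unknown tool: {step['tool']}")
        # Every dependency must refer to a step defined in the plan
        for dep in step.get("depends_on", []):
            if dep not in step_names:
                problems.append(f"{step['name']} depends on undefined step: {dep}")
    return problems

async def noop(deps: dict):  # stand-in tool for the demo
    return None

registry = {"search_company": noop, "find_competitors": noop}
plan = [
    {"name": "search", "tool": "search_company", "depends_on": []},
    {"name": "competitors", "tool": "find_competitors", "depends_on": ["search"]},
    {"name": "emails", "tool": "draft_emails", "depends_on": ["competitors"]},
]
print(validate_plan(plan, registry))  # flags the unregistered draft_emails tool
```

Only when `validate_plan` returns an empty list would you loop over the steps, call `chain.add()` for each, and execute. The executor's own circular-dependency check then covers the one failure mode this validation does not.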
#ToolChaining #AgenticAI #AIAgents #PythonAsync #Orchestration #DependencyGraphs #ToolUse #LLMTools