Hierarchical Task Networks for AI Agents: Planning Complex Multi-Step Operations

Master Hierarchical Task Network (HTN) planning for AI agents including task decomposition, method selection, plan refinement, and execution monitoring with complete Python implementations.

What Are Hierarchical Task Networks?

When you ask an AI agent to "deploy a microservice," that instruction conceals dozens of subtasks: pull the latest code, run tests, build a container, push to a registry, update Kubernetes manifests, apply the deployment, verify health checks, and notify the team. An agent that tries to plan all of this at once will either miss steps or get lost in details.

Hierarchical Task Networks (HTNs) solve this by organizing tasks into a hierarchy. High-level abstract tasks decompose into lower-level subtasks through predefined methods, and decomposition continues recursively until only primitive actions remain, which the agent can execute directly. HTN planning has been used in game AI, military logistics, and industrial automation for decades, and it maps naturally onto agentic AI systems.

HTN Core Components

An HTN planner has four building blocks:

  1. Primitive tasks: actions the agent can execute directly
  2. Compound tasks: abstract tasks that must be decomposed
  3. Methods: recipes for decomposing a compound task into subtasks
  4. World state: the current state of the environment, used to select which method applies

from dataclasses import dataclass, field
from typing import List, Callable, Dict, Any, Optional
from enum import Enum

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class Task:
    name: str
    is_primitive: bool = False
    parameters: Dict[str, Any] = field(default_factory=dict)
    status: TaskStatus = TaskStatus.PENDING

@dataclass
class Method:
    """A recipe for decomposing a compound task into subtasks."""
    name: str
    target_task: str  # Name of the compound task this method decomposes
    precondition: Callable[[Dict], bool]  # When this method applies
    subtasks: Callable[[Dict, Dict], List[Task]]  # Generate subtasks

@dataclass
class WorldState:
    facts: Dict[str, Any] = field(default_factory=dict)

    def check(self, key: str, expected: Any = True) -> bool:
        return self.facts.get(key) == expected

    def update(self, key: str, value: Any):
        self.facts[key] = value
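
To make the relationship between these pieces concrete, here is a minimal sketch of a single method being checked against a world state. The definitions are compact copies of the ones above so the snippet runs on its own; the `backup_database` task, its step names, and the `db_reachable` fact are hypothetical and chosen only for illustration.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


# Compact copies of the article's definitions so this snippet runs on its own.
@dataclass
class Task:
    name: str
    is_primitive: bool = False
    parameters: Dict[str, Any] = field(default_factory=dict)


@dataclass
class Method:
    name: str
    target_task: str
    precondition: Callable[[Dict], bool]
    subtasks: Callable[[Dict, Dict], List[Task]]


@dataclass
class WorldState:
    facts: Dict[str, Any] = field(default_factory=dict)

    def check(self, key: str, expected: Any = True) -> bool:
        return self.facts.get(key) == expected

    def update(self, key: str, value: Any) -> None:
        self.facts[key] = value


# Hypothetical method: back up a database only when it is reachable.
backup = Method(
    name="backup_when_reachable",
    target_task="backup_database",
    precondition=lambda facts: facts.get("db_reachable", False),
    subtasks=lambda params, facts: [
        Task("dump_database", is_primitive=True, parameters=params),
        Task("upload_dump", is_primitive=True, parameters=params),
    ],
)

state = WorldState()
applicable_before = backup.precondition(state.facts)  # fact is missing
state.update("db_reachable", True)
applicable_after = backup.precondition(state.facts)   # fact is now set
step_names = [t.name for t in backup.subtasks({"db": "orders"}, state.facts)]

print(applicable_before, applicable_after, step_names)
```

The method itself holds no state: whether it applies depends entirely on the facts it is handed, which is what lets a planner pick different decompositions for the same task.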

Building the HTN Planner

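The planner recursively decomposes compound tasks until only primitive tasks remain. For each compound task it tries the registered methods in registration order and keeps the first one whose precondition holds and whose subtasks all decompose successfully. Preconditions are evaluated against the state passed to plan; the planner itself never updates that state.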

class HTNPlanner:
    def __init__(self):
        self.methods: Dict[str, List[Method]] = {}

    def register_method(self, method: Method):
        if method.target_task not in self.methods:
            self.methods[method.target_task] = []
        self.methods[method.target_task].append(method)

    def plan(
        self, tasks: List[Task], state: WorldState
    ) -> Optional[List[Task]]:
        plan = []
        for task in tasks:
            result = self._decompose(task, state)
            if result is None:
                return None  # Planning failed
            plan.extend(result)
        return plan

    def _decompose(
        self, task: Task, state: WorldState
    ) -> Optional[List[Task]]:
        if task.is_primitive:
            return [task]

        methods = self.methods.get(task.name, [])
        for method in methods:
            if method.precondition(state.facts):
                subtasks = method.subtasks(task.parameters, state.facts)
                result = self.plan(subtasks, state)
                if result is not None:
                    return result

        return None  # No applicable method found
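
Because the planner above evaluates every precondition against the state it was given, a method cannot depend on something an earlier step in the same plan would achieve. One possible extension, sketched here under assumptions, is to let primitive tasks declare expected effects and to simulate them on a copy of the facts while planning. The `effects` field, the `decompose` function, and the `release`/`publish` domain are hypothetical and not part of the code above.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

Facts = Dict[str, Any]


@dataclass
class Task:
    name: str
    is_primitive: bool = False
    # Hypothetical field: facts this primitive is expected to set when it runs.
    effects: Facts = field(default_factory=dict)


@dataclass
class Method:
    target_task: str
    precondition: Callable[[Facts], bool]
    subtasks: Callable[[Facts], List[Task]]


def decompose(
    tasks: List[Task], facts: Facts, methods: Dict[str, List[Method]]
) -> Optional[List[Task]]:
    """Return a primitive plan, simulating effects left to right.

    `facts` is never mutated; each primitive produces a new dict, so trying
    the next method after a dead end needs no explicit undo step.
    """
    if not tasks:
        return []
    head, rest = tasks[0], tasks[1:]
    if head.is_primitive:
        # Apply the primitive's effects to a copy, then plan the remainder.
        tail = decompose(rest, {**facts, **head.effects}, methods)
        return None if tail is None else [head] + tail
    for method in methods.get(head.name, []):
        if method.precondition(facts):
            # Expand the compound task in place and keep planning.
            plan = decompose(method.subtasks(facts) + rest, facts, methods)
            if plan is not None:
                return plan
    return None  # no method led to a complete plan


methods = {
    "release": [Method(
        target_task="release",
        precondition=lambda f: True,
        subtasks=lambda f: [
            Task("build", is_primitive=True, effects={"artifact_built": True}),
            Task("publish"),  # compound: needs the artifact to exist
        ],
    )],
    "publish": [Method(
        target_task="publish",
        precondition=lambda f: f.get("artifact_built", False),
        subtasks=lambda f: [Task("upload_artifact", is_primitive=True)],
    )],
}

plan = decompose([Task("release")], {}, methods)
plan_names = [t.name for t in plan] if plan is not None else None
print(plan_names)
```

In this sketch `publish` only becomes plannable after `build` has been simulated; asking for `publish` alone from an empty fact set yields no plan. The simulated effects are predictions, so an execution monitor still has to confirm them at run time.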

Defining a Domain: Microservice Deployment

Let us define an HTN domain for deploying a microservice.

planner = HTNPlanner()

# Method 1: Deploy with Docker (when containerized)
planner.register_method(Method(
    name="deploy_containerized",
    target_task="deploy_service",
    precondition=lambda s: s.get("containerized", False),
    subtasks=lambda params, state: [
        Task("run_tests", is_primitive=True, parameters=params),
        Task("build_container", is_primitive=True, parameters=params),
        Task("push_to_registry", is_primitive=True, parameters=params),
        Task("apply_k8s_manifest", is_primitive=True, parameters=params),
        Task("verify_health", is_primitive=True, parameters=params),
        Task("notify_team", is_primitive=True, parameters=params),
    ],
))

# Method 2: Deploy as binary (when not containerized)
planner.register_method(Method(
    name="deploy_binary",
    target_task="deploy_service",
    precondition=lambda s: not s.get("containerized", False),
    subtasks=lambda params, state: [
        Task("run_tests", is_primitive=True, parameters=params),
        Task("build_binary", is_primitive=True, parameters=params),
        Task("upload_to_server", is_primitive=True, parameters=params),
        Task("restart_process", is_primitive=True, parameters=params),
        Task("verify_health", is_primitive=True, parameters=params),
        Task("notify_team", is_primitive=True, parameters=params),
    ],
))

# Plan for a containerized deployment
state = WorldState(facts={"containerized": True, "has_tests": True})
root_task = Task("deploy_service", parameters={"service": "user-api"})
plan = planner.plan([root_task], state)

for i, task in enumerate(plan):
    print(f"Step {i+1}: {task.name} ({task.parameters})")
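
The domain above decomposes in a single step, so it does not show much hierarchy. The sketch below is one way a deeper decomposition might look, where a compound task expands into other compound tasks. `MiniPlanner` is a condensed stand-in for the planner in this article with the same first-applicable-method logic; the `build_and_publish` and `rollout` tasks, the `canary` fact, and the canary step names are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class Task:
    name: str
    is_primitive: bool = False
    parameters: Dict[str, Any] = field(default_factory=dict)


@dataclass
class Method:
    target_task: str
    precondition: Callable[[Dict], bool]
    subtasks: Callable[[Dict, Dict], List[Task]]


class MiniPlanner:
    """Condensed stand-in for the article's planner (same decomposition logic)."""

    def __init__(self) -> None:
        self.methods: Dict[str, List[Method]] = {}

    def register_method(self, method: Method) -> None:
        self.methods.setdefault(method.target_task, []).append(method)

    def plan(self, tasks: List[Task], facts: Dict) -> Optional[List[Task]]:
        plan: List[Task] = []
        for task in tasks:
            if task.is_primitive:
                plan.append(task)
                continue
            for method in self.methods.get(task.name, []):
                if method.precondition(facts):
                    sub = self.plan(method.subtasks(task.parameters, facts), facts)
                    if sub is not None:
                        plan.extend(sub)
                        break
            else:
                return None  # no method produced a full decomposition
        return plan


def prim(name: str, params: Dict[str, Any]) -> Task:
    return Task(name, is_primitive=True, parameters=params)


planner = MiniPlanner()
planner.register_method(Method(
    target_task="deploy_service",
    precondition=lambda facts: True,
    subtasks=lambda p, facts: [
        prim("run_tests", p),
        Task("build_and_publish", parameters=p),  # compound
        Task("rollout", parameters=p),            # compound
        prim("notify_team", p),
    ],
))
planner.register_method(Method(
    target_task="build_and_publish",
    precondition=lambda facts: True,
    subtasks=lambda p, facts: [prim("build_container", p), prim("push_to_registry", p)],
))
planner.register_method(Method(  # staged rollout when a canary is requested
    target_task="rollout",
    precondition=lambda facts: facts.get("canary", False),
    subtasks=lambda p, facts: [
        prim("deploy_canary", p), prim("verify_health", p), prim("promote_canary", p),
    ],
))
planner.register_method(Method(  # direct rollout otherwise
    target_task="rollout",
    precondition=lambda facts: not facts.get("canary", False),
    subtasks=lambda p, facts: [prim("apply_k8s_manifest", p), prim("verify_health", p)],
))

root = Task("deploy_service", parameters={"service": "user-api"})
plan = planner.plan([root], {"canary": True})
plan_names = [t.name for t in plan]
print(plan_names)
```

Splitting `rollout` into its own compound task means a new rollout strategy can be added by registering one more method, without touching the top-level `deploy_service` method.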

Execution Monitor

Planning is only half the problem. The execution monitor runs the plan step by step, records each outcome in the world state, and attempts re-planning when a task fails or raises an exception.

import asyncio

class ExecutionMonitor:
    def __init__(self, planner: HTNPlanner):
        self.planner = planner
        self.executors: Dict[str, Callable] = {}

    def register_executor(self, task_name: str, executor: Callable):
        self.executors[task_name] = executor

    async def execute_plan(
        self, plan: List[Task], state: WorldState
    ) -> bool:
        for task in plan:
            task.status = TaskStatus.RUNNING
            executor = self.executors.get(task.name)
            if not executor:
                print(f"No executor for {task.name}")
                task.status = TaskStatus.FAILED
                return False

            try:
                result = await executor(task.parameters, state)
                if result:
                    task.status = TaskStatus.COMPLETED
                    state.update(f"{task.name}_done", True)
                else:
                    task.status = TaskStatus.FAILED
                    return await self._handle_failure(task, plan, state)
            except Exception as e:
                print(f"Task {task.name} raised: {e}")
                task.status = TaskStatus.FAILED
                return await self._handle_failure(task, plan, state)

        return True

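    async def _handle_failure(
        self, failed_task: Task, plan: List[Task], state: WorldState
    ) -> bool:
        # Record the failure so method preconditions can react to it.
        state.update(f"{failed_task.name}_failed", True)
        # Only tasks that have not started yet are carried forward; the
        # failed task itself is dropped, not retried.
        remaining = [t for t in plan if t.status == TaskStatus.PENDING]
        if not remaining:
            return False
        # Attempt re-planning for the remaining tasks. Primitive tasks pass
        # through the planner unchanged, so this only yields a different plan
        # when `remaining` contains compound tasks with alternative methods.
        new_plan = self.planner.plan(remaining, state)
        if new_plan:
            return await self.execute_plan(new_plan, state)
        return False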
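
A plan that has already been flattened to primitives gives the planner nothing to choose between, so re-planning the leftover steps tends to return the same steps. An alternative, sketched here under assumptions, is to re-plan from the original goal with the updated facts and skip steps that already completed. The `run_with_replanning` function, the `make_plan` callable (a stand-in for calling the planner on the root task), and the `push_to_mirror` step are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional

Facts = Dict[str, Any]
Executor = Callable[[Facts], Awaitable[bool]]


async def run_with_replanning(
    make_plan: Callable[[Facts], Optional[List[str]]],
    executors: Dict[str, Executor],
    facts: Facts,
    max_replans: int = 2,
) -> bool:
    """Execute a plan; on failure, re-plan from the goal with updated facts."""
    for _attempt in range(max_replans + 1):
        plan = make_plan(facts)
        if plan is None:
            return False  # no method applies under the current facts
        failed = False
        for step in plan:
            if facts.get(f"{step}_done"):
                continue  # completed during an earlier attempt
            try:
                ok = await executors[step](facts)
            except Exception:
                ok = False  # a missing executor or a raised error counts as failure
            if not ok:
                facts[f"{step}_failed"] = True
                failed = True
                break
            facts[f"{step}_done"] = True
        if not failed:
            return True
    return False  # replan budget exhausted


calls: List[str] = []


def make_executor(name: str, succeeds: bool) -> Executor:
    async def run(facts: Facts) -> bool:
        calls.append(name)
        return succeeds
    return run


def make_plan(facts: Facts) -> Optional[List[str]]:
    # Stand-in for method selection: switch targets once the registry push failed.
    push = "push_to_mirror" if facts.get("push_to_registry_failed") else "push_to_registry"
    return ["build_container", push, "apply_k8s_manifest"]


executors = {
    "build_container": make_executor("build_container", True),
    "push_to_registry": make_executor("push_to_registry", False),  # simulated outage
    "push_to_mirror": make_executor("push_to_mirror", True),
    "apply_k8s_manifest": make_executor("apply_k8s_manifest", True),
}

facts: Facts = {}
succeeded = asyncio.run(run_with_replanning(make_plan, executors, facts))
print(succeeded, calls)
```

The replan budget guards against looping forever when every alternative fails, and skipping completed steps assumes those steps do not need to be repeated after a failure elsewhere in the plan.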

Dynamic Plan Refinement

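A strength of this design is that methods are plain data registered at runtime: register_method can be called at any point, so new decompositions can be added without changing the planner. An LLM can propose new methods for novel situations, which expands the planner's capabilities without modifying its code.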
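
As a rough illustration, the sketch below turns a declarative spec, of the kind a model might be asked to return as JSON, into a Method and registers it. No LLM is called: the JSON string is a hard-coded stand-in. The spec format (`requires`, `steps`), the `method_from_spec` helper, the `KNOWN_PRIMITIVES` allowlist, and the static-site step names are all assumptions made for this example.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class Task:
    name: str
    is_primitive: bool = False
    parameters: Dict[str, Any] = field(default_factory=dict)


@dataclass
class Method:
    name: str
    target_task: str
    precondition: Callable[[Dict], bool]
    subtasks: Callable[[Dict, Dict], List[Task]]


# Hypothetical allowlist: primitives the agent already has executors for.
KNOWN_PRIMITIVES = {"run_tests", "upload_to_s3", "invalidate_cdn", "notify_team"}


def method_from_spec(spec: Dict[str, Any]) -> Method:
    """Build a Method from a declarative spec, rejecting unknown steps."""
    unknown = [s for s in spec["steps"] if s not in KNOWN_PRIMITIVES]
    if unknown:
        raise ValueError(f"unknown primitive steps: {unknown}")
    requires = dict(spec.get("requires", {}))
    steps = list(spec["steps"])
    return Method(
        name=spec["name"],
        target_task=spec["target_task"],
        # The method applies only when every required fact has the stated value.
        precondition=lambda facts: all(facts.get(k) == v for k, v in requires.items()),
        subtasks=lambda params, facts: [
            Task(s, is_primitive=True, parameters=params) for s in steps
        ],
    )


# Stand-in for a model response; a real system would obtain this at runtime.
raw = """
{
  "name": "deploy_static_site",
  "target_task": "deploy_service",
  "requires": {"static_site": true},
  "steps": ["run_tests", "upload_to_s3", "invalidate_cdn", "notify_team"]
}
"""

method = method_from_spec(json.loads(raw))

# Same shape as the planner's method table: task name -> list of methods.
registry: Dict[str, List[Method]] = {}
registry.setdefault(method.target_task, []).append(method)

generated_steps = [t.name for t in method.subtasks({"service": "docs"}, {})]
print(method.name, generated_steps)
```

With the planner from this article, the last step would be a call to planner.register_method(method). Validating steps against an allowlist before registering keeps a generated method from referencing actions the agent cannot actually execute.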

FAQ

How is HTN planning different from simple step-by-step prompting?

Step-by-step prompting asks an LLM to generate all steps at once, with no formal structure for preconditions, method selection, or failure recovery. HTN planning uses a formal decomposition hierarchy where method selection is driven by world state, enabling principled replanning when steps fail and deterministic behavior for known domains.

Can I combine HTN planning with LLM-based agents?

Yes. A practical approach is to use HTN planning for the known, structured parts of a workflow and delegate the creative or uncertain subtasks to LLM agents. For example, the "run_tests" primitive might be a deterministic script, while "generate_test_cases" could be an LLM-powered compound task with its own methods.

What happens when no method's preconditions match?

The planner returns None, indicating planning failure. Your system should handle this by either relaxing preconditions, asking a human for guidance, or falling back to an LLM agent to invent a novel decomposition for the task.

