Apache Airflow for AI Agent Scheduling: DAG-Based Workflow Management
Learn how to orchestrate AI agent workflows with Apache Airflow. Covers DAG design patterns, custom operators for LLM calls, XCom data passing, sensors, and scheduling strategies.
Airflow and AI Agents: A Natural Fit for Batch Workflows
Apache Airflow is one of the most widely deployed workflow orchestration platforms, used by thousands of companies to schedule and monitor data pipelines. Its DAG-based model maps naturally to AI agent workflows that run on schedules: nightly report generation, periodic data analysis, scheduled content creation, and batch inference pipelines.
Airflow excels at scheduled, batch-oriented agent work. If your agent needs to run every night at 2 AM, process yesterday's data, generate a report, and email it to stakeholders, Airflow is a battle-tested choice.
Designing a DAG for an AI Agent
A DAG (Directed Acyclic Graph) defines the dependency structure of your workflow. Each node is a task, and edges define execution order.
from airflow import DAG
from airflow.decorators import task
from datetime import timedelta
import pendulum

default_args = {
    "owner": "ai-team",
    "retries": 3,
    "retry_delay": timedelta(minutes=2),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=30),
    "execution_timeout": timedelta(minutes=10),
}

with DAG(
    dag_id="daily_research_agent",
    default_args=default_args,
    description="Daily research agent that summarizes industry news",
    schedule="0 6 * * *",  # 6 AM daily (schedule replaces the deprecated schedule_interval in Airflow 2.4+)
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),  # a fixed start_date; days_ago() is deprecated
    catchup=False,
    tags=["ai-agent", "research"],
) as dag:
    pass  # Tasks defined below
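With retry_exponential_backoff enabled, the delay roughly doubles on each attempt until it hits max_retry_delay (Airflow's actual implementation also adds random jitter). A quick sketch of the resulting schedule:

```python
from datetime import timedelta

def backoff_schedule(retries: int, retry_delay: timedelta,
                     max_retry_delay: timedelta) -> list[timedelta]:
    """Approximate per-attempt delays under exponential backoff.

    A simplified sketch of retry_exponential_backoff=True; Airflow's
    real implementation also adds random jitter to each delay.
    """
    return [min(retry_delay * 2**attempt, max_retry_delay)
            for attempt in range(retries)]

delays = backoff_schedule(3, timedelta(minutes=2), timedelta(minutes=30))
print([d.total_seconds() / 60 for d in delays])  # [2.0, 4.0, 8.0]
```

With the default_args above, a task that keeps failing waits roughly 2, 4, then 8 minutes between attempts, and longer retry chains are capped at 30 minutes.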
Building Tasks with the TaskFlow API
Airflow 2.x introduced the TaskFlow API, which lets you define tasks as decorated Python functions — much cleaner than the older operator-based approach.
import openai

@task(retries=3, retry_delay=timedelta(seconds=30))
def gather_news(topic: str) -> list[dict]:
    """Fetch recent news articles on a topic."""
    import requests  # imported inside the task to keep DAG-file parsing fast
    from airflow.models import Variable

    response = requests.get(
        "https://newsapi.org/v2/everything",
        params={
            "q": topic,
            "sortBy": "publishedAt",
            "pageSize": 10,
            # Jinja templates like "{{ var.value.news_api_key }}" are not
            # rendered inside a TaskFlow function body, so read the
            # Variable directly instead
            "apiKey": Variable.get("news_api_key"),
        },
        timeout=30,
    )
    response.raise_for_status()
    articles = response.json()["articles"]
    return [
        {"title": a["title"], "description": a["description"]}
        for a in articles
    ]
@task(retries=2, retry_delay=timedelta(seconds=60))
def summarize_articles(articles: list[dict]) -> str:
"""Use an LLM to summarize the collected articles."""
client = openai.OpenAI()
articles_text = "\n".join(
f"- {a['title']}: {a['description']}" for a in articles
)
response = client.chat.completions.create(
model="gpt-4",
messages=[
{
"role": "system",
"content": "Summarize these news articles into a brief digest.",
},
{"role": "user", "content": articles_text},
],
temperature=0.3,
)
return response.choices[0].message.content
@task
def format_report(summary: str, topic: str) -> str:
"""Format the summary as an HTML email report."""
return f"""
<h2>Daily {topic} Digest</h2>
<p>{summary}</p>
<hr>
<small>Generated by AI Research Agent</small>
"""
@task
def send_report(report: str) -> None:
"""Send the report via email."""
from airflow.utils.email import send_email
send_email(
to=["[email protected]"],
subject="Daily AI Research Digest",
html_content=report,
)
Wiring the DAG
import pendulum

with DAG(
    dag_id="daily_research_agent",
    default_args=default_args,
    schedule="0 6 * * *",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
    tags=["ai-agent", "research"],
) as dag:
    topic = "artificial intelligence agents"
    articles = gather_news(topic)
    summary = summarize_articles(articles)
    report = format_report(summary, topic)
    send_report(report)
Data flows between tasks automatically via XComs (short for "cross-communication"). Each task's return value is serialized (JSON by default) and stored in the Airflow metadata database, then deserialized as the input to downstream tasks. Calling one task's output as another task's argument is also what wires the dependency edges: Airflow infers the execution order from the data flow.
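Because the default XCom backend serializes values as JSON (pickling is disabled by default in Airflow 2.x), anything a task returns must be JSON-serializable. A quick check using sample data shaped like gather_news()'s return value:

```python
import json
from datetime import datetime

# Sample payload shaped like gather_news()'s return value (not live API data)
articles = [{"title": "Agents in production", "description": "A field report"}]

# Plain dicts and lists round-trip cleanly, so they are XCom-safe
assert json.loads(json.dumps(articles)) == articles

# Objects like datetimes are not JSON-serializable and would fail at XCom push
try:
    json.dumps({"fetched_at": datetime.now()})
except TypeError as err:
    print(f"not XCom-safe: {err}")
```

If a task needs to pass richer objects, convert them to plain types (ISO date strings, dicts) before returning.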
Custom Operators for LLM Calls
For reusable LLM integration, build a custom operator:
from airflow.models import BaseOperator
import openai

class LLMOperator(BaseOperator):
    # Declaring prompt_template as a templated field lets Jinja expressions
    # such as "{{ ds }}" be rendered before execute() runs
    template_fields = ("prompt_template",)

    def __init__(
        self,
        prompt_template: str,
        model: str = "gpt-4",
        temperature: float = 0.3,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.prompt_template = prompt_template
        self.model = model
        self.temperature = temperature

    def execute(self, context):
        # Fill remaining placeholders from the DAG's params
        prompt = self.prompt_template.format(**context["params"])
        client = openai.OpenAI()
        response = client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=self.temperature,
        )
        result = response.choices[0].message.content
        self.log.info("LLM returned %d characters", len(result))
        return result
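The operator fills its prompt_template from DAG-level params at execute time, and that rendering step is plain str.format, so you can sanity-check a template outside Airflow ("topic" here is a hypothetical param name, not part of the operator):

```python
# Same rendering LLMOperator.execute() performs: context["params"] feeds str.format
prompt_template = "Summarize today's news about {topic} in three bullets."
params = {"topic": "AI agents"}  # would come from the DAG's params argument

prompt = prompt_template.format(**params)
print(prompt)  # Summarize today's news about AI agents in three bullets.
```

Testing templates this way catches missing or misspelled placeholders before the DAG ever hits the scheduler.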
Sensors for Event-Driven Triggers
Sensors wait for an external condition before proceeding. Use them to trigger agent workflows when new data arrives.
from airflow.sensors.filesystem import FileSensor
wait_for_data = FileSensor(
task_id="wait_for_upload",
filepath="/data/uploads/latest.csv",
poke_interval=60,
timeout=3600,
mode="reschedule", # Free the worker slot while waiting
)
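Under the hood, a sensor in poke mode simply re-checks a condition until it succeeds or times out. A minimal sketch of FileSensor's loop, simplified and detached from Airflow's executor machinery:

```python
import os
import time

def wait_for_file(filepath: str, poke_interval: float = 1.0,
                  timeout: float = 5.0) -> bool:
    """Simplified stand-in for a FileSensor poke loop."""
    deadline = time.monotonic() + timeout
    while True:
        if os.path.exists(filepath):  # the "poke" check
            return True
        if time.monotonic() >= deadline:
            return False  # a real sensor raises a timeout exception instead
        time.sleep(poke_interval)
```

In poke mode this loop occupies a worker slot for its whole duration; reschedule mode frees the slot between pokes and requeues the task, which is why it suits long waits like the one-hour timeout above.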
FAQ
Is Airflow suitable for real-time AI agent workflows?
Airflow is designed for batch scheduling, not real-time execution. Its minimum practical scheduling interval is about one minute, and DAG parsing adds overhead. For real-time or event-driven agent workflows, consider Temporal, Inngest, or a custom solution. Airflow works best for agents that run on a schedule.
How do I handle large XCom payloads from LLM responses?
By default, XComs are stored in the Airflow metadata database, which is not designed for large payloads. For LLM responses exceeding a few kilobytes, configure a remote XCom backend using S3, GCS, or a custom backend that stores payloads externally and passes references through XCom.
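If a full custom XCom backend is more than you need, the same reference-passing idea also works inside tasks: write the large payload to external storage and return only a small key. A sketch of the pattern, where a dict stands in for an S3 or GCS client:

```python
import hashlib

object_store: dict[str, str] = {}  # stand-in for an S3/GCS bucket

def put_payload(payload: str) -> str:
    """Store a large LLM response externally; return a small reference."""
    key = "llm/" + hashlib.sha256(payload.encode()).hexdigest()[:16]
    object_store[key] = payload
    return key  # only this short string travels through XCom

def get_payload(key: str) -> str:
    """Downstream tasks resolve the reference back to the payload."""
    return object_store[key]

ref = put_payload("a very long LLM summary..." * 1000)
print(ref, len(ref))  # a short key instead of a multi-kilobyte blob
```

A custom XCom backend does exactly this transparently, so task code can keep returning plain values.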
Can I run multiple agent DAGs concurrently?
Yes. Airflow's scheduler manages concurrency at the DAG level, task level, and pool level. Use the max_active_runs parameter on the DAG to control how many instances run simultaneously, and use Airflow pools to limit concurrent LLM API calls across all DAGs.
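Conceptually, an Airflow pool is a named set of slots that tasks must acquire before running, much like a semaphore. A toy in-process analogy (a real pool is enforced cluster-wide by the scheduler, not in your code):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

llm_pool = threading.BoundedSemaphore(4)  # analogous to a 4-slot "llm_api" pool

def call_llm_with_pool(prompt: str) -> str:
    with llm_pool:  # block until a slot frees, as pool="llm_api" would on a task
        return f"summary of: {prompt}"  # placeholder for a real LLM call

with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(call_llm_with_pool,
                                [f"topic-{i}" for i in range(10)]))
print(len(results))  # 10
```

In real Airflow you create the pool in the UI or CLI and set pool="llm_api" on each LLM-calling task; the scheduler then admits at most that many of them at once, across every DAG.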