Scheduled Agent Tasks: Cron Jobs, Recurring Analysis, and Periodic Reports
Learn how to schedule AI agent tasks with cron expressions, implement idempotent recurring analyses, prevent overlapping runs, and build periodic reporting pipelines that run reliably in production.
Why Agents Need Schedules
Not all agent work is triggered by user requests. Many valuable agent applications run on schedules: daily market analysis reports, hourly anomaly detection on server logs, weekly customer churn predictions, and monthly compliance audits. These are autonomous agents that operate on a clock rather than a prompt.
Building scheduled agent tasks correctly requires handling cron expressions, ensuring idempotency (running the same job twice produces the same result), and preventing overlapping runs when a job takes longer than the schedule interval.
APScheduler: The Python Scheduling Library
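APScheduler (Advanced Python Scheduler) provides cron-like scheduling for Python. It supports several job store backends: in-memory by default, plus persistent stores such as SQLAlchemy, MongoDB, and Redis. The following example registers a daily job on the asyncio scheduler: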
import asyncio

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

scheduler = AsyncIOScheduler()


# fetch_market_data, run_llm_analysis, and store_report are placeholders
# for your own data access and LLM calls.
async def daily_market_analysis():
    """Agent task: analyze market data and produce a report."""
    print("Starting daily market analysis...")
    data = await fetch_market_data()
    analysis = await run_llm_analysis(data)
    await store_report(analysis)
    print("Market analysis complete.")


# Run every day at 6:00 AM UTC
scheduler.add_job(
    daily_market_analysis,
    CronTrigger(hour=6, minute=0, timezone="UTC"),
    id="daily_market_analysis",
    name="Daily Market Analysis",
    replace_existing=True,
)


async def main():
    # Start the scheduler inside a running event loop, then keep it alive.
    scheduler.start()
    await asyncio.Event().wait()


asyncio.run(main())
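The replace_existing=True parameter matters most with a persistent job store: when the process restarts and registers the job again under the same id, the stored entry is replaced instead of causing a conflict or a duplicate job.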
Understanding Cron Expressions
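Cron expressions define schedules using five fields: minute, hour, day-of-month, month, and day-of-week. APScheduler's CronTrigger exposes these as keyword arguments (along with optional year, week, and second fields). Here are common patterns for agent tasks: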
# Every 15 minutes — real-time monitoring agent
CronTrigger(minute="*/15")
# Every weekday at 9 AM — morning briefing agent
CronTrigger(hour=9, minute=0, day_of_week="mon-fri")
# First day of every month at midnight — monthly compliance audit
CronTrigger(day=1, hour=0, minute=0)
# Every Sunday at 11 PM — weekly churn prediction
CronTrigger(day_of_week="sun", hour=23, minute=0)
# Every 6 hours — periodic data refresh
CronTrigger(hour="*/6", minute=0)
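Step expressions such as */6 are easy to misread. The sketch below expands a single cron field by hand to show which values it selects. The expand_field helper is hypothetical and written only for illustration; it is not part of APScheduler, and it handles only "*", "*/N", "A-B", "A-B/N", single numbers, and comma-separated lists of those.

```python
def expand_field(expr: str, lo: int, hi: int) -> list[int]:
    """Expand one numeric cron field into the sorted values it matches.

    Hypothetical helper for illustration only (not an APScheduler API).
    """
    values: set[int] = set()
    for part in expr.split(","):
        rng, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if rng == "*":
            start, end = lo, hi
        elif "-" in rng:
            a, b = rng.split("-")
            start, end = int(a), int(b)
        else:
            # A bare number matches only itself.
            start = end = int(rng)
        if not (lo <= start <= end <= hi):
            raise ValueError(f"{part!r} is outside {lo}-{hi}")
        values.update(range(start, end + 1, step))
    return sorted(values)


print(expand_field("*/15", 0, 59))  # [0, 15, 30, 45]
print(expand_field("*/6", 0, 23))   # [0, 6, 12, 18]
print(expand_field("*/7", 0, 23))   # [0, 7, 14, 21]
```

Steps are counted from the start of the field, not from the last run. An hour field of */7 therefore fires at 21:00 and again at 00:00, only three hours apart. Pick step values that divide the field range evenly when a constant interval matters.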
Ensuring Idempotency
If your scheduler fires the same job twice (due to a restart, clock skew, or retry), the second run must not duplicate side effects such as stored reports or sent emails. Use an idempotency key based on the schedule window:
from datetime import datetime, timezone
import hashlib


def get_idempotency_key(job_name: str, window: str) -> str:
    """Generate a unique key for this job's execution window."""
    # window could be "2026-03-17" for daily, "2026-03-17T06" for hourly
    raw = f"{job_name}:{window}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]


async def idempotent_job(job_name: str, execute_fn):
    """Run a job only if it has not already completed for this window."""
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    key = get_idempotency_key(job_name, today)

    if await check_completed(key):
        print(f"Job {job_name} already completed for {today}, skipping.")
        return

    try:
        result = await execute_fn()
        await mark_completed(key, result)
    except Exception as e:
        await mark_failed(key, str(e))
        raise
The check_completed, mark_completed, and mark_failed functions should use a persistent store like Redis or a database table. This ensures that even if the process crashes and restarts, a job that already completed for a window is not executed again, while a run that failed or crashed midway can still be retried.
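As one possible shape for that store, here is a minimal sketch backed by SQLite from the standard library. The JobRunStore class, the job_runs table, and its columns are assumptions made for illustration. In a multi-worker deployment, a shared database or Redis would take the place of the local file.

```python
import asyncio
import json
import sqlite3


class JobRunStore:
    """Records the outcome of each job window (hypothetical helper)."""

    def __init__(self, path: str = "job_runs.db"):
        # SQLite calls block the event loop briefly; acceptable for a sketch.
        self._conn = sqlite3.connect(path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS job_runs ("
            "key TEXT PRIMARY KEY, status TEXT NOT NULL, detail TEXT)"
        )
        self._conn.commit()

    async def check_completed(self, key: str) -> bool:
        row = self._conn.execute(
            "SELECT status FROM job_runs WHERE key = ?", (key,)
        ).fetchone()
        return row is not None and row[0] == "completed"

    async def _save(self, key: str, status: str, detail: str) -> None:
        # One row per key: a later outcome overwrites an earlier one.
        self._conn.execute(
            "INSERT OR REPLACE INTO job_runs (key, status, detail) "
            "VALUES (?, ?, ?)",
            (key, status, detail),
        )
        self._conn.commit()

    async def mark_completed(self, key: str, result) -> None:
        await self._save(key, "completed", json.dumps(result, default=str))

    async def mark_failed(self, key: str, error: str) -> None:
        await self._save(key, "failed", error)


async def demo() -> None:
    store = JobRunStore(":memory:")  # pass a file path to survive restarts
    key = "example-key"
    print(await store.check_completed(key))  # False: never ran
    await store.mark_failed(key, "timeout")
    print(await store.check_completed(key))  # False: failed runs may retry
    await store.mark_completed(key, {"rows": 3})
    print(await store.check_completed(key))  # True


asyncio.run(demo())
```

Binding the methods to module-level names (for example check_completed = store.check_completed) would let the idempotent_job function above use them unchanged.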
Preventing Overlapping Runs
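When a job takes longer than its schedule interval, a second instance can start while the first is still running. Within a single process, APScheduler's max_instances setting (default 1) already skips the overlapping run. When several processes or replicas run the same schedule, use a distributed lock to prevent this: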
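import uuid

import redis.asyncio as redis

# Lua script: delete the key only if it still holds our token (atomic).
RELEASE_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
end
return 0
"""


class SchedulerLock:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.token = uuid.uuid4().hex  # identifies this process as the holder

    async def acquire(self, job_name: str, ttl_seconds: int = 3600) -> bool:
        """Try to acquire a lock. Returns True if successful."""
        lock_key = f"agent_lock:{job_name}"
        # SET with NX and EX is atomic: only one caller can create the key.
        acquired = await self.redis.set(
            lock_key, self.token, ex=ttl_seconds, nx=True
        )
        return bool(acquired)

    async def release(self, job_name: str):
        """Release the lock only if this process still owns it."""
        lock_key = f"agent_lock:{job_name}"
        await self.redis.eval(RELEASE_SCRIPT, 1, lock_key, self.token)


# Usage in a scheduled job
lock = SchedulerLock(redis.from_url("redis://localhost:6379/0"))


async def protected_job():
    if not await lock.acquire("daily_analysis", ttl_seconds=7200):
        print("Previous run still in progress, skipping.")
        return
    try:
        await run_analysis()
    finally:
        await lock.release("daily_analysis")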
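The TTL on the lock acts as a safety valve. If the worker crashes without releasing the lock, the key expires after the TTL and the next scheduled run can proceed. Choose a TTL longer than the job's worst-case runtime; otherwise the lock can expire while the job is still running and a second worker may start.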
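The expiry behavior can be checked without a Redis server. The sketch below is a single-process stand-in that mimics the acquire/release interface and the TTL semantics described above. InMemoryLock and its injectable clock are assumptions for illustration and for unit tests; they are not a replacement for a real distributed lock.

```python
import asyncio
import time


class InMemoryLock:
    """Single-process stand-in for a TTL lock (illustrative only)."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock  # injectable so tests can simulate elapsed time
        self._expiry: dict[str, float] = {}  # job name -> expiry timestamp

    async def acquire(self, job_name: str, ttl_seconds: int = 3600) -> bool:
        now = self._clock()
        expires_at = self._expiry.get(job_name)
        if expires_at is not None and expires_at > now:
            return False  # lock is held and has not expired yet
        self._expiry[job_name] = now + ttl_seconds
        return True

    async def release(self, job_name: str) -> None:
        self._expiry.pop(job_name, None)


async def demo():
    fake_now = [0.0]
    lock = InMemoryLock(clock=lambda: fake_now[0])
    first = await lock.acquire("daily_analysis", ttl_seconds=7200)
    second = await lock.acquire("daily_analysis", ttl_seconds=7200)
    # Simulate a crashed worker: no release is called, but the TTL elapses.
    fake_now[0] = 7201.0
    third = await lock.acquire("daily_analysis", ttl_seconds=7200)
    return first, second, third


print(asyncio.run(demo()))  # (True, False, True)
```

The second attempt is refused while the lock is live, and the third succeeds only because the TTL has passed. This is the recovery path a crashed worker relies on.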
Complete Scheduled Agent Example
Putting it all together: a scheduled agent with idempotency and overlap prevention. It reuses scheduler, lock, get_idempotency_key, and the completion helpers from the earlier sections:
async def build_weekly_report():
    """Complete scheduled agent: weekly churn analysis."""
    week = datetime.now(timezone.utc).strftime("%Y-W%W")
    key = get_idempotency_key("churn_report", week)

    # Take the lock first so the completion check cannot race another worker.
    if not await lock.acquire("churn_report"):
        return

    try:
        if await check_completed(key):
            return
        customers = await fetch_customer_metrics()
        churn_risks = await analyze_churn_with_llm(customers)
        report = await generate_report(churn_risks)
        await send_report_email(report, recipients=["[email protected]"])
        await mark_completed(key, {"customers_analyzed": len(customers)})
    except Exception as e:
        await mark_failed(key, str(e))
        raise
    finally:
        await lock.release("churn_report")


scheduler.add_job(
    build_weekly_report,
    CronTrigger(day_of_week="mon", hour=7, minute=0, timezone="UTC"),
    id="weekly_churn_report",
    replace_existing=True,
)
FAQ
How do I handle timezone issues with scheduled agent tasks?
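Store timestamps in UTC internally and convert to local timezones only for display. For schedules, use UTC when the absolute time matters and a named timezone when the wall-clock time matters. APScheduler accepts a timezone parameter on triggers, so a report that must arrive at "9 AM New York time" can use CronTrigger(hour=9, timezone="America/New_York"), and the job keeps firing at 9 AM local time across DST transitions. Avoid scheduling jobs inside the hour that is skipped or repeated when the clocks change (typically between 1 AM and 3 AM local time), because wall-clock cron triggers can skip or repeat such runs.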
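A fixed UTC hour cannot express "9 AM New York time" year-round. The sketch below uses the standard-library zoneinfo module to convert the same local time to UTC in winter and in summer. The utc_hour_for_local helper is a hypothetical name used only for this illustration.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

NEW_YORK = ZoneInfo("America/New_York")


def utc_hour_for_local(year: int, month: int, day: int, hour: int,
                       tz: ZoneInfo) -> int:
    """Return the UTC hour that corresponds to a local wall-clock hour."""
    local = datetime(year, month, day, hour, tzinfo=tz)
    return local.astimezone(timezone.utc).hour


print(utc_hour_for_local(2026, 1, 15, 9, NEW_YORK))  # 14 (EST, UTC-5)
print(utc_hour_for_local(2026, 7, 15, 9, NEW_YORK))  # 13 (EDT, UTC-4)
```

A trigger pinned to 14:00 UTC would deliver the report at 10 AM local time during the summer, which is why the named timezone belongs on the trigger.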
What happens when a scheduled job fails? Should it retry automatically?
It depends on the job type. For idempotent jobs (like report generation), automatic retries are safe: retry after a delay, ideally with backoff. For non-idempotent jobs (like sending notifications), log the failure and alert an operator. APScheduler's misfire_grace_time controls how many seconds after its scheduled time a missed run may still start; it does not re-run a job that raised an exception, so add retry logic (for example a retry decorator) to the job function itself.
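For the idempotent case, a retry wrapper can be as small as the sketch below. The run_with_retries function and its parameters are assumptions for illustration, not an APScheduler feature. It retries on any exception, which is only appropriate when the wrapped job is safe to repeat.

```python
import asyncio


async def run_with_retries(job_fn, attempts: int = 3, base_delay: float = 1.0):
    """Await job_fn(), retrying with exponential backoff on any exception."""
    for attempt in range(1, attempts + 1):
        try:
            return await job_fn()
        except Exception as exc:
            if attempt == attempts:
                raise  # out of attempts: surface the error to alerting
            delay = base_delay * 2 ** (attempt - 1)
            print(f"Attempt {attempt} failed ({exc}); retrying in {delay}s")
            await asyncio.sleep(delay)


async def demo():
    calls = {"n": 0}

    async def flaky_report():
        # Fails twice, then succeeds, to exercise the retry path.
        calls["n"] += 1
        if calls["n"] < 3:
            raise RuntimeError("transient error")
        return "report ready"

    result = await run_with_retries(flaky_report, attempts=3, base_delay=0.01)
    return result, calls["n"]


print(asyncio.run(demo()))  # ('report ready', 3)
```

Registering a small wrapper coroutine that calls run_with_retries(daily_market_analysis) with the scheduler, in place of the job function itself, keeps the retry policy outside the job body.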
How do I monitor whether scheduled agent tasks are actually running?
Implement a heartbeat pattern. Each job writes a "last_run" timestamp to a monitoring store after completion. A separate health check compares the last_run timestamp against the expected schedule. If a daily job has not run in 25 hours, trigger an alert. Services like Cronitor or Healthchecks.io can receive pings from your jobs and alert on missed runs.
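The staleness check itself is a simple comparison. Here is a minimal sketch, assuming the monitoring store returns the last_run value as a timezone-aware datetime. The is_stale function and its grace parameter are hypothetical names.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_stale(last_run: datetime, expected_interval: timedelta,
             grace: timedelta, now: Optional[datetime] = None) -> bool:
    """Return True when a job has missed its expected run plus a grace period."""
    now = now or datetime.now(timezone.utc)
    return now - last_run > expected_interval + grace


# Daily job with a one-hour grace period: alert after 25 hours of silence.
last_run = datetime(2026, 3, 17, 6, 0, tzinfo=timezone.utc)
daily = timedelta(days=1)
grace = timedelta(hours=1)

on_time = datetime(2026, 3, 18, 6, 30, tzinfo=timezone.utc)  # 24.5 h later
overdue = datetime(2026, 3, 18, 7, 30, tzinfo=timezone.utc)  # 25.5 h later

print(is_stale(last_run, daily, grace, now=on_time))  # False
print(is_stale(last_run, daily, grace, now=overdue))  # True
```

The health check should run on a separate schedule or service from the jobs it watches, so that a stopped scheduler cannot also silence its own alarm.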