
Inngest for AI Agent Functions: Event-Driven Serverless Agent Workflows

Learn how to build event-driven AI agent workflows with Inngest. Covers event triggers, step functions, automatic retries, fan-out patterns, and rate limiting for production agent systems.

Why Inngest for AI Agent Workflows

Inngest takes a unique approach to workflow orchestration: event-driven, serverless, and step-based. Instead of managing workers, queues, and schedulers, you define functions that respond to events. Each function is composed of steps — individually retryable, checkpointed units of work that Inngest manages automatically.

This model is particularly well-suited for AI agents because it eliminates the infrastructure overhead while providing the durability guarantees that long-running LLM workflows need. You write your agent logic as a series of steps, deploy it to any Python server, and Inngest handles retries, concurrency, rate limiting, and fan-out.

Setting Up Inngest with Python

pip install inngest

An overview of the agent pipeline these event-driven workflows implement:

flowchart LR
    INPUT(["User intent"])
    PARSE["Parse plus<br/>classify"]
    PLAN["Plan and tool<br/>selection"]
    AGENT["Agent loop<br/>LLM plus tools"]
    GUARD{"Guardrails<br/>and policy"}
    EXEC["Execute and<br/>verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome plus<br/>next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff

Initialize the Inngest client and create your first function:
import datetime  # used for durations in later snippets

import httpx
import inngest

# Initialize the client
client = inngest.Inngest(
    app_id="ai-agent-platform",
    event_key="your-event-key",
)
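
Hardcoding the event key is fine for a demo; in practice you would typically read it from the environment instead (a minimal sketch, assuming the key is stored as INNGEST_EVENT_KEY):

import os

client = inngest.Inngest(
    app_id="ai-agent-platform",
    event_key=os.environ.get("INNGEST_EVENT_KEY"),
)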

Defining Step Functions

Inngest functions are composed of steps. Each step is independently retryable — if step 3 fails, Inngest retries only step 3, not the entire function.

@client.create_function(
    fn_id="research-agent",
    trigger=inngest.TriggerEvent(event="agent/research.requested"),
    retries=3,
)
async def research_agent(
    ctx: inngest.Context,
    step: inngest.Step,
) -> dict:
    query = ctx.event.data["query"]
    user_id = ctx.event.data["user_id"]

    # Step 1: Plan the research
    plan = await step.run(
        "plan-research",
        lambda: call_planning_llm(query),
    )

    # Step 2: Gather sources
    sources = await step.run(
        "gather-sources",
        lambda: search_knowledge_base(plan["search_queries"]),
    )

    # Step 3: Synthesize answer
    answer = await step.run(
        "synthesize",
        lambda: call_synthesis_llm(query, sources),
    )

    # Step 4: Store result
    await step.run(
        "store-result",
        lambda: save_to_database(user_id, query, answer),
    )

    return {"answer": answer, "source_count": len(sources)}

import json
import os

API_KEY = os.environ["OPENAI_API_KEY"]

async def call_planning_llm(query: str) -> dict:
    async with httpx.AsyncClient(timeout=60) as http:
        response = await http.post(
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {API_KEY}"},
            json={
                # JSON mode needs a model that supports response_format
                "model": "gpt-4o",
                "messages": [
                    {
                        "role": "system",
                        # JSON mode also requires the prompt to mention JSON
                        "content": (
                            "Generate 3 search queries for research. "
                            'Respond as JSON: {"search_queries": [...]}'
                        ),
                    },
                    {"role": "user", "content": query},
                ],
                "response_format": {"type": "json_object"},
            },
        )
        response.raise_for_status()
        # The message content is a JSON string; parse it into the dict
        # that research_agent reads search_queries from.
        return json.loads(response.json()["choices"][0]["message"]["content"])
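
The other helpers (search_knowledge_base, call_synthesis_llm, save_to_database) depend on your retrieval and storage stack. Hypothetical stubs, just to make the example self-contained:

async def search_knowledge_base(search_queries: list[str]) -> list[dict]:
    # Hypothetical: query your vector store once per search query and merge hits.
    return [{"query": q, "text": f"stub result for {q}"} for q in search_queries]

async def call_synthesis_llm(query: str, sources: list[dict]) -> str:
    # Hypothetical: a second chat-completion call that cites the gathered sources.
    return f"Synthesized answer to {query!r} from {len(sources)} sources"

async def save_to_database(user_id: str, query: str, answer: str) -> None:
    # Hypothetical: persist the result, e.g. an INSERT into a results table.
    ...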

Fan-Out Patterns

Fan-out lets you execute multiple sub-tasks in parallel, then collect results. This is ideal for agents that need to process multiple data sources simultaneously.

@client.create_function(
    fn_id="multi-source-agent",
    trigger=inngest.TriggerEvent(event="agent/multi-source.requested"),
)
async def multi_source_agent(
    ctx: inngest.Context,
    step: inngest.Step,
) -> dict:
    sources = ctx.event.data["sources"]

    # Fan out: send an event for each source
    events = [
        inngest.Event(
            name="agent/source.process",
            data={"source": source, "parent_id": ctx.event.id},
        )
        for source in sources
    ]
    await step.send_event("fan-out-sources", events)

    # wait_for_event resolves on the FIRST matching event, not all of them,
    # so wait once per source, each with a unique step ID. The trigger event
    # carries no parent_id field, so match with an expression against this
    # run's event ID rather than `match`.
    results = []
    for i, _ in enumerate(sources):
        evt = await step.wait_for_event(
            f"collect-result-{i}",
            event="agent/source.completed",
            if_exp=f"async.data.parent_id == '{ctx.event.id}'",
            timeout=datetime.timedelta(minutes=10),
        )
        if evt is not None:  # None means this wait timed out
            results.append(evt.data)

    # Synthesize all results
    synthesis = await step.run(
        "synthesize-all",
        lambda: synthesize_sources(results),
    )
    return {"synthesis": synthesis}

Rate Limiting and Concurrency Control

AI agents often interact with rate-limited APIs. Inngest provides all three controls out of the box: rate_limit skips runs beyond the limit, throttle queues excess runs and smooths them out rather than dropping them, and concurrency caps how many executions run at once.

@client.create_function(
    fn_id="rate-limited-agent",
    trigger=inngest.TriggerEvent(event="agent/process.requested"),
    rate_limit=inngest.RateLimit(
        limit=10,
        period=datetime.timedelta(minutes=1),  # max 10 runs/minute; excess events are skipped
    ),
    concurrency=[
        inngest.Concurrency(
            limit=5,  # max 5 concurrent executions
            scope="env",  # valid scopes: "account", "env", "fn"
        ),
    ],
    throttle=inngest.Throttle(
        limit=100,
        period=datetime.timedelta(hours=1),  # excess runs are queued, not dropped
        burst=20,
    ),
)
async def rate_limited_agent(
    ctx: inngest.Context,
    step: inngest.Step,
) -> dict:
    result = await step.run(
        "call-llm",
        lambda: call_llm(ctx.event.data["prompt"]),
    )
    return {"result": result}

Triggering Events

Send events to trigger agent functions from anywhere:

# From your API endpoint
async def handle_request(query: str, user_id: str):
    await client.send(
        inngest.Event(
            name="agent/research.requested",
            data={
                "query": query,
                "user_id": user_id,
                "priority": "high",
            },
        )
    )
    return {"status": "processing"}

Serving with FastAPI

from fastapi import FastAPI
import inngest.fast_api

app = FastAPI()

inngest.fast_api.serve(
    app,
    client,
    [research_agent, multi_source_agent, rate_limited_agent],
)

Inngest connects to your server, discovers your functions, and manages execution. You deploy your code as a normal web server — no separate worker processes needed.
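
For local development, one common setup (an assumption about your entrypoint, not a requirement) is to run the app with uvicorn and point the Inngest Dev Server at it:

# run_server.py -- a sketch; assumes `app` is the FastAPI instance above.
# With the server running, start the Inngest Dev Server
# (npx inngest-cli@latest dev) and it will discover the functions
# served at the default /api/inngest path.
import uvicorn

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)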


Scheduled Agent Runs

Cron triggers run agent functions on a schedule, with no external scheduler to manage:

@client.create_function(
    fn_id="daily-digest-agent",
    trigger=inngest.TriggerCron(cron="0 8 * * *"),  # 8 AM daily
)
async def daily_digest(
    ctx: inngest.Context,
    step: inngest.Step,
) -> dict:
    news = await step.run("fetch-news", fetch_latest_news)
    digest = await step.run("generate-digest", lambda: summarize(news))
    await step.run("send-digest", lambda: send_email(digest))
    return {"status": "sent"}

FAQ

How does Inngest differ from a traditional message queue like RabbitMQ?

Inngest is a higher-level abstraction. With RabbitMQ, you manage queues, consumers, acknowledgments, dead-letter routing, and retry logic yourself. Inngest handles all of that automatically. You define functions with steps, and Inngest manages the execution lifecycle including retries, concurrency, rate limiting, and observability.

What happens if my server goes down during a function execution?

Inngest checkpoints after each completed step. When your server comes back online, Inngest resumes the function from the last completed step. You do not lose progress, and completed steps are not re-executed.

Can I use Inngest with my existing FastAPI or Flask application?

Yes. Inngest provides serve integrations for FastAPI, Flask, and Django. You mount the integration in your existing app and define functions in the same codebase. No separate worker deployment is needed; Inngest calls your server to execute each step.


#Inngest #EventDriven #Serverless #AIAgents #Python #AgenticAI #LearnAI #AIEngineering
