Skip to content
Learn Agentic AI
Learn Agentic AI15 min read10 views

Event-Driven Microservices for AI Agents: Kafka, RabbitMQ, and NATS Patterns

Implement event-driven communication between AI agent microservices using Kafka, RabbitMQ, and NATS. Learn event schema design, pub/sub patterns, event sourcing, and exactly-once delivery semantics.

Why Event-Driven Architecture Fits AI Agent Systems

AI agent workflows are inherently asynchronous. A user sends a message, the agent reasons over it, calls tools, retrieves context from a vector store, and eventually returns a response. Many of these steps can happen independently. The memory service needs to record the conversation after the response is sent. The analytics service needs to log latency metrics. The billing service needs to track token usage.

If all of these happen synchronously in the request path, response latency balloons. Event-driven architecture decouples the request path from downstream processing. The conversation service publishes events, and other services consume them independently.

Designing Event Schemas

A well-designed event schema is the contract between services. It must be self-describing, versioned, and contain enough context for any consumer to act without making additional API calls:

flowchart LR
    INPUT(["User intent"])
    PARSE["Parse plus<br/>classify"]
    PLAN["Plan and tool<br/>selection"]
    AGENT["Agent loop<br/>LLM plus tools"]
    GUARD{"Guardrails<br/>and policy"}
    EXEC["Execute and<br/>verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome plus<br/>next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
from dataclasses import dataclass, field, asdict
from datetime import datetime
import uuid
import json

@dataclass
class AgentEvent:
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    event_type: str = ""
    version: str = "1.0"
    timestamp: str = field(
        default_factory=lambda: datetime.utcnow().isoformat()
    )
    source_service: str = ""
    correlation_id: str = ""
    payload: dict = field(default_factory=dict)

    def to_json(self) -> str:
        return json.dumps(asdict(self))

# Example events published by the conversation service
def create_message_received_event(
    session_id: str, user_msg: str, correlation_id: str
) -> AgentEvent:
    return AgentEvent(
        event_type="agent.message.received",
        source_service="conversation-manager",
        correlation_id=correlation_id,
        payload={
            "session_id": session_id,
            "message": user_msg,
            "message_type": "user",
        },
    )

def create_response_generated_event(
    session_id: str,
    response: str,
    tokens_used: int,
    model: str,
    correlation_id: str,
) -> AgentEvent:
    return AgentEvent(
        event_type="agent.response.generated",
        source_service="conversation-manager",
        correlation_id=correlation_id,
        payload={
            "session_id": session_id,
            "response_length": len(response),
            "tokens_used": tokens_used,
            "model": model,
        },
    )

The correlation_id ties all events from a single user request together across services, which is essential for distributed tracing.

Hear it before you finish reading

Talk to a live CallSphere AI voice agent in your browser — 60 seconds, no signup.

Try Live Demo →

Kafka for High-Throughput Agent Event Streams

Kafka excels when you need durable, ordered event streams at high throughput. Agent systems that process thousands of messages per minute benefit from Kafka's partitioned log architecture:

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import asyncio

# Producer in the conversation service
class AgentEventProducer:
    def __init__(self, bootstrap_servers: str = "kafka:9092"):
        self.producer = AIOKafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: v.encode("utf-8"),
            acks="all",  # Wait for all replicas to acknowledge
        )

    async def start(self):
        await self.producer.start()

    async def publish(self, event: AgentEvent):
        topic = event.event_type.replace(".", "-")
        await self.producer.send_and_wait(
            topic=topic,
            value=event.to_json(),
            key=event.correlation_id.encode("utf-8"),
        )

# Consumer in the analytics service
class AnalyticsConsumer:
    def __init__(self):
        self.consumer = AIOKafkaConsumer(
            "agent-response-generated",
            bootstrap_servers="kafka:9092",
            group_id="analytics-service",
            auto_offset_reset="earliest",
            enable_auto_commit=False,
        )

    async def consume(self):
        await self.consumer.start()
        try:
            async for msg in self.consumer:
                event = json.loads(msg.value.decode("utf-8"))
                await self.process_event(event)
                await self.consumer.commit()
        finally:
            await self.consumer.stop()

    async def process_event(self, event: dict):
        payload = event["payload"]
        await self.db.insert_metric(
            session_id=payload["session_id"],
            tokens_used=payload["tokens_used"],
            model=payload["model"],
            timestamp=event["timestamp"],
        )

Setting acks="all" ensures the event is durably written before the producer considers it sent. The consumer uses manual commit (enable_auto_commit=False) to guarantee at-least-once processing.

NATS for Lightweight Agent Communication

NATS is a strong choice for agent systems that need low-latency pub/sub without Kafka's operational complexity:

import nats

async def nats_publisher():
    nc = await nats.connect("nats://nats:4222")
    event = create_message_received_event(
        session_id="sess-123",
        user_msg="What is my account balance?",
        correlation_id="req-abc",
    )
    await nc.publish(
        "agent.message.received",
        event.to_json().encode(),
    )
    await nc.flush()
    await nc.close()

async def nats_subscriber():
    nc = await nats.connect("nats://nats:4222")
    sub = await nc.subscribe("agent.>")  # Wildcard subscription

    async for msg in sub.messages:
        event = json.loads(msg.data.decode())
        print(f"Received {event['event_type']} "
              f"from {event['source_service']}")

NATS uses subject-based addressing with wildcards. The pattern agent.> subscribes to all events under the agent namespace, making it easy to build monitoring dashboards.

Still reading? Stop comparing — try CallSphere live.

CallSphere ships complete AI voice agents per industry — 14 tools for healthcare, 10 agents for real estate, 4 specialists for salons. See how it actually handles a call before you book a demo.

Exactly-Once Semantics

True exactly-once delivery is achievable through idempotent consumers. Store the event_id in a processed-events table and check it before processing:

async def process_event_exactly_once(self, event: dict):
    event_id = event["event_id"]
    if await self.db.event_already_processed(event_id):
        return  # Skip duplicate
    await self.handle(event)
    await self.db.mark_event_processed(event_id)

FAQ

When should I choose Kafka over NATS for an agent system?

Choose Kafka when you need durable event storage for replay, strict ordering within partitions, and high throughput at scale (thousands of events per second). Choose NATS when you need simple pub/sub with low latency, the event volume is moderate, and you want minimal operational overhead. For most agent systems under 500 requests per minute, NATS is simpler to operate.

How do I handle schema evolution when event formats change?

Include a version field in every event. When the schema changes, increment the version. Consumers should handle multiple versions by checking the version field and applying the appropriate deserialization logic. Avoid breaking changes — add new fields rather than renaming or removing existing ones.

Should every microservice publish events, or just the core conversation service?

Every service that performs a meaningful state change should publish events. The tool execution service should publish tool.execution.completed events. The RAG service should publish rag.retrieval.completed events. This gives downstream services full visibility into the agent's behavior without coupling them to the conversation service.


#EventDriven #Kafka #RabbitMQ #NATS #Microservices #AgenticAI #LearnAI #AIEngineering

Share

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.