Building an AI Agent Webhook Hub: Centralized Event Processing for Multiple Integrations

Design and build a centralized webhook hub that receives events from multiple services, normalizes them into a common format, routes them to AI agent processors, and ensures reliable delivery with fan-out and retry logic.

Why Build a Centralized Webhook Hub

As your AI agent integrates with more services (GitHub, Slack, Stripe, Jira), each webhook endpoint becomes its own silo with separate signature verification, payload parsing, and error handling. A centralized webhook hub normalizes all incoming events into a common format, routes them to the appropriate agent processors, and provides unified logging, retry logic, and observability.

This architectural pattern transforms a tangle of point-to-point integrations into a clean event-driven system.

Designing the Event Schema

Define a normalized event format that all incoming webhooks map to, regardless of their source.

from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
import uuid

@dataclass
class NormalizedEvent:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    source: str = ""          # "github", "slack", "stripe"
    event_type: str = ""      # "issues.opened", "message.received"
    # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    actor: str = ""           # Who triggered the event
    resource_id: str = ""     # ID of the affected resource
    resource_type: str = ""   # "pull_request", "payment", "message"
    payload: dict = field(default_factory=dict)  # Full original payload
    metadata: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        return {
            "id": self.id,
            "source": self.source,
            "event_type": self.event_type,
            "timestamp": self.timestamp.isoformat(),
            "actor": self.actor,
            "resource_id": self.resource_id,
            "resource_type": self.resource_type,
            "payload": self.payload,
            "metadata": self.metadata,
        }

Source-Specific Normalizers

Each integration source gets a normalizer that translates its raw webhook payload into the common event format.

from abc import ABC, abstractmethod

class EventNormalizer(ABC):
    @abstractmethod
    def verify_signature(self, payload: bytes, headers: dict) -> bool:
        pass

    @abstractmethod
    def normalize(self, raw_payload: dict, headers: dict) -> NormalizedEvent:
        pass

class GitHubNormalizer(EventNormalizer):
    def __init__(self, webhook_secret: str):
        self.secret = webhook_secret

    def verify_signature(self, payload: bytes, headers: dict) -> bool:
        import hmac, hashlib
        signature = headers.get("x-hub-signature-256", "")
        expected = "sha256=" + hmac.new(
            self.secret.encode(), payload, hashlib.sha256
        ).hexdigest()
        return hmac.compare_digest(expected, signature)

    def normalize(self, raw_payload: dict, headers: dict) -> NormalizedEvent:
        event_type = headers.get("x-github-event", "unknown")
        action = raw_payload.get("action", "")

        return NormalizedEvent(
            source="github",
            event_type=f"{event_type}.{action}" if action else event_type,
            actor=raw_payload.get("sender", {}).get("login", "unknown"),
            resource_id=str(
                raw_payload.get("pull_request", raw_payload.get("issue", {}))
                .get("number", "")
            ),
            resource_type=event_type,
            payload=raw_payload,
        )

class StripeNormalizer(EventNormalizer):
    def __init__(self, webhook_secret: str):
        self.secret = webhook_secret

    def verify_signature(self, payload: bytes, headers: dict) -> bool:
        import stripe
        try:
            stripe.Webhook.construct_event(
                payload, headers.get("stripe-signature", ""), self.secret
            )
            return True
        # ValueError: malformed payload; SignatureVerificationError: bad signature
        except (ValueError, stripe.error.SignatureVerificationError):
            return False

    def normalize(self, raw_payload: dict, headers: dict) -> NormalizedEvent:
        data_obj = raw_payload.get("data", {}).get("object", {})
        return NormalizedEvent(
            source="stripe",
            event_type=raw_payload.get("type", "unknown"),
            actor=data_obj.get("customer", "system"),
            resource_id=data_obj.get("id", ""),
            resource_type=raw_payload.get("type", "").split(".")[0],
            payload=raw_payload,
        )
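
Slack is listed as a source but has no normalizer above. The following is a minimal sketch of the same two steps for Slack's Events API, written as standalone functions so it runs on its own; wrapping them in an EventNormalizer subclass is left as an exercise. The function names, the 300-second freshness window, and the choice of the channel as resource_id are assumptions, not part of the original design.

```python
import hashlib
import hmac
import time
from typing import Optional


def verify_slack_signature(signing_secret: str, body: bytes, headers: dict,
                           max_age_seconds: int = 300,
                           now: Optional[float] = None) -> bool:
    """Check Slack's v0 signature: HMAC-SHA256 over 'v0:<timestamp>:<body>'."""
    timestamp = headers.get("x-slack-request-timestamp", "")
    signature = headers.get("x-slack-signature", "")
    if not timestamp.isdigit():
        return False
    current = time.time() if now is None else now
    # Reject stale requests to limit replay attacks (window is an assumption).
    if abs(current - int(timestamp)) > max_age_seconds:
        return False
    base = b"v0:" + timestamp.encode() + b":" + body
    expected = "v0=" + hmac.new(
        signing_secret.encode(), base, hashlib.sha256
    ).hexdigest()
    # Compare as bytes so a non-ASCII header value cannot raise TypeError.
    return hmac.compare_digest(expected.encode(), signature.encode())


def normalize_slack_fields(raw_payload: dict) -> dict:
    """Map a Slack event callback onto the NormalizedEvent field names."""
    inner = raw_payload.get("event", {})
    event_type = inner.get("type", raw_payload.get("type", "unknown"))
    return {
        "source": "slack",
        "event_type": event_type,
        "actor": inner.get("user", "unknown"),
        "resource_id": inner.get("channel", ""),  # assumed: channel is the resource
        "resource_type": event_type,
        "payload": raw_payload,
    }
```

The returned dict can be unpacked into the dataclass with NormalizedEvent(**normalize_slack_fields(raw_payload)).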

The Webhook Router

The central router receives all webhooks, verifies signatures, normalizes events, and dispatches them.

from fastapi import FastAPI, Request, HTTPException
import logging

logger = logging.getLogger("webhook_hub")

app = FastAPI()

# Placeholder secrets for illustration; load real values from the environment or a secret store.
normalizers: dict[str, EventNormalizer] = {
    "github": GitHubNormalizer(webhook_secret="gh-secret"),
    "stripe": StripeNormalizer(webhook_secret="stripe-secret"),
}

event_handlers: dict[str, list] = {}

def register_handler(event_pattern: str, handler):
    """Register a handler for events matching a pattern."""
    if event_pattern not in event_handlers:
        event_handlers[event_pattern] = []
    event_handlers[event_pattern].append(handler)

@app.post("/webhooks/{source}")
async def receive_webhook(source: str, request: Request):
    normalizer = normalizers.get(source)
    if not normalizer:
        raise HTTPException(status_code=404, detail="Unknown source")

    body = await request.body()
    headers = dict(request.headers)

    if not normalizer.verify_signature(body, headers):
        raise HTTPException(status_code=401, detail="Invalid signature")

    raw_payload = await request.json()
    event = normalizer.normalize(raw_payload, headers)

    logger.info(
        f"Received event: {event.source}/{event.event_type} "
        f"[{event.id}]"
    )

    await dispatch_event(event)
    return {"status": "accepted", "event_id": event.id}
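
To exercise the /webhooks/github route locally you need a request that passes verify_signature. The sketch below builds a signed body and matching headers using only the standard library; the helper name sign_github_payload is hypothetical, and the secret must match whatever the GitHubNormalizer was constructed with.

```python
import hashlib
import hmac
import json


def sign_github_payload(secret: str, event_name: str,
                        payload: dict) -> tuple[bytes, dict]:
    """Return (body, headers) that GitHubNormalizer.verify_signature accepts."""
    body = json.dumps(payload).encode()
    digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    headers = {
        "content-type": "application/json",
        "x-github-event": event_name,
        # Same "sha256=<hex digest>" format the normalizer recomputes.
        "x-hub-signature-256": f"sha256={digest}",
    }
    return body, headers


body, headers = sign_github_payload(
    "gh-secret", "pull_request", {"action": "opened", "sender": {"login": "octocat"}}
)
```

Send body and headers unchanged with any HTTP client. The signature covers the exact bytes, so re-serializing the JSON before sending would invalidate it.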

Fan-Out and Reliable Dispatch

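Dispatch normalized events to all matching handlers with error isolation, so that one handler's failure does not block the others. Each handler gets up to three attempts with exponential backoff (1 second, then 2 seconds) before the event is handed to a dead-letter store.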

import asyncio
from datetime import datetime

async def dispatch_event(event: NormalizedEvent):
    matching_handlers = []

    for pattern, handlers in event_handlers.items():
        if matches_pattern(event, pattern):
            matching_handlers.extend(handlers)

    if not matching_handlers:
        logger.warning(f"No handlers for {event.source}/{event.event_type}")
        return

    tasks = [
        dispatch_to_handler(handler, event)
        for handler in matching_handlers
    ]
    await asyncio.gather(*tasks, return_exceptions=True)

async def dispatch_to_handler(handler, event: NormalizedEvent,
                               max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            await handler(event)
            logger.info(
                f"Handler {handler.__name__} processed {event.id}"
            )
            return
        except Exception as e:
            wait_time = 2 ** attempt
            logger.error(
                f"Handler {handler.__name__} failed on {event.id} "
                f"(attempt {attempt + 1}): {e}"
            )
            if attempt < max_retries - 1:
                await asyncio.sleep(wait_time)

    # Store failed event for manual review
    await store_dead_letter(event, handler.__name__)

def matches_pattern(event: NormalizedEvent, pattern: str) -> bool:
    """Match event against handler pattern like 'github.*' or 'stripe.invoice.*'"""
    source_filter, type_filter = pattern.split("/", 1) if "/" in pattern else (pattern, "*")
    if source_filter != "*" and source_filter != event.source:
        return False
    if type_filter == "*":
        return True
    return event.event_type.startswith(type_filter.rstrip("*"))
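
dispatch_to_handler calls store_dead_letter, which is not defined above. One possible shape, assuming a local SQLite file is an acceptable dead-letter store, is sketched here. The file name, table name, and column layout are all assumptions; the only requirements taken from the code above are the call signature and that the event exposes id and to_dict().

```python
import asyncio
import json
import sqlite3
from datetime import datetime, timezone

DEAD_LETTER_DB = "dead_letters.db"  # hypothetical path


def _write_dead_letter(db_path: str, event_id: str,
                       handler_name: str, event_json: str) -> None:
    conn = sqlite3.connect(db_path)
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "CREATE TABLE IF NOT EXISTS dead_letters ("
                "event_id TEXT, handler TEXT, event_json TEXT, failed_at TEXT)"
            )
            conn.execute(
                "INSERT INTO dead_letters VALUES (?, ?, ?, ?)",
                (event_id, handler_name, event_json,
                 datetime.now(timezone.utc).isoformat()),
            )
    finally:
        conn.close()


async def store_dead_letter(event, handler_name: str,
                            db_path: str = DEAD_LETTER_DB) -> None:
    """Persist an event that exhausted its retries for one handler."""
    event_json = json.dumps(event.to_dict(), default=str)
    # sqlite3 is blocking, so run the write in a worker thread.
    await asyncio.to_thread(
        _write_dead_letter, db_path, event.id, handler_name, event_json
    )
```

Storing one row per (event, handler) pair keeps a failure in one handler from hiding the fact that other handlers processed the same event successfully.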

Registering Agent Handlers

Connect your AI agents to the hub by registering handlers.

# Register handlers at startup
register_handler("github/pull_request.*", handle_pr_review_agent)
register_handler("github/issues.*", handle_issue_triage_agent)
register_handler("stripe/invoice.*", handle_payment_agent)
register_handler("*/", handle_audit_logger)  # Logs all events

FAQ

How do I handle webhook delivery guarantees when the hub is temporarily down?

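Retry behavior varies by sender. Stripe retries failed deliveries with exponential backoff for up to three days, and Slack retries a failed event up to three times over about five minutes, but GitHub does not automatically redeliver failed webhooks; you have to redeliver them manually or through its API. For maximum reliability, put a message queue (Redis Streams, RabbitMQ, or SQS) between the webhook receiver and the processing logic. The HTTP endpoint accepts and enqueues immediately, then workers process from the queue at their own pace.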
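
The enqueue-then-process split can be sketched with asyncio.Queue as an in-process stand-in for a real broker. This is only an illustration of the shape: asyncio.Queue is not durable, so events in it are lost if the process dies, which is exactly what Redis Streams, RabbitMQ, or SQS would prevent. The names worker and run_workers are hypothetical.

```python
import asyncio


async def worker(queue: asyncio.Queue, process, results: list) -> None:
    """Pull events off the queue forever and process them one at a time."""
    while True:
        event = await queue.get()
        try:
            results.append(await process(event))
        except Exception as exc:
            # Isolate failures so one bad event does not kill the worker.
            results.append(exc)
        finally:
            queue.task_done()


async def run_workers(events: list, process, worker_count: int = 2) -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)
    results: list = []
    workers = [
        asyncio.create_task(worker(queue, process, results))
        for _ in range(worker_count)
    ]
    for event in events:
        # The HTTP endpoint would do only this step, then return immediately.
        await queue.put(event)
    await queue.join()  # wait until every queued event has been processed
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results
```

The bounded maxsize applies backpressure: if workers fall behind, put waits instead of letting memory grow without limit.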

How do I debug events flowing through the hub?

Add a dead letter queue for events that fail all retry attempts, and an event log table that records every received event with its normalized form and dispatch results. Include correlation IDs in all log messages so you can trace an event from ingestion through every handler. A simple SQLite or PostgreSQL table with event_id, source, type, status, and timestamp columns is sufficient for most debugging needs.
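
A minimal sketch of such an event log table, using SQLite and the five columns named above. The table name, the status values, and the helper function names are assumptions.

```python
import sqlite3
from datetime import datetime, timezone

SCHEMA = """
CREATE TABLE IF NOT EXISTS event_log (
    event_id  TEXT PRIMARY KEY,
    source    TEXT NOT NULL,
    type      TEXT NOT NULL,
    status    TEXT NOT NULL,
    timestamp TEXT NOT NULL
)
"""


def open_event_log(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute(SCHEMA)
    return conn


def record_event(conn: sqlite3.Connection, event_id: str, source: str,
                 event_type: str, status: str = "received") -> None:
    """Insert one row when the hub first receives an event."""
    with conn:
        conn.execute(
            "INSERT INTO event_log VALUES (?, ?, ?, ?, ?)",
            (event_id, source, event_type, status,
             datetime.now(timezone.utc).isoformat()),
        )


def update_status(conn: sqlite3.Connection, event_id: str, status: str) -> None:
    """Record the dispatch outcome, e.g. 'processed' or 'dead_letter'."""
    with conn:
        conn.execute(
            "UPDATE event_log SET status = ? WHERE event_id = ?",
            (status, event_id),
        )


def events_with_status(conn: sqlite3.Connection, status: str) -> list:
    rows = conn.execute(
        "SELECT event_id FROM event_log WHERE status = ? ORDER BY event_id",
        (status,),
    )
    return [row[0] for row in rows]
```

Using the normalized event's id as the primary key makes it the correlation ID: the same value appears in the log lines, the event log, and any dead-letter record.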

Should I process webhook events synchronously or asynchronously?

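Accept the webhook, return a 2xx response immediately, and process asynchronously. This prevents timeout errors from the sending service and decouples ingestion throughput from processing speed. Senders wait only a few seconds for a response (GitHub allows 10 seconds, Slack 3), so a handler that takes 30 seconds, which is common for AI agent processing, would time out under a synchronous approach. Note that the router shown earlier awaits dispatch_event before responding, so in production move that call into a background task or a queue consumer. Async processing with a queue gives you both reliability and performance.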

