Skip to content
Learn Agentic AI
Learn Agentic AI14 min read6 views

Webhook and Integration Layer: Connecting AI Agents to CRMs, ERPs, and Third-Party Services

Build a robust integration framework for an AI agent platform that connects agents to Salesforce, HubSpot, Slack, and other third-party services through webhooks, OAuth flows, and a configurable data mapping layer.

Integrations Are the Moat

An AI agent that cannot talk to your CRM, create tickets in your helpdesk, or send notifications to Slack is a toy. Integrations transform agents from chatbots into workflow automation engines. For an agent SaaS platform, the breadth and reliability of your integration layer is often the difference between winning and losing enterprise deals.

The design challenge is building an integration framework that is generic enough to support hundreds of services but reliable enough that a failed webhook does not silently drop a customer's data.

Integration Framework Architecture

The framework has three layers: connection management (OAuth and credentials), execution (making API calls), and data mapping (translating between your schema and the external service):

Hear it before you finish reading

Talk to a live CallSphere AI voice agent in your browser — 60 seconds, no signup.

Try Live Demo →
flowchart LR
    CLIENT(["Client SDK"])
    GW["API Gateway<br/>auth plus rate limit"]
    APP["FastAPI app<br/>handlers and DI"]
    VAL["Pydantic validation"]
    SVC["Service layer<br/>business logic"]
    DB[(Database)]
    QUEUE[(Background queue)]
    OBS[(Tracing)]
    CLIENT --> GW --> APP --> VAL --> SVC
    SVC --> DB
    SVC --> QUEUE
    SVC --> OBS
    SVC --> CLIENT
    style GW fill:#4f46e5,stroke:#4338ca,color:#fff
    style APP fill:#f59e0b,stroke:#d97706,color:#1f2937
    style DB fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
# integration_models.py — Integration framework data model
from pydantic import BaseModel, Field
from typing import Optional
from enum import Enum
from datetime import datetime
import uuid

class AuthType(str, Enum):
    OAUTH2 = "oauth2"
    API_KEY = "api_key"
    BEARER_TOKEN = "bearer_token"
    BASIC_AUTH = "basic_auth"
    WEBHOOK_SECRET = "webhook_secret"

class IntegrationProvider(BaseModel):
    id: str  # "salesforce", "hubspot", "slack"
    name: str
    auth_type: AuthType
    oauth_config: Optional[dict] = None  # authorize_url, token_url, scopes
    base_url: str
    available_actions: list[str]  # "create_contact", "send_message", etc.
    webhook_events: list[str]  # Events this provider can send to us

class TenantIntegration(BaseModel):
    id: uuid.UUID = Field(default_factory=uuid.uuid4)
    tenant_id: uuid.UUID
    provider_id: str
    status: str = "pending"  # "pending", "active", "error", "revoked"
    credentials_encrypted: bytes = b""
    refresh_token_encrypted: Optional[bytes] = None
    token_expires_at: Optional[datetime] = None
    webhook_url: Optional[str] = None  # Our endpoint for receiving their webhooks
    field_mappings: dict = {}  # Maps our fields to their fields
    created_at: datetime = Field(default_factory=datetime.utcnow)

OAuth Flow Implementation

Most enterprise integrations use OAuth2. Here is a complete flow that handles token refresh:

# oauth_service.py — OAuth2 connection management
import httpx
from cryptography.fernet import Fernet
from datetime import datetime, timedelta

class OAuthService:
    def __init__(self, encryption_key: str, db):
        self.cipher = Fernet(encryption_key.encode())
        self.db = db

    def get_authorize_url(self, provider: IntegrationProvider, tenant_id: uuid.UUID) -> str:
        state = self.cipher.encrypt(f"{tenant_id}:{provider.id}".encode()).decode()
        oauth = provider.oauth_config
        return (
            f"{oauth['authorize_url']}?"
            f"client_id={oauth['client_id']}"
            f"&redirect_uri={oauth['redirect_uri']}"
            f"&scope={'+'.join(oauth['scopes'])}"
            f"&response_type=code"
            f"&state={state}"
        )

    async def handle_callback(self, code: str, state: str) -> TenantIntegration:
        decrypted = self.cipher.decrypt(state.encode()).decode()
        tenant_id, provider_id = decrypted.split(":")
        provider = await self.get_provider(provider_id)
        oauth = provider.oauth_config

        async with httpx.AsyncClient() as client:
            resp = await client.post(oauth["token_url"], data={
                "grant_type": "authorization_code",
                "code": code,
                "client_id": oauth["client_id"],
                "client_secret": oauth["client_secret"],
                "redirect_uri": oauth["redirect_uri"],
            })
            resp.raise_for_status()
            tokens = resp.json()

        integration = TenantIntegration(
            tenant_id=uuid.UUID(tenant_id),
            provider_id=provider_id,
            status="active",
            credentials_encrypted=self.cipher.encrypt(tokens["access_token"].encode()),
            refresh_token_encrypted=self.cipher.encrypt(
                tokens.get("refresh_token", "").encode()
            ),
            token_expires_at=datetime.utcnow() + timedelta(seconds=tokens.get("expires_in", 3600)),
        )
        await self.db.save(integration)
        return integration

    async def get_valid_token(self, integration: TenantIntegration) -> str:
        if integration.token_expires_at and integration.token_expires_at > datetime.utcnow():
            return self.cipher.decrypt(integration.credentials_encrypted).decode()

        # Token expired — refresh it
        provider = await self.get_provider(integration.provider_id)
        refresh_token = self.cipher.decrypt(integration.refresh_token_encrypted).decode()

        async with httpx.AsyncClient() as client:
            resp = await client.post(provider.oauth_config["token_url"], data={
                "grant_type": "refresh_token",
                "refresh_token": refresh_token,
                "client_id": provider.oauth_config["client_id"],
                "client_secret": provider.oauth_config["client_secret"],
            })
            resp.raise_for_status()
            tokens = resp.json()

        integration.credentials_encrypted = self.cipher.encrypt(tokens["access_token"].encode())
        integration.token_expires_at = datetime.utcnow() + timedelta(
            seconds=tokens.get("expires_in", 3600)
        )
        if "refresh_token" in tokens:
            integration.refresh_token_encrypted = self.cipher.encrypt(
                tokens["refresh_token"].encode()
            )
        await self.db.save(integration)
        return tokens["access_token"]

Webhook Receiver

Your platform needs to receive webhooks from external services. Each tenant gets a unique webhook URL with signature verification:

# webhook_receiver.py — Incoming webhook handler
import hmac
import hashlib
from fastapi import APIRouter, Request, HTTPException

router = APIRouter(prefix="/v1/webhooks")

@router.post("/incoming/{integration_id}")
async def receive_webhook(integration_id: str, request: Request):
    integration = await db.get_integration(integration_id)
    if not integration or integration.status != "active":
        raise HTTPException(status_code=404)

    body = await request.body()

    # Verify webhook signature
    provider = await get_provider(integration.provider_id)
    if not verify_signature(provider, integration, request.headers, body):
        raise HTTPException(status_code=401, detail="Invalid signature")

    payload = await request.json()

    # Route to appropriate handler
    event_type = extract_event_type(provider, request.headers, payload)
    await event_router.dispatch(
        tenant_id=integration.tenant_id,
        provider_id=integration.provider_id,
        event_type=event_type,
        payload=payload,
    )
    return {"status": "accepted"}

def verify_signature(provider, integration, headers, body: bytes) -> bool:
    if provider.auth_type == AuthType.WEBHOOK_SECRET:
        secret = cipher.decrypt(integration.credentials_encrypted)
        signature_header = headers.get("x-webhook-signature", "")
        expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
        return hmac.compare_digest(signature_header, expected)
    return True  # Providers without signature verification

Data Mapping Layer

Different CRMs use different field names for the same concept. The mapping layer translates between your canonical schema and each provider:

# field_mapper.py — Configurable field mapping between systems
class FieldMapper:
    DEFAULT_MAPPINGS = {
        "salesforce": {
            "contact.email": "Email",
            "contact.first_name": "FirstName",
            "contact.last_name": "LastName",
            "contact.phone": "Phone",
            "contact.company": "Account.Name",
            "ticket.subject": "Subject",
            "ticket.description": "Description",
            "ticket.priority": "Priority",
        },
        "hubspot": {
            "contact.email": "email",
            "contact.first_name": "firstname",
            "contact.last_name": "lastname",
            "contact.phone": "phone",
            "contact.company": "company",
            "ticket.subject": "subject",
            "ticket.description": "content",
            "ticket.priority": "hs_ticket_priority",
        },
    }

    def __init__(self, provider_id: str, custom_mappings: dict = None):
        self.mappings = {**self.DEFAULT_MAPPINGS.get(provider_id, {})}
        if custom_mappings:
            self.mappings.update(custom_mappings)

    def to_external(self, canonical_data: dict) -> dict:
        result = {}
        for canonical_key, value in canonical_data.items():
            external_key = self.mappings.get(canonical_key)
            if external_key:
                self._set_nested(result, external_key, value)
        return result

    def from_external(self, external_data: dict) -> dict:
        reverse = {v: k for k, v in self.mappings.items()}
        result = {}
        for ext_key, value in self._flatten(external_data).items():
            canonical_key = reverse.get(ext_key)
            if canonical_key:
                result[canonical_key] = value
        return result

    def _set_nested(self, d: dict, key: str, value):
        keys = key.split(".")
        for k in keys[:-1]:
            d = d.setdefault(k, {})
        d[keys[-1]] = value

    def _flatten(self, d: dict, prefix="") -> dict:
        items = {}
        for k, v in d.items():
            new_key = f"{prefix}.{k}" if prefix else k
            if isinstance(v, dict):
                items.update(self._flatten(v, new_key))
            else:
                items[new_key] = v
        return items

FAQ

How do I handle webhook delivery failures from external services?

Implement idempotent webhook processing using the webhook's unique event ID. Store processed event IDs in a set (Redis or database) and skip duplicates. For your outgoing webhooks, implement exponential backoff retry with a dead-letter queue. After 5 failed attempts, mark the integration as "error" and notify the tenant.

Still reading? Stop comparing — try CallSphere live.

CallSphere ships complete AI voice agents per industry — 14 tools for healthcare, 10 agents for real estate, 4 specialists for salons. See how it actually handles a call before you book a demo.

Should I build integrations in-house or use a third-party integration platform like Merge or Unified?

Start with 3-5 in-house integrations for the services your customers use most (typically Salesforce, HubSpot, Slack, and Zendesk). This gives you full control over the experience. Evaluate third-party integration platforms once you need to support 20+ services — the abstraction layer they provide saves engineering time at scale, though it adds latency and another point of failure.

How do I let users configure field mappings without writing code?

Build a visual mapping UI with two columns — your canonical fields on the left, the external service's fields on the right. Fetch the external service's field list dynamically via their API (most CRMs have metadata endpoints). Let users draw connections between fields by clicking on pairs. Pre-populate with default mappings so most users only need to add their custom fields.


#Webhooks #Integrations #OAuth #CRM #APIDesign #AgenticAI #LearnAI #AIEngineering

Share

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.