
Building a KYC/AML Agent: Identity Verification and Transaction Monitoring

Learn to build an AI agent for Know Your Customer and Anti-Money Laundering that verifies identities, screens against sanctions lists, monitors transactions, and generates risk alerts.

Why KYC/AML Needs AI Agents

Know Your Customer (KYC) and Anti-Money Laundering (AML) compliance is one of the most resource-intensive requirements for financial institutions. Banks and fintechs spend billions annually on compliance teams that manually verify identities, screen customers against sanctions lists, and investigate suspicious transactions. An AI agent can automate the routine checks, reduce false positives, and apply risk rules consistently, which lets compliance officers focus on genuinely suspicious cases.

Agent Architecture

The KYC/AML agent has four capabilities:

  1. Identity Verification — validate customer identity documents
  2. Sanctions Screening — check against watchlists and PEP databases
  3. Risk Scoring — compute a composite customer risk score
  4. Transaction Monitoring — detect suspicious patterns in real time

Step 1: Identity Verification

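The agent validates an identity document by comparing its extracted fields against the name and date of birth the customer declared, then flags expired documents and high-risk issuing countries.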

from pydantic import BaseModel
from datetime import date
from enum import Enum

class VerificationStatus(str, Enum):
    VERIFIED = "verified"
    PENDING_REVIEW = "pending_review"
    FAILED = "failed"
    EXPIRED = "expired"

class IdentityDocument(BaseModel):
    document_type: str  # "passport", "drivers_license", "national_id"
    document_number: str
    full_name: str
    date_of_birth: date
    expiry_date: date
    issuing_country: str
    mrz_data: str | None = None  # Machine Readable Zone

class VerificationResult(BaseModel):
    status: VerificationStatus
    document_authentic: bool
    name_match: bool
    dob_match: bool
    document_expired: bool
    risk_flags: list[str]

def verify_identity(
    document: IdentityDocument,
    declared_name: str,
    declared_dob: date,
) -> VerificationResult:
    """Verify identity document against declared information."""
    flags = []

    # Check document expiry
    is_expired = document.expiry_date < date.today()
    if is_expired:
        flags.append("Document is expired")

    # Name matching with fuzzy comparison
    from difflib import SequenceMatcher

    name_similarity = SequenceMatcher(
        None,
        document.full_name.lower(),
        declared_name.lower(),
    ).ratio()
    name_match = name_similarity > 0.85
    if not name_match:
        flags.append(
            f"Name mismatch: doc='{document.full_name}' "
            f"vs declared='{declared_name}' "
            f"(similarity: {name_similarity:.0%})"
        )

    # Date of birth verification
    dob_match = document.date_of_birth == declared_dob
    if not dob_match:
        flags.append("Date of birth mismatch")

    # High-risk country check
    high_risk_countries = {"KP", "IR", "SY", "CU", "MM"}
    if document.issuing_country in high_risk_countries:
        flags.append(
            f"High-risk jurisdiction: {document.issuing_country}"
        )

    # Determine overall status
    if is_expired:
        status = VerificationStatus.EXPIRED
    elif not name_match or not dob_match:
        status = VerificationStatus.FAILED
    elif flags:
        status = VerificationStatus.PENDING_REVIEW
    else:
        status = VerificationStatus.VERIFIED

    return VerificationResult(
        status=status,
        document_authentic=True,  # Would use document AI in prod
        name_match=name_match,
        dob_match=dob_match,
        document_expired=is_expired,
        risk_flags=flags,
    )
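One caveat about the fuzzy comparison above: `SequenceMatcher` compares characters in order, so a document that prints the name as "SMITH, John" scores poorly against a declared "John Smith". A minimal sketch of a normalization step that could run before the comparison is shown here. `normalize_name` and `name_similarity` are hypothetical helpers, not part of the code above, and sorting tokens is an assumption that may not suit every naming convention.

```python
import unicodedata
from difflib import SequenceMatcher


def normalize_name(name: str) -> str:
    """Lowercase, strip accents and punctuation, and sort the name tokens."""
    # Decompose accented characters, then drop the combining marks.
    decomposed = unicodedata.normalize("NFKD", name)
    without_accents = "".join(
        c for c in decomposed if not unicodedata.combining(c)
    )
    # Replace punctuation with spaces so "SMITH, John" splits cleanly.
    cleaned = "".join(
        c if c.isalnum() or c.isspace() else " " for c in without_accents
    )
    # Sorting tokens makes the comparison independent of name order.
    return " ".join(sorted(cleaned.lower().split()))


def name_similarity(a: str, b: str) -> float:
    """Similarity ratio in [0, 1] computed on normalized names."""
    return SequenceMatcher(
        None, normalize_name(a), normalize_name(b)
    ).ratio()
```

With this step, "SMITH, John" and "John Smith" normalize to the same string, while a plain lowercase comparison of the two falls well below the 0.85 threshold.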

Step 2: Sanctions Screening

Screen customers against sanctions lists (OFAC, EU, UN) and PEP (Politically Exposed Persons) databases.

from dataclasses import dataclass

@dataclass
class SanctionsHit:
    list_name: str
    matched_name: str
    match_score: float
    entity_type: str  # "individual", "entity", "vessel"
    sanctions_program: str
    listed_date: str

class ScreeningResult(BaseModel):
    screened_name: str
    total_hits: int
    hits: list[dict]
    risk_level: str  # "clear", "potential_match", "confirmed_match"

async def screen_sanctions(
    name: str,
    dob: date | None = None,
    country: str | None = None,
) -> ScreeningResult:
    """Screen a name against sanctions databases."""
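    import httpx

    # Using the OpenSanctions matching API as an example. The match
    # endpoint expects a POST body of structured queries and returns
    # scores between 0 and 1. Check the current API docs before use.
    properties = {"name": [name]}
    if dob:
        properties["birthDate"] = [dob.isoformat()]
    if country:
        properties["country"] = [country]
    query = {"schema": "Person", "properties": properties}

    async with httpx.AsyncClient(timeout=10.0) as client:
        resp = await client.post(
            "https://api.opensanctions.org/match/default",
            json={"queries": {"customer": query}},
            headers={"Authorization": "ApiKey YOUR_KEY"},
        )
        resp.raise_for_status()  # Fail loudly on auth or quota errors
        data = resp.json()["responses"]["customer"]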

    hits = []
    for result in data.get("results", []):
        score = result.get("score", 0)
        if score > 0.7:  # Threshold for potential match
            hits.append({
                "matched_name": result.get("caption", ""),
                "score": score,
                "datasets": result.get("datasets", []),
                "properties": result.get("properties", {}),
            })

    if not hits:
        risk = "clear"
    elif any(h["score"] > 0.95 for h in hits):
        risk = "confirmed_match"
    else:
        risk = "potential_match"

    return ScreeningResult(
        screened_name=name,
        total_hits=len(hits),
        hits=hits,
        risk_level=risk,
    )

Step 3: Customer Risk Scoring

Combine multiple risk factors into a composite score.

class CustomerRiskProfile(BaseModel):
    customer_id: str
    risk_score: int  # 0-100
    risk_level: str  # "low", "medium", "high", "critical"
    factors: list[dict]
    enhanced_due_diligence: bool
    next_review_date: date

def calculate_risk_score(
    verification: VerificationResult,
    screening: ScreeningResult,
    customer_data: dict,
) -> CustomerRiskProfile:
    """Calculate composite KYC risk score."""
    score = 0
    factors = []

    # Identity verification (0-20 points)
    if verification.status == VerificationStatus.FAILED:
        score += 20
        factors.append({"factor": "ID verification failed", "points": 20})
    elif verification.status == VerificationStatus.PENDING_REVIEW:
        score += 10
        factors.append({"factor": "ID pending review", "points": 10})

    # Sanctions screening (0-40 points)
    if screening.risk_level == "confirmed_match":
        score += 40
        factors.append({"factor": "Sanctions match", "points": 40})
    elif screening.risk_level == "potential_match":
        score += 20
        factors.append({"factor": "Potential sanctions hit", "points": 20})

    # Geographic risk (0-15 points)
    high_risk_countries = {"AF", "KP", "IR", "SY", "YE"}
    country = customer_data.get("country", "")
    if country in high_risk_countries:
        score += 15
        factors.append({"factor": f"High-risk country: {country}", "points": 15})

    # Business type risk (0-15 points)
    high_risk_businesses = {"casino", "crypto", "money_service"}
    biz = customer_data.get("business_type", "")
    if biz in high_risk_businesses:
        score += 15
        factors.append({"factor": f"High-risk business: {biz}", "points": 15})

    # Transaction volume risk (0-10 points)
    volume = customer_data.get("monthly_volume", 0)
    if volume > 100000:
        score += 10
        factors.append({"factor": "High transaction volume", "points": 10})

    # Determine risk level
    if score >= 60:
        level = "critical"
    elif score >= 40:
        level = "high"
    elif score >= 20:
        level = "medium"
    else:
        level = "low"

    # Review schedule based on risk
    from datetime import timedelta
    review_intervals = {
        "critical": 30, "high": 90, "medium": 180, "low": 365
    }
    next_review = date.today() + timedelta(
        days=review_intervals[level]
    )

    return CustomerRiskProfile(
        customer_id=customer_data.get("id", ""),
        risk_score=min(score, 100),
        risk_level=level,
        factors=factors,
        enhanced_due_diligence=score >= 40,
        next_review_date=next_review,
    )
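To make the arithmetic concrete, here is a small worked example that reuses the thresholds and review intervals from the function above. The customer is hypothetical: ID pending review, a potential sanctions hit, and a crypto business.

```python
def risk_level(score: int) -> str:
    """Map a composite score to a level using the thresholds above."""
    if score >= 60:
        return "critical"
    if score >= 40:
        return "high"
    if score >= 20:
        return "medium"
    return "low"


# Review intervals in days, copied from calculate_risk_score.
REVIEW_INTERVALS = {"critical": 30, "high": 90, "medium": 180, "low": 365}

# Hypothetical customer: three factors fire, the others contribute 0.
factors = {
    "ID pending review": 10,
    "Potential sanctions hit": 20,
    "High-risk business: crypto": 15,
}

score = sum(factors.values())          # 10 + 20 + 15 = 45
level = risk_level(score)              # 45 falls in the 40-59 band
needs_edd = score >= 40                # enhanced due diligence threshold
review_in_days = REVIEW_INTERVALS[level]
```

A score of 45 lands in the "high" band, so this customer would get enhanced due diligence and a 90-day review cycle. Dropping the sanctions hit would leave 25 points, which is "medium" with a 180-day cycle.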

Step 4: Transaction Monitoring

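Detect suspicious transaction patterns with rules and, where available, ML models. The example below implements a single rule, structuring detection; ML-based scoring is not shown.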

class SuspiciousAlert(BaseModel):
    alert_id: str
    customer_id: str
    alert_type: str
    severity: str
    description: str
    transactions: list[str]  # Transaction IDs
    recommended_action: str

def monitor_transactions(
    customer_id: str,
    transactions: list[dict],
    risk_profile: CustomerRiskProfile,
) -> list[SuspiciousAlert]:
    """Monitor transactions for suspicious patterns."""
    alerts = []
    import uuid

    # Rule 1: Structuring detection (smurfing)
    daily_totals = {}
    for txn in transactions:
        day = txn["date"]
        daily_totals.setdefault(day, []).append(txn)

    for day, day_txns in daily_totals.items():
        amounts = [t["amount"] for t in day_txns]
        if (
            len(amounts) >= 3
            and all(a < 10000 for a in amounts)
            and sum(amounts) > 10000
        ):
            alerts.append(
                SuspiciousAlert(
                    alert_id=str(uuid.uuid4()),
                    customer_id=customer_id,
                    alert_type="structuring",
                    severity="high",
                    description=(
                        f"Potential structuring: {len(amounts)} "
                        f"transactions totaling "
                        f"${sum(amounts):,.2f} on {day}"
                    ),
                    transactions=[t["id"] for t in day_txns],
                    recommended_action="File SAR if confirmed",
                )
            )

    # Rule 2: Rapid movement (funds in and out quickly)
    # Rule 3: Unusual geography
    # Additional rules would follow the same pattern

    return alerts
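Rule 2 is left as a comment above. One way it might look is sketched below, under stated assumptions: each transaction dict carries a `direction` field ("in" or "out") and a `timestamp` as a `datetime`, neither of which appears in the original schema, and the 24-hour window, 90% ratio, and minimum amount are illustrative values rather than regulatory thresholds.

```python
from datetime import datetime, timedelta


def detect_rapid_movement(
    transactions: list[dict],
    window: timedelta = timedelta(hours=24),
    min_ratio: float = 0.9,
    min_amount: float = 5000.0,
) -> list[tuple[str, list[str]]]:
    """Find deposits where most of the money leaves again within `window`.

    Returns (deposit_id, [withdrawal_ids]) pairs for a reviewer to inspect.
    """
    txns = sorted(transactions, key=lambda t: t["timestamp"])
    findings = []
    for i, deposit in enumerate(txns):
        if deposit["direction"] != "in" or deposit["amount"] < min_amount:
            continue
        # Outgoing transactions after the deposit and inside the window.
        outflows = [
            t
            for t in txns[i + 1:]
            if t["direction"] == "out"
            and t["timestamp"] - deposit["timestamp"] <= window
        ]
        moved = sum(t["amount"] for t in outflows)
        if moved >= min_ratio * deposit["amount"]:
            findings.append((deposit["id"], [t["id"] for t in outflows]))
    return findings
```

Each finding could then be wrapped in a `SuspiciousAlert` in the same way as the structuring rule. The sketch counts every outflow in the window against each deposit, which is simple but can over-report when several deposits arrive close together.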

FAQ

How do you reduce false positives in sanctions screening?

Use a multi-pass approach. First run broad fuzzy matching, then apply filters for date of birth, nationality, and other identifying information. Weight exact field matches higher than name-only matches. Track historical false positives to build a whitelist of known safe names that closely match sanctioned entities.


What regulations govern KYC/AML agent design?

In the US, the Bank Secrecy Act and FinCEN regulations set KYC/AML requirements. The EU has Anti-Money Laundering Directives (currently 6AMLD). The FATF provides international standards. Your agent must maintain a full audit trail, support regulatory examination, and allow human override of every automated decision.
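As one illustration of the audit-trail requirement, the sketch below chains log entries by hash so that later edits to an earlier entry become detectable. `AuditLog` is a hypothetical in-memory class; a production system would persist entries to write-once storage, and the field names here are assumptions.

```python
import hashlib
import json
from datetime import datetime, timezone


def _digest(body: dict) -> str:
    """Stable SHA-256 over the JSON form of an entry body."""
    payload = json.dumps(body, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


class AuditLog:
    """Append-only log in which each entry commits to the previous one."""

    GENESIS = "0" * 64

    def __init__(self) -> None:
        self._entries: list[dict] = []

    def append(self, actor: str, action: str, details: dict) -> dict:
        """Record a decision; `details` must be JSON-serializable."""
        prev_hash = self._entries[-1]["hash"] if self._entries else self.GENESIS
        body = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "actor": actor,  # e.g. "agent" or a compliance officer's ID
            "action": action,
            "details": details,
            "prev_hash": prev_hash,
        }
        entry = {**body, "hash": _digest(body)}
        self._entries.append(entry)
        return entry

    def verify(self) -> bool:
        """Recompute every hash and check that the chain is unbroken."""
        prev_hash = self.GENESIS
        for entry in self._entries:
            body = {k: v for k, v in entry.items() if k != "hash"}
            if entry["prev_hash"] != prev_hash or _digest(body) != entry["hash"]:
                return False
            prev_hash = entry["hash"]
        return True
```

Recording human overrides as their own entries, with the officer as `actor`, keeps both the automated decision and the override visible to an examiner.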

Can the agent file Suspicious Activity Reports (SARs) automatically?

The agent can prepare SAR drafts with all required fields pre-populated, but a compliance officer must review and approve every filing. Automated filing without human review would violate regulatory requirements. The agent should queue SAR drafts for review and track approval status.

