
Capstone: Building an AI Document Processing Pipeline with Human Review

Build a complete document processing system with automated ingestion, AI-powered extraction and classification, a human review queue for quality assurance, and structured data export.

Pipeline Architecture

Document processing is one of the highest-value applications of AI in business. Invoices, contracts, medical records, insurance claims, and tax forms all need to be ingested, classified, have key fields extracted, reviewed for accuracy, and exported to downstream systems. This capstone builds that entire pipeline.

The system has five stages: ingestion (file upload with format detection), classification (determine document type), extraction (pull structured fields from unstructured text), review (human verification with an approval queue), and export (deliver validated data to external systems via API or CSV).
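The five stages double as a small state machine over the document status values defined in the data model below. A minimal sketch of the legal transitions (the transition table and helper name are illustrative, not from the source code):

```python
from enum import Enum

class DocStatus(str, Enum):
    UPLOADED = "uploaded"
    CLASSIFIED = "classified"
    EXTRACTED = "extracted"
    IN_REVIEW = "in_review"
    APPROVED = "approved"
    REJECTED = "rejected"
    EXPORTED = "exported"

# Legal stage transitions; extraction can auto-approve or queue for review.
TRANSITIONS = {
    DocStatus.UPLOADED: {DocStatus.CLASSIFIED},
    DocStatus.CLASSIFIED: {DocStatus.EXTRACTED},
    DocStatus.EXTRACTED: {DocStatus.APPROVED, DocStatus.IN_REVIEW},
    DocStatus.IN_REVIEW: {DocStatus.APPROVED, DocStatus.REJECTED},
    DocStatus.APPROVED: {DocStatus.EXPORTED},
}

def can_advance(current: DocStatus, target: DocStatus) -> bool:
    """Return True if moving current -> target is a legal pipeline step."""
    return target in TRANSITIONS.get(current, set())
```

Enforcing this at each stage boundary keeps a rejected or already-exported document from silently re-entering the pipeline.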

Data Model

# models.py
from sqlalchemy import Column, String, Text, Float, DateTime, ForeignKey, Enum
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import declarative_base
from sqlalchemy.sql import func
import uuid, enum

Base = declarative_base()

class DocStatus(str, enum.Enum):
    UPLOADED = "uploaded"
    CLASSIFIED = "classified"
    EXTRACTED = "extracted"
    IN_REVIEW = "in_review"
    APPROVED = "approved"
    REJECTED = "rejected"
    EXPORTED = "exported"

class DocumentRecord(Base):
    __tablename__ = "document_records"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    filename = Column(String(500))
    file_path = Column(String(1000))
    file_type = Column(String(20))  # pdf, image, docx
    doc_type = Column(String(100), nullable=True)  # invoice, contract, etc.
    classification_confidence = Column(Float, nullable=True)
    status = Column(Enum(DocStatus), default=DocStatus.UPLOADED)
    extracted_data = Column(JSONB, nullable=True)
    reviewer_notes = Column(Text, nullable=True)
    reviewed_by = Column(String(255), nullable=True)
    created_at = Column(DateTime, server_default=func.now())
    reviewed_at = Column(DateTime, nullable=True)

class ExtractionField(Base):
    __tablename__ = "extraction_fields"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    document_id = Column(UUID(as_uuid=True), ForeignKey("document_records.id"))
    field_name = Column(String(100))
    extracted_value = Column(Text)
    corrected_value = Column(Text, nullable=True)  # human correction
    confidence = Column(Float)

Document Classification

After ingestion, classify each document to determine what extraction schema to apply.

flowchart LR
    UP(["File upload"])
    ING["Ingestion<br/>format detection"]
    CLS["Classification<br/>document type"]
    EXT["Extraction<br/>structured fields"]
    CONF{"All fields<br/>confident?"}
    REV["Human review<br/>queue"]
    EXP(["Export<br/>API / CSV"])
    UP --> ING --> CLS --> EXT --> CONF
    CONF -->|Yes| EXP
    CONF -->|No| REV -->|Approved| EXP
    style EXT fill:#4f46e5,stroke:#4338ca,color:#fff
    style CONF fill:#f59e0b,stroke:#d97706,color:#1f2937
    style REV fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style EXP fill:#059669,stroke:#047857,color:#fff
# services/classifier.py
import json, openai
import fitz  # PyMuPDF -- backs the extract_text() helper (not shown here)

from models import DocStatus, DocumentRecord

DOCUMENT_TYPES = {
    "invoice": ["vendor_name", "invoice_number", "date", "total_amount", "line_items"],
    "contract": ["parties", "effective_date", "term_length", "key_clauses"],
    "receipt": ["merchant", "date", "total", "payment_method"],
    "medical_record": ["patient_name", "date_of_service", "diagnosis", "provider"],
}

async def classify_document(doc_id: str, db) -> str:
    doc = db.query(DocumentRecord).get(doc_id)
    text = extract_text(doc.file_path, doc.file_type)

    response = openai.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": f"""Classify this document into one of
these types: {list(DOCUMENT_TYPES.keys())}.
Return JSON with: doc_type (string), confidence (0-1)."""},
            {"role": "user", "content": text[:3000]},
        ],
        response_format={"type": "json_object"},
    )

    result = json.loads(response.choices[0].message.content)
    doc.doc_type = result["doc_type"]
    doc.classification_confidence = result["confidence"]
    doc.status = DocStatus.CLASSIFIED
    db.commit()
    return result["doc_type"]

Field Extraction

Once classified, extract the relevant fields based on the document type schema.

# services/extractor.py
import json, openai

from models import DocStatus, DocumentRecord, ExtractionField
from services.classifier import DOCUMENT_TYPES

async def extract_fields(doc_id: str, db) -> dict:
    doc = db.query(DocumentRecord).get(doc_id)
    text = extract_text(doc.file_path, doc.file_type)
    schema_fields = DOCUMENT_TYPES[doc.doc_type]

    field_descriptions = ", ".join(schema_fields)
    response = openai.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": f"""Extract these fields from the document:
{field_descriptions}.
Return JSON with each field name as a key. For each field include:
value (the extracted text), confidence (0-1).
If a field is not found, set value to null and confidence to 0."""},
            {"role": "user", "content": text},
        ],
        response_format={"type": "json_object"},
    )

    extracted = json.loads(response.choices[0].message.content)
    doc.extracted_data = extracted
    doc.status = DocStatus.EXTRACTED

    # Store individual fields for granular tracking
    for field_name, field_data in extracted.items():
        ef = ExtractionField(
            document_id=doc_id,
            field_name=field_name,
            extracted_value=str(field_data.get("value", "")),
            confidence=field_data.get("confidence", 0),
        )
        db.add(ef)

    # Auto-approve if all fields have high confidence
    all_confident = all(
        f.get("confidence", 0) >= 0.95 for f in extracted.values()
    )
    if all_confident:
        doc.status = DocStatus.APPROVED
    else:
        doc.status = DocStatus.IN_REVIEW

    db.commit()
    return extracted
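LLM JSON output is not guaranteed to match the requested shape, so before writing rows it helps to coerce each field into the `{value, confidence}` form the loop above expects. A defensive sketch (the `normalize_field` helper name is illustrative):

```python
def normalize_field(raw) -> dict:
    """Coerce one extracted field into {"value": ..., "confidence": float}.

    Handles the model returning a bare string instead of an object,
    missing keys, and non-numeric or out-of-range confidence scores.
    """
    if not isinstance(raw, dict):          # bare value, no confidence given
        return {"value": raw, "confidence": 0.0}
    value = raw.get("value")
    try:
        confidence = float(raw.get("confidence", 0.0))
    except (TypeError, ValueError):
        confidence = 0.0
    return {"value": value, "confidence": min(max(confidence, 0.0), 1.0)}
```

Without this step, a single malformed field can break the auto-approval check or crash the `ExtractionField` insert loop.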

Human Review Queue

Documents with low-confidence extractions enter a review queue. The admin interface shows the original document alongside extracted fields, allowing reviewers to correct values.

# routes/review.py
from datetime import datetime
from fastapi import APIRouter, Depends

from db import get_db  # session dependency (module name illustrative)
from models import DocStatus, DocumentRecord, ExtractionField
# ReviewApproval / ReviewRejection: Pydantic request models carrying
# corrections (dict), reviewer_email, and notes / reason fields.

router = APIRouter(prefix="/review")

@router.get("/queue")
async def get_review_queue(page: int = 1, per_page: int = 20, db=Depends(get_db)):
    offset = (page - 1) * per_page
    docs = db.query(DocumentRecord).filter(
        DocumentRecord.status == DocStatus.IN_REVIEW
    ).order_by(DocumentRecord.created_at).offset(offset).limit(per_page).all()
    total = db.query(DocumentRecord).filter(
        DocumentRecord.status == DocStatus.IN_REVIEW
    ).count()
    return {"documents": docs, "total": total, "page": page}

@router.post("/{doc_id}/approve")
async def approve_document(doc_id: str, body: ReviewApproval, db=Depends(get_db)):
    doc = db.query(DocumentRecord).get(doc_id)

    # Apply any corrections
    for field_name, corrected_value in body.corrections.items():
        field = db.query(ExtractionField).filter(
            ExtractionField.document_id == doc_id,
            ExtractionField.field_name == field_name,
        ).first()
        if field:
            field.corrected_value = corrected_value

    doc.status = DocStatus.APPROVED
    doc.reviewed_by = body.reviewer_email
    doc.reviewed_at = datetime.utcnow()
    doc.reviewer_notes = body.notes
    db.commit()
    return {"status": "approved"}

@router.post("/{doc_id}/reject")
async def reject_document(doc_id: str, body: ReviewRejection, db=Depends(get_db)):
    doc = db.query(DocumentRecord).get(doc_id)
    doc.status = DocStatus.REJECTED
    doc.reviewed_by = body.reviewer_email
    doc.reviewer_notes = body.reason
    doc.reviewed_at = datetime.utcnow()
    db.commit()
    return {"status": "rejected"}

Export Pipeline

Approved documents are exported to downstream systems. The export layer uses the corrected values when available, falling back to the original extraction.

# services/exporter.py
from models import DocStatus, DocumentRecord, ExtractionField

async def export_approved_documents(db) -> list:
    docs = db.query(DocumentRecord).filter(
        DocumentRecord.status == DocStatus.APPROVED
    ).all()

    exported = []
    for doc in docs:
        fields = db.query(ExtractionField).filter(
            ExtractionField.document_id == doc.id
        ).all()
        record = {"doc_type": doc.doc_type, "filename": doc.filename}
        for f in fields:
            # Explicit None check: an empty-string correction is still a correction
            record[f.field_name] = (
                f.corrected_value if f.corrected_value is not None
                else f.extracted_value
            )
        exported.append(record)

        doc.status = DocStatus.EXPORTED
    db.commit()
    return exported
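For the CSV export target mentioned earlier, the records can be serialized with the header taken as the union of all keys, since different document types carry different fields. A sketch (the `records_to_csv` name is illustrative):

```python
import csv, io

def records_to_csv(records: list[dict]) -> str:
    """Serialize exported records to CSV text.

    The header is the union of all keys in first-seen order;
    cells missing from a given record are left blank.
    """
    if not records:
        return ""
    fieldnames: list[str] = []
    for rec in records:
        for key in rec:
            if key not in fieldnames:
                fieldnames.append(key)
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=fieldnames, restval="")
    writer.writeheader()
    writer.writerows(records)
    return buf.getvalue()
```

`restval=""` is what keeps a receipt row from failing on invoice-only columns like `invoice_number`.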

FAQ

How do I handle scanned documents and images?

Use OCR as a preprocessing step before classification. PyMuPDF handles PDFs with embedded text. For scanned PDFs and images, use Tesseract OCR or a cloud service like Google Cloud Vision. Store the OCR quality score and route low-quality scans to human review regardless of extraction confidence.
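The routing rule in this answer can be made explicit as a small predicate. A sketch where the thresholds and the `needs_human_review` name are illustrative:

```python
def needs_human_review(ocr_quality: float,
                       min_field_confidence: float,
                       quality_floor: float = 0.8,
                       confidence_floor: float = 0.95) -> bool:
    """Route to review if the scan itself is poor, regardless of how
    confident the extractor claims to be on the noisy text."""
    if ocr_quality < quality_floor:
        return True
    return min_field_confidence < confidence_floor
```

Checking scan quality first matters because an extractor can be confidently wrong on garbled OCR text.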


How do I improve extraction accuracy over time?

Use the human corrections as training signal. Track which fields are most frequently corrected and for which document types. Periodically update extraction prompts to include examples of common corrections. Consider fine-tuning an extraction model on your corrected dataset once you have several thousand reviewed documents.

How do I handle multi-page documents where relevant data spans pages?

Concatenate all pages into a single text block before extraction. For very long documents, use a two-pass approach: first identify which pages contain relevant fields, then extract from only those pages. Store page numbers in the extraction metadata so reviewers can quickly navigate to the source.
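The two-pass idea can be sketched as a cheap keyword prefilter over pages (keywords and the `select_candidate_pages` name are illustrative; in practice the first pass could itself be a small LLM call):

```python
def select_candidate_pages(pages: list[str], keywords: list[str]) -> list[int]:
    """First pass: return 0-based indexes of pages whose text mentions any
    schema keyword, so the second (LLM) pass reads only those pages."""
    hits = []
    for i, text in enumerate(pages):
        lowered = text.lower()
        if any(kw.lower() in lowered for kw in keywords):
            hits.append(i)
    return hits
```

The returned indexes are also what you would store in extraction metadata so reviewers can jump straight to the source page.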


#CapstoneProject #DocumentProcessing #HumanInTheLoop #DataExtraction #Classification #FullStackAI #AgenticAI #LearnAI #AIEngineering

