
Building a Document Ingestion Pipeline for RAG: PDF, DOCX, HTML, and CSV Processing

Learn how to build a production document ingestion pipeline that detects file formats, extracts text, chunks content intelligently, generates embeddings, and stores everything for retrieval-augmented generation.

Why Document Ingestion Is the Foundation of RAG

Retrieval-augmented generation only works if the retrieval layer has clean, well-structured data to search. Most RAG failures are not prompt engineering problems — they are data ingestion problems. If your pipeline silently drops tables from PDFs, strips formatting from DOCX headers, or produces overlapping chunks with no context, your agent will hallucinate confidently from incomplete information.

A production ingestion pipeline must handle four concerns: format detection and extraction, intelligent chunking, embedding generation, and indexed storage. Each stage has pitfalls that compound downstream.

Format Detection and Text Extraction

The first challenge is reliably extracting text from heterogeneous file types. Never rely on file extensions alone — a renamed .txt file might contain HTML.

Diagram: query-time RAG flow. User query → embed query (text-embedding-3) → vector DB (pgvector or Pinecone) → top-k retrieval (k = 8) → augmented prompt (system plus context) → LLM generation (Claude or GPT) → inline citations and page anchors → grounded answer.
import magic
from pathlib import Path
from dataclasses import dataclass
from typing import List

@dataclass
class ExtractedDocument:
    source: str
    content: str
    metadata: dict
    pages: List[str]

class FormatDetector:
    MIME_MAP = {
        "application/pdf": "pdf",
        "application/vnd.openxmlformats-officedocument"
        ".wordprocessingml.document": "docx",
        "text/html": "html",
        "text/csv": "csv",
        "text/plain": "text",
    }

    def detect(self, file_path: str) -> str:
        mime = magic.from_file(file_path, mime=True)
        fmt = self.MIME_MAP.get(mime)
        if not fmt:
            raise ValueError(
                f"Unsupported format: {mime} for {file_path}"
            )
        return fmt

class DocumentExtractor:
    def __init__(self):
        self.detector = FormatDetector()

    def extract(self, file_path: str) -> ExtractedDocument:
        fmt = self.detector.detect(file_path)
        extractor = getattr(self, f"_extract_{fmt}")
        return extractor(file_path)

    def _extract_pdf(self, path: str) -> ExtractedDocument:
        import pdfplumber
        pages = []
        with pdfplumber.open(path) as pdf:
            for page in pdf.pages:
                text = page.extract_text() or ""
                tables = page.extract_tables()
                for table in tables:
                    rows = [
                        " | ".join(str(c or "") for c in row)
                        for row in table
                    ]
                    text += "\n" + "\n".join(rows)
                pages.append(text)
        return ExtractedDocument(
            source=path,
            content="\n\n".join(pages),
            metadata={"format": "pdf", "page_count": len(pages)},
            pages=pages,
        )

    def _extract_docx(self, path: str) -> ExtractedDocument:
        from docx import Document
        doc = Document(path)
        paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]
        return ExtractedDocument(
            source=path,
            content="\n\n".join(paragraphs),
            metadata={"format": "docx", "paragraph_count": len(paragraphs)},
            pages=paragraphs,
        )

    def _extract_html(self, path: str) -> ExtractedDocument:
        from bs4 import BeautifulSoup
        with open(path, "r", encoding="utf-8") as f:
            soup = BeautifulSoup(f.read(), "html.parser")
        for tag in soup(["script", "style", "nav", "footer"]):
            tag.decompose()
        text = soup.get_text(separator="\n", strip=True)
        return ExtractedDocument(
            source=path,
            content=text,
            metadata={
                "format": "html",
                "title": soup.title.string if soup.title else "",
            },
            pages=[text],
        )

    def _extract_csv(self, path: str) -> ExtractedDocument:
        import csv
        rows = []
        with open(path, "r", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            for row in reader:
                line = " | ".join(
                    f"{k}: {v}" for k, v in row.items()
                )
                rows.append(line)
        return ExtractedDocument(
            source=path,
            content="\n".join(rows),
            metadata={"format": "csv", "row_count": len(rows)},
            pages=rows,
        )

The key design decision here is using pdfplumber over PyPDF2 because it handles table extraction natively. Tables are a major source of lost information in PDF pipelines.

Intelligent Chunking

Naive fixed-size chunking breaks sentences mid-thought and loses section context. A better approach uses recursive splitting with overlap and respects document structure.

from typing import List
from dataclasses import dataclass

@dataclass
class Chunk:
    text: str
    metadata: dict
    index: int

class RecursiveChunker:
    def __init__(
        self,
        max_tokens: int = 512,
        overlap_tokens: int = 64,
        separators: list = None,
    ):
        self.max_tokens = max_tokens
        self.overlap_tokens = overlap_tokens
        self.separators = separators or [
            "\n\n", "\n", ". ", " "
        ]

    def chunk(
        self, doc: ExtractedDocument
    ) -> List[Chunk]:
        raw_chunks = self._split(
            doc.content, self.separators
        )
        chunks = []
        for i, text in enumerate(raw_chunks):
            chunks.append(Chunk(
                text=text.strip(),
                metadata={
                    **doc.metadata,
                    "source": doc.source,
                    "chunk_index": i,
                    "total_chunks": len(raw_chunks),
                },
                index=i,
            ))
        return chunks

    def _split(self, text: str, seps: list) -> List[str]:
        if not seps:
            return self._fixed_split(text)
        sep = seps[0]
        merged = []
        current = ""
        for part in text.split(sep):
            candidate = current + sep + part if current else part
            if self._token_count(candidate) <= self.max_tokens:
                current = candidate
                continue
            if current:
                merged.append(current)
                current = ""
            if self._token_count(part) > self.max_tokens:
                # Still too large on its own: recurse with the next,
                # finer-grained separator.
                merged.extend(self._split(part, seps[1:]))
            else:
                current = part
        if current:
            merged.append(current)
        return self._add_overlap(merged)

    def _add_overlap(self, chunks: List[str]) -> List[str]:
        if len(chunks) <= 1:
            return chunks
        result = [chunks[0]]
        for i in range(1, len(chunks)):
            prev_words = chunks[i - 1].split()
            overlap = " ".join(prev_words[-self.overlap_tokens:])
            result.append(overlap + " " + chunks[i])
        return result

    def _fixed_split(self, text: str) -> List[str]:
        words = text.split()
        return [
            " ".join(words[i:i + self.max_tokens])
            for i in range(0, len(words), self.max_tokens)
        ]

    def _token_count(self, text: str) -> int:
        # Whitespace word count as a cheap token proxy; swap in a
        # real tokenizer (e.g. tiktoken) for exact budget control.
        return len(text.split())
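The overlap step is easiest to see in isolation. This standalone sketch mirrors what _add_overlap does, prepending the tail of each chunk to its successor (word counts stand in for tokens, as in the class above):

```python
def add_overlap(chunks: list, overlap_words: int = 3) -> list:
    # Prepend the last few words of each chunk to the next one, so a
    # sentence cut at a boundary keeps its left context at retrieval time.
    if len(chunks) <= 1:
        return chunks
    result = [chunks[0]]
    for i in range(1, len(chunks)):
        tail = " ".join(chunks[i - 1].split()[-overlap_words:])
        result.append(tail + " " + chunks[i])
    return result
```

For example, add_overlap(["a b c d e", "f g"]) keeps the first chunk intact and expands the second to "c d e f g".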

Embedding and Storage

Once chunks are ready, generate embeddings and store them in a vector database. Batch processing with rate limiting prevents API throttling.

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def embed_and_store(chunks: List[Chunk], collection):
    batch_size = 100
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i + batch_size]
        response = await client.embeddings.create(
            model="text-embedding-3-small",
            input=[c.text for c in batch],
        )
        ids = [
            f"{c.metadata['source']}_{c.index}"
            for c in batch
        ]
        embeddings = [e.embedding for e in response.data]
        metadatas = [c.metadata for c in batch]
        documents = [c.text for c in batch]
        collection.upsert(
            ids=ids,
            embeddings=embeddings,
            metadatas=metadatas,
            documents=documents,
        )
        await asyncio.sleep(0.5)  # rate limiting
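The source_index ids above are positional: if a file is re-chunked with different boundaries, index 3 silently points at different text. One refinement, a design choice of mine rather than part of the pipeline above, is to fold a content hash into the id (chunk_id is a hypothetical helper):

```python
import hashlib

def chunk_id(source: str, index: int, text: str) -> str:
    # Tie the id to the chunk's content so an id always refers to the
    # same text; unchanged chunks upsert onto the same vector.
    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
    return f"{source}:{index}:{digest}"
```

Changed chunks then get fresh ids, so a periodic sweep that deletes ids absent from the latest ingest keeps the index free of stale vectors.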

FAQ

How should I handle scanned PDFs with no extractable text?

Use OCR as a fallback. Check if pdfplumber returns empty text for a page, then run that page through pytesseract or a cloud OCR service like AWS Textract. Add an ocr_applied: true flag to chunk metadata so downstream consumers know the text quality may be lower.
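A minimal sketch of that fallback, assuming tesseract and pytesseract are installed; needs_ocr and extract_page are hypothetical helper names, and the character threshold is a starting guess to tune per corpus:

```python
def needs_ocr(page_text: str, min_chars: int = 20) -> bool:
    # A page with almost no extractable text is probably a scan.
    return len(page_text.strip()) < min_chars

def extract_page(page):
    # Returns (text, ocr_applied) for one pdfplumber page, falling
    # back to Tesseract OCR when direct extraction comes up empty.
    text = page.extract_text() or ""
    if not needs_ocr(text):
        return text, False
    import pytesseract  # optional dependency, imported lazily
    image = page.to_image(resolution=300).original  # PIL image
    return pytesseract.image_to_string(image), True
```

The returned flag is what feeds the ocr_applied metadata field mentioned above.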


What chunk size works best for RAG?

Start with 512 tokens with 64-token overlap. Smaller chunks (256 tokens) improve precision for factual Q&A but lose context for summarization tasks. Larger chunks (1024 tokens) work better for complex reasoning. Test with your actual queries and measure retrieval recall to find the right size for your use case.
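Measuring that recall is straightforward once you have a small labeled set of query-to-relevant-chunk pairs; a sketch, with recall_at_k as my own helper name:

```python
def recall_at_k(retrieved_ids: list, relevant_ids: list, k: int = 8) -> float:
    # Fraction of the known-relevant chunks that made it into the
    # top-k retrieved results for one query.
    if not relevant_ids:
        return 0.0
    hits = set(retrieved_ids[:k]) & set(relevant_ids)
    return len(hits) / len(relevant_ids)
```

Average this over a few dozen labeled queries for each candidate chunk size and keep the configuration that wins on your own data.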

Should I re-embed everything when the embedding model changes?

Yes. Embedding spaces are model-specific and not interchangeable. When you upgrade models, re-process all documents and rebuild your vector index. Use a versioned collection naming scheme like docs_v2_embedding3small so you can run both indexes in parallel during migration.
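One way to encode that naming scheme, using a helper of my own naming (the exact slug format is an assumption, not from the article):

```python
def collection_name(base: str, model: str, schema_version: int) -> str:
    # Embed the model name in the collection so the old and new
    # indexes can serve queries side by side during a migration.
    slug = model.replace("-", "").replace(".", "")
    return f"{base}_v{schema_version}_{slug}"
```

For example, collection_name("docs", "text-embedding-3-small", 2) yields "docs_v2_textembedding3small"; once the new index passes your retrieval checks, retire the old collection.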


#RAG #DocumentProcessing #DataPipelines #Embeddings #VectorSearch #AgenticAI #LearnAI #AIEngineering
