Learn Agentic AI

Capstone: Building a RAG-Powered Knowledge Base with Admin Dashboard

Build a complete retrieval-augmented generation knowledge base with document ingestion, semantic search, a chat interface for users, and an admin panel with analytics for managing content.

Architecture Overview

A RAG-powered knowledge base lets users ask questions in natural language and receive accurate answers grounded in your organization's documents. This capstone builds four components: a document ingestion pipeline that chunks and embeds uploaded files, a vector search layer using pgvector, a chat interface that retrieves relevant chunks and generates answers, and an admin dashboard for managing documents and viewing analytics.

The tech stack is FastAPI for the backend, PostgreSQL with pgvector for storage and search, OpenAI for embeddings and generation, and Next.js for both the user chat interface and admin dashboard.

Database Schema with pgvector

# models.py
import uuid

from pgvector.sqlalchemy import Vector
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Text, func
from sqlalchemy.dialects.postgresql import ARRAY, UUID
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Document(Base):
    __tablename__ = "documents"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    title = Column(String(500), nullable=False)
    source_type = Column(String(50))  # "pdf", "markdown", "url"
    source_url = Column(Text, nullable=True)
    total_chunks = Column(Integer, default=0)
    status = Column(String(20), default="processing")  # processing, ready, error
    uploaded_by = Column(String(255))
    # func.now() renders as DEFAULT now(); a plain string would be quoted as a literal
    created_at = Column(DateTime, server_default=func.now())

class Chunk(Base):
    __tablename__ = "chunks"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    document_id = Column(UUID(as_uuid=True), ForeignKey("documents.id"))
    content = Column(Text, nullable=False)
    chunk_index = Column(Integer)
    embedding = Column(Vector(1536))  # OpenAI text-embedding-3-small
    metadata_ = Column(Text)  # JSON: page number, section heading

class ChatSession(Base):
    __tablename__ = "chat_sessions"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    query_count = Column(Integer, default=0)
    created_at = Column(DateTime, server_default=func.now())

class ChatMessage(Base):
    __tablename__ = "chat_messages"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    session_id = Column(UUID(as_uuid=True), ForeignKey("chat_sessions.id"))
    role = Column(String(20))  # "user" or "assistant"
    content = Column(Text)
    source_chunks = Column(ARRAY(String))  # chunk IDs used for this answer
    created_at = Column(DateTime, server_default=func.now())

Document Ingestion Pipeline

The ingestion pipeline accepts a file upload, extracts text, splits it into chunks, generates embeddings, and stores everything in the database.

flowchart LR
    Q(["User query"])
    EMB["Embed query<br/>text-embedding-3"]
    VEC[("Vector DB<br/>pgvector or Pinecone")]
    RET["Top-k retrieval<br/>k = 8"]
    PROMPT["Augmented prompt<br/>system plus context"]
    LLM["LLM generation<br/>Claude or GPT"]
    CITE["Inline citations<br/>and page anchors"]
    OUT(["Grounded answer"])
    Q --> EMB --> VEC --> RET --> PROMPT --> LLM --> CITE --> OUT
    style EMB fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style VEC fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style LLM fill:#4f46e5,stroke:#4338ca,color:#fff
    style OUT fill:#059669,stroke:#047857,color:#fff
# services/ingestion.py
import fitz  # PyMuPDF
import openai
from langchain_text_splitters import RecursiveCharacterTextSplitter

from models import Chunk, Document

# chunk_size and chunk_overlap are measured in characters (the default
# length function is len), not tokens.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=800,
    chunk_overlap=200,
    separators=["\n\n", "\n", ". ", " "],
)

async def ingest_document(file_path: str, doc_id: str, db):
    # Note: the OpenAI and database calls below are synchronous, so they
    # block the event loop while they run.
    try:
        # Extract text based on file type
        if file_path.lower().endswith(".pdf"):
            with fitz.open(file_path) as doc:
                text = "\n".join(page.get_text() for page in doc)
        else:
            with open(file_path, encoding="utf-8") as f:
                text = f.read()

        # Split into chunks
        chunks = text_splitter.split_text(text)

        # Generate embeddings in batches
        batch_size = 100
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i:i + batch_size]
            response = openai.embeddings.create(
                model="text-embedding-3-small", input=batch
            )
            for j, embedding_data in enumerate(response.data):
                db.add(Chunk(
                    document_id=doc_id,
                    content=batch[j],
                    chunk_index=i + j,
                    embedding=embedding_data.embedding,
                ))

        # Update document status
        document = db.get(Document, doc_id)
        document.total_chunks = len(chunks)
        document.status = "ready"
        db.commit()
    except Exception:
        # Discard partial chunks and record the failure on the document
        db.rollback()
        document = db.get(Document, doc_id)
        if document is not None:
            document.status = "error"
            db.commit()
        raise
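
To make the chunk_size and chunk_overlap settings above concrete, here is a minimal sketch of a plain sliding-window splitter. It is not how RecursiveCharacterTextSplitter works internally (that class also tries to break on the listed separators first); split_with_overlap is a hypothetical helper written only for illustration, and sizes are counted in characters.

```python
def split_with_overlap(text: str, chunk_size: int = 800, chunk_overlap: int = 200) -> list:
    """Split text into fixed-size character windows that share an overlap."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    step = chunk_size - chunk_overlap  # how far each window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once a window has reached the end of the text
        if start + chunk_size >= len(text):
            break
    return chunks


if __name__ == "__main__":
    for piece in split_with_overlap("abcdefghij", chunk_size=4, chunk_overlap=2):
        print(piece)
```

With an overlap, a sentence that straddles a chunk boundary still appears whole in at least one chunk, at the cost of storing and embedding some text twice.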

Semantic Search and Answer Generation

The search endpoint embeds the user query, retrieves up to five chunks whose cosine similarity to the query exceeds 0.7, and passes them to the LLM as context.
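
As a rough illustration of the retrieval step, the sketch below reproduces in plain Python what the database does: pgvector's `<=>` operator returns cosine distance, so similarity is 1 minus that distance. The function names and the in-memory `rows` list are assumptions made for this example; in the real system the ranking runs inside PostgreSQL.

```python
import math


def cosine_distance(a, b):
    """Cosine distance between two vectors: 1 - cos(angle)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine distance is undefined for zero vectors")
    return 1.0 - dot / (norm_a * norm_b)


def top_k(query, rows, k=5, min_similarity=0.7):
    """Return (chunk_id, similarity) pairs above the threshold, best first.

    rows is a list of (chunk_id, embedding) pairs standing in for the table.
    """
    scored = [(cid, 1.0 - cosine_distance(query, emb)) for cid, emb in rows]
    kept = [pair for pair in scored if pair[1] > min_similarity]
    kept.sort(key=lambda pair: pair[1], reverse=True)
    return kept[:k]


if __name__ == "__main__":
    rows = [("a", [1.0, 0.0]), ("b", [0.0, 1.0]), ("c", [1.0, 1.0])]
    print(top_k([1.0, 0.0], rows))
```

The 0.7 threshold trades recall for precision: raising it returns fewer, closer chunks, and lowering it returns more context at the risk of noise.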

# services/search.py
import openai
from sqlalchemy import text

from models import ChatMessage

async def search_and_answer(query: str, session_id: str, db) -> dict:
    # Embed the query
    q_resp = openai.embeddings.create(
        model="text-embedding-3-small", input=[query]
    )
    query_embedding = q_resp.data[0].embedding

    # Vector similarity search with pgvector.
    # <=> is cosine distance, so similarity = 1 - distance.
    results = db.execute(
        text("""
            SELECT id, content, document_id,
                   1 - (embedding <=> CAST(:embedding AS vector)) AS similarity
            FROM chunks
            WHERE 1 - (embedding <=> CAST(:embedding AS vector)) > 0.7
            ORDER BY embedding <=> CAST(:embedding AS vector)
            LIMIT 5
        """),
        # pgvector parses the "[0.1, 0.2, ...]" text form produced by str()
        {"embedding": str(query_embedding)},
    ).fetchall()

    if not results:
        return {"answer": "I could not find relevant information.", "sources": []}

    # Build context from retrieved chunks
    context = "\n\n---\n\n".join([r.content for r in results])

    # Generate answer with sources
    response = openai.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": f"""Answer based on this context:

{context}

If the context does not contain the answer, say so. Cite which sections you used."""},
            {"role": "user", "content": query},
        ],
    )

    answer = response.choices[0].message.content
    source_ids = [str(r.id) for r in results]

    # Save to chat history
    db.add(ChatMessage(session_id=session_id, role="user", content=query))
    db.add(ChatMessage(
        session_id=session_id, role="assistant",
        content=answer, source_chunks=source_ids
    ))
    db.commit()

    return {"answer": answer, "sources": source_ids}
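
The system prompt above asks the model to cite the sections it used, but the context string gives the chunks no labels to cite. One possible refinement, sketched here under that assumption, is to number each chunk and map any bracketed numbers in the answer back to chunk IDs. Both helpers are hypothetical and not part of the original code.

```python
import re


def build_context(chunks):
    """Number each (chunk_id, content) pair so the model can cite it as [n].

    Returns the context string and a dict mapping label number to chunk_id.
    """
    label_to_id = {}
    parts = []
    for n, (chunk_id, content) in enumerate(chunks, start=1):
        label_to_id[n] = chunk_id
        parts.append(f"[{n}] {content.strip()}")
    return "\n\n---\n\n".join(parts), label_to_id


def cited_chunk_ids(answer, label_to_id):
    """Return the chunk IDs whose labels appear in the answer text."""
    labels = {int(m) for m in re.findall(r"\[(\d+)\]", answer)}
    # Ignore labels the model made up that were never in the context
    return [label_to_id[n] for n in sorted(labels) if n in label_to_id]


if __name__ == "__main__":
    context, mapping = build_context([("id-a", "Alpha"), ("id-b", "Beta")])
    print(context)
    print(cited_chunk_ids("The answer is in [2].", mapping))
```

Storing only the cited IDs in source_chunks, rather than every retrieved chunk, would make the sources list reflect what the answer actually drew on.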

Admin Dashboard API

The admin panel provides endpoints for managing documents and viewing search analytics. The routes below cover listing documents, deleting a document together with its chunks, and reporting usage totals.

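# routes/admin.py
from fastapi import APIRouter, Depends
from sqlalchemy import text

from database import get_db  # assumed location of the session dependency
from models import ChatMessage, Chunk, Document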

admin_router = APIRouter(prefix="/admin")

@admin_router.get("/documents")
async def list_documents(page: int = 1, per_page: int = 20, db=Depends(get_db)):
    offset = (page - 1) * per_page
    docs = db.query(Document).order_by(Document.created_at.desc()) \
        .offset(offset).limit(per_page).all()
    total = db.query(Document).count()
    return {"documents": docs, "total": total, "page": page}

@admin_router.get("/analytics")
async def get_analytics(db=Depends(get_db)):
    total_docs = db.query(Document).filter(Document.status == "ready").count()
    total_chunks = db.query(Chunk).count()
    total_queries = db.query(ChatMessage).filter(ChatMessage.role == "user").count()
    avg_sources = db.execute(text(
        "SELECT AVG(array_length(source_chunks, 1)) FROM chat_messages WHERE role='assistant'"
    )).scalar()
    return {
        "total_documents": total_docs,
        "total_chunks": total_chunks,
        "total_queries": total_queries,
        "avg_sources_per_answer": round(avg_sources or 0, 2),
    }

@admin_router.delete("/documents/{doc_id}")
async def delete_document(doc_id: str, db=Depends(get_db)):
    db.query(Chunk).filter(Chunk.document_id == doc_id).delete()
    db.query(Document).filter(Document.id == doc_id).delete()
    db.commit()
    return {"deleted": True}
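
The list endpoint above computes its offset directly from the query parameters, so page=0 yields a negative offset and per_page is unbounded. A small guard like the following sketch could clamp both values before they reach the query. The helper name and the cap of 100 are assumptions; in FastAPI the same bounds can also be declared on the parameters with Query(ge=..., le=...).

```python
def page_window(page: int, per_page: int, max_per_page: int = 100) -> tuple:
    """Clamp pagination inputs and return (offset, limit)."""
    page = max(page, 1)                              # pages start at 1
    per_page = min(max(per_page, 1), max_per_page)   # keep limit in [1, max]
    return (page - 1) * per_page, per_page


if __name__ == "__main__":
    offset, limit = page_window(page=3, per_page=20)
    print(offset, limit)
```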

FAQ

How do I handle documents that exceed the embedding model token limit?

The recursive text splitter already handles this by breaking text into chunks of at most 800 characters (chunk_size counts characters by default, not tokens), which is well below the embedding model's per-input token limit. For documents with complex structure like tables, preprocess the document to extract tables separately and store them as dedicated chunks with metadata indicating they are tabular data.
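
As one way to tag tabular chunks, the sketch below builds the JSON string that would go into the metadata_ column of the Chunk model. The field names (content_type, page, section) and the helper itself are illustrative assumptions, not a schema defined by this article.

```python
import json
from typing import Optional


def make_chunk_metadata(content_type: str,
                        page: Optional[int] = None,
                        section: Optional[str] = None) -> str:
    """Serialize chunk metadata to a JSON string for a Text column."""
    if content_type not in {"text", "table"}:
        raise ValueError(f"unknown content_type: {content_type!r}")
    meta = {"content_type": content_type}
    if page is not None:
        meta["page"] = page
    if section:
        meta["section"] = section
    return json.dumps(meta, sort_keys=True)


if __name__ == "__main__":
    print(make_chunk_metadata("table", page=3, section="Pricing"))
```

At query time the same field could be used to format table chunks differently in the prompt or to filter them in or out of retrieval.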

How do I improve answer quality when retrieved chunks are not relevant enough?

Implement hybrid search combining vector similarity with keyword search using PostgreSQL full-text search (tsvector). Re-rank results using a cross-encoder model before passing them to the LLM. Also consider adding a feedback mechanism where users rate answers, then use low-rated answers to identify gaps in your knowledge base.
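
A hybrid search needs some way to merge the vector ranking with the keyword ranking. The answer above does not name one; a common choice is reciprocal rank fusion (RRF), sketched here as an assumption. Each input is a list of chunk IDs ordered best first, and k=60 is the constant conventionally used with RRF rather than a value tuned for this system.

```python
def reciprocal_rank_fusion(rankings, k: int = 60):
    """Merge several ranked lists of chunk IDs into one fused ranking.

    Each ID scores sum(1 / (k + rank)) over the lists it appears in.
    """
    scores = {}
    for ranking in rankings:
        for rank, chunk_id in enumerate(ranking, start=1):
            scores[chunk_id] = scores.get(chunk_id, 0.0) + 1.0 / (k + rank)
    # Highest score first; ties broken by ID for a stable order
    return sorted(scores, key=lambda cid: (-scores[cid], cid))


if __name__ == "__main__":
    vector_hits = ["a", "b", "c"]    # e.g. from the pgvector query
    keyword_hits = ["c", "a", "d"]   # e.g. from a tsvector query
    print(reciprocal_rank_fusion([vector_hits, keyword_hits]))
```

Because RRF uses only ranks, it avoids having to put cosine similarity and full-text scores on a common scale. The fused list can then be passed to a cross-encoder for re-ranking.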

How do I update a document without losing chat history references?

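Use a versioning approach. When a document is re-uploaded, create new chunk records with the updated content and embeddings. Keep the old chunk records but mark them as archived, which means adding an archived flag to the chunks table and excluding archived rows from the similarity search. Chat history references remain valid because they point to the original chunk IDs, which are never deleted.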
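
The versioning idea can be sketched with an in-memory list standing in for the chunks table. The archived field, ChunkRecord, and both functions are assumptions for illustration; the schema shown earlier has no such column.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class ChunkRecord:
    document_id: str
    content: str
    archived: bool = False
    id: str = field(default_factory=lambda: str(uuid.uuid4()))


def reupload(store, document_id, new_contents):
    """Archive a document's current chunks and append the new version."""
    for chunk in store:
        if chunk.document_id == document_id:
            chunk.archived = True  # kept so old chat references still resolve
    new_chunks = [ChunkRecord(document_id, text) for text in new_contents]
    store.extend(new_chunks)
    return new_chunks


def searchable(store):
    """Only non-archived chunks should take part in retrieval."""
    return [chunk for chunk in store if not chunk.archived]


if __name__ == "__main__":
    store = [ChunkRecord("doc1", "old text")]
    reupload(store, "doc1", ["new text, part 1", "new text, part 2"])
    print([c.content for c in searchable(store)])
```

In SQL terms this corresponds to an UPDATE that sets the flag on the old rows plus a filter on the flag in the search query. Note that the delete endpoint shown earlier removes chunks outright, so it would break old references unless it is changed to archive instead.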

