
Building a Conversations API: CRUD Operations for Agent Chat Sessions

Design and implement a full Conversations API for AI agent chat sessions. Covers resource modeling, conversation lifecycle, message threading, metadata management, and FastAPI implementation patterns.

Designing the Conversation Resource Model

A Conversations API is the backbone of any AI agent platform. It manages the lifecycle of chat sessions, organizes messages into threads, tracks metadata like token usage and model configuration, and provides the history that agents use for context.

The resource hierarchy follows a natural pattern: an agent has many conversations, and each conversation has many messages. Messages can have different roles (user, assistant, system, tool) and may include structured metadata like tool call results.
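To make the hierarchy concrete, here is an illustrative sketch of the two resource shapes. All IDs and field values are hypothetical placeholders; the field names mirror the schema defined below.

```python
# Hypothetical wire representations of the two resources. The fixed
# UUIDs are placeholders, not real identifiers.
conversation = {
    "id": "11111111-1111-1111-1111-111111111111",
    "agent_id": "support-agent",
    "title": "Refund request",
    "status": "active",
    "model": "gpt-4o",
    "metadata": {"customer_tier": "pro"},
}

messages = [
    {
        "id": "22222222-2222-2222-2222-222222222222",
        "conversation_id": conversation["id"],
        "role": "user",
        "content": "I want a refund for my last order.",
    },
    {
        "id": "33333333-3333-3333-3333-333333333333",
        "conversation_id": conversation["id"],
        "role": "assistant",
        "content": None,
        # Assistant turns may carry structured tool calls instead of text.
        "tool_calls": [{"name": "lookup_order", "arguments": {"order_id": "A-1"}}],
    },
]
```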

Database Schema

Start with the data model. Two core tables handle the conversation and message resources.

The request path from client to storage looks like this:
flowchart LR
    CLIENT(["Client SDK"])
    GW["API Gateway<br/>auth plus rate limit"]
    APP["FastAPI app<br/>handlers and DI"]
    VAL["Pydantic validation"]
    SVC["Service layer<br/>business logic"]
    DB[(Database)]
    QUEUE[(Background queue)]
    OBS[(Tracing)]
    CLIENT --> GW --> APP --> VAL --> SVC
    SVC --> DB
    SVC --> QUEUE
    SVC --> OBS
    SVC --> CLIENT
    style GW fill:#4f46e5,stroke:#4338ca,color:#fff
    style APP fill:#f59e0b,stroke:#d97706,color:#1f2937
    style DB fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
from sqlalchemy import (
    Column, String, Text, JSON, DateTime, Integer,
    ForeignKey, Enum as SAEnum, func,
)
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base, relationship
import uuid
import enum

Base = declarative_base()

class ConversationStatus(str, enum.Enum):
    active = "active"
    archived = "archived"
    deleted = "deleted"

class Conversation(Base):
    __tablename__ = "conversations"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    agent_id = Column(String(100), nullable=False, index=True)
    title = Column(String(500), nullable=True)
    status = Column(
        SAEnum(ConversationStatus),
        default=ConversationStatus.active,
        nullable=False,
    )
    # "metadata" is reserved on declarative models, so map the attribute
    # as metadata_ while keeping the column name "metadata".
    metadata_ = Column("metadata", JSON, default=dict)
    model = Column(String(100), nullable=True)
    total_tokens = Column(Integer, default=0)
    created_at = Column(DateTime, server_default=func.now())
    updated_at = Column(
        DateTime, server_default=func.now(), onupdate=func.now()
    )

    messages = relationship("Message", back_populates="conversation")

    def to_dict(self) -> dict:
        return {
            "id": str(self.id),
            "agent_id": self.agent_id,
            "title": self.title,
            "status": self.status.value,
            "metadata": self.metadata_ or {},
            "model": self.model,
            "total_tokens": self.total_tokens,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "updated_at": self.updated_at.isoformat() if self.updated_at else None,
        }

class Message(Base):
    __tablename__ = "messages"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    conversation_id = Column(
        UUID(as_uuid=True),
        ForeignKey("conversations.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    role = Column(String(20), nullable=False)  # user, assistant, system, tool
    content = Column(Text, nullable=True)
    tool_calls = Column(JSON, nullable=True)
    tool_call_id = Column(String(100), nullable=True)
    tokens = Column(Integer, default=0)
    created_at = Column(DateTime, server_default=func.now())

    conversation = relationship("Conversation", back_populates="messages")

    def to_dict(self) -> dict:
        return {
            "id": str(self.id),
            "conversation_id": str(self.conversation_id),
            "role": self.role,
            "content": self.content,
            "tool_calls": self.tool_calls,
            "tool_call_id": self.tool_call_id,
            "tokens": self.tokens,
            "created_at": self.created_at.isoformat() if self.created_at else None,
        }

CRUD Endpoints

The API follows RESTful conventions with conversations as the top-level resource and messages nested beneath them.

from fastapi import FastAPI, HTTPException, Depends, Query
from pydantic import BaseModel, Field
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

# get_db is assumed to be an async-session dependency defined elsewhere.

app = FastAPI()

class CreateConversation(BaseModel):
    agent_id: str
    title: str | None = None
    model: str = "gpt-4o"
    metadata: dict = Field(default_factory=dict)

class CreateMessage(BaseModel):
    role: str
    content: str | None = None
    tool_calls: list[dict] | None = None
    tool_call_id: str | None = None
    tokens: int = 0  # token count for this message, reported by the caller

@app.post("/v1/conversations", status_code=201)
async def create_conversation(
    body: CreateConversation,
    db: AsyncSession = Depends(get_db),
):
    conv = Conversation(
        agent_id=body.agent_id,
        title=body.title,
        model=body.model,
        metadata_=body.metadata,
    )
    db.add(conv)
    await db.commit()
    await db.refresh(conv)
    return conv.to_dict()

@app.get("/v1/conversations/{conversation_id}")
async def get_conversation(
    conversation_id: str,
    db: AsyncSession = Depends(get_db),
):
    conv = await db.get(Conversation, conversation_id)
    if not conv or conv.status == ConversationStatus.deleted:
        raise HTTPException(status_code=404, detail="Conversation not found")
    return conv.to_dict()

@app.patch("/v1/conversations/{conversation_id}")
async def update_conversation(
    conversation_id: str,
    body: dict,
    db: AsyncSession = Depends(get_db),
):
    conv = await db.get(Conversation, conversation_id)
    if not conv:
        raise HTTPException(status_code=404, detail="Conversation not found")

    allowed_fields = {"title", "metadata", "status"}
    for key, value in body.items():
        if key not in allowed_fields:
            continue
        if key == "status" and value not in ConversationStatus.__members__:
            raise HTTPException(status_code=422, detail=f"Invalid status: {value}")
        # The metadata column is mapped as metadata_ on the model.
        setattr(conv, "metadata_" if key == "metadata" else key, value)

    await db.commit()
    await db.refresh(conv)
    return conv.to_dict()

Adding Messages to a Conversation

Messages are appended to a conversation and ordered by creation time. The endpoint also updates the conversation's token count and timestamp.

@app.post(
    "/v1/conversations/{conversation_id}/messages",
    status_code=201,
)
async def add_message(
    conversation_id: str,
    body: CreateMessage,
    db: AsyncSession = Depends(get_db),
):
    conv = await db.get(Conversation, conversation_id)
    if not conv or conv.status != ConversationStatus.active:
        raise HTTPException(
            status_code=404,
            detail="Active conversation not found",
        )

    # The request schema may or may not expose a tokens field; default to
    # 0 so the conversation's running total stays consistent either way.
    msg_tokens = getattr(body, "tokens", 0)

    msg = Message(
        conversation_id=conv.id,
        role=body.role,
        content=body.content,
        tool_calls=body.tool_calls,
        tool_call_id=body.tool_call_id,
        tokens=msg_tokens,
    )
    db.add(msg)

    # Keep the running token total, and use the database clock for
    # updated_at rather than application time to avoid clock skew.
    conv.total_tokens = (conv.total_tokens or 0) + msg_tokens
    conv.updated_at = func.now()
    await db.commit()
    await db.refresh(msg)

    return msg.to_dict()

@app.get("/v1/conversations/{conversation_id}/messages")
async def list_messages(
    conversation_id: str,
    cursor: str | None = Query(None),
    limit: int = Query(50, ge=1, le=200),
    db: AsyncSession = Depends(get_db),
):
    # Keyset pagination: order by (created_at, id) so messages with the
    # same timestamp are returned in a stable order.
    query = (
        select(Message)
        .where(Message.conversation_id == conversation_id)
        .order_by(Message.created_at.asc(), Message.id.asc())
    )

    if cursor:
        decoded = decode_cursor(cursor)
        # Compare on (created_at, id) so ties on the timestamp do not
        # skip or duplicate rows across pages.
        query = query.where(
            (Message.created_at > decoded["created_at"])
            | (
                (Message.created_at == decoded["created_at"])
                & (Message.id > decoded["id"])
            )
        )

    rows = await db.execute(query.limit(limit + 1))
    messages = rows.scalars().all()

    has_more = len(messages) > limit
    messages = messages[:limit]

    return {
        "data": [m.to_dict() for m in messages],
        "has_more": has_more,
        "next_cursor": encode_cursor(
            messages[-1].created_at.isoformat(),
            str(messages[-1].id),
        ) if has_more else None,
    }
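The listing endpoint leans on encode_cursor and decode_cursor, which are not defined here. A minimal sketch packs the (created_at, id) pair into an opaque base64 string:

```python
import base64
import json

def encode_cursor(created_at: str, message_id: str) -> str:
    # Opaque to clients: a base64-encoded JSON payload of the sort keys.
    payload = json.dumps({"created_at": created_at, "id": message_id})
    return base64.urlsafe_b64encode(payload.encode()).decode()

def decode_cursor(cursor: str) -> dict:
    return json.loads(base64.urlsafe_b64decode(cursor.encode()))

cursor = encode_cursor("2024-05-01T12:00:00", "11111111-1111-1111-1111-111111111111")
```

In production you would parse the decoded created_at string back into a datetime before comparing it against the column.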

Conversation Lifecycle: Archive and Soft Delete

Rather than hard-deleting conversations, use status transitions. Active conversations can be archived (hidden from default listings but still accessible) or soft-deleted (excluded from all queries, eligible for permanent deletion after a retention period).
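The transitions described above can be made explicit with a small table that both endpoints consult. This is a sketch under one reasonable policy (archived conversations can be restored, deleted is terminal until a retention job purges the row); the enum restates the one from the schema so the snippet is self-contained.

```python
import enum

class ConversationStatus(str, enum.Enum):
    active = "active"
    archived = "archived"
    deleted = "deleted"

# One reasonable policy: archived can be restored or deleted;
# deleted is terminal until the retention job purges the row.
ALLOWED = {
    ConversationStatus.active: {ConversationStatus.archived, ConversationStatus.deleted},
    ConversationStatus.archived: {ConversationStatus.active, ConversationStatus.deleted},
    ConversationStatus.deleted: set(),
}

def can_transition(current: ConversationStatus, target: ConversationStatus) -> bool:
    return target in ALLOWED[current]
```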

@app.post("/v1/conversations/{conversation_id}/archive")
async def archive_conversation(
    conversation_id: str,
    db: AsyncSession = Depends(get_db),
):
    conv = await db.get(Conversation, conversation_id)
    if not conv:
        raise HTTPException(status_code=404, detail="Conversation not found")
    conv.status = ConversationStatus.archived
    await db.commit()
    return {"status": "archived"}

@app.delete("/v1/conversations/{conversation_id}", status_code=204)
async def delete_conversation(
    conversation_id: str,
    db: AsyncSession = Depends(get_db),
):
    conv = await db.get(Conversation, conversation_id)
    if not conv:
        raise HTTPException(status_code=404, detail="Conversation not found")
    conv.status = ConversationStatus.deleted
    await db.commit()

FAQ

How should I handle conversation context windows for LLM calls?

Store all messages in the database for audit and history, but only send the most recent messages to the LLM, respecting the model's context window. Implement a context builder that trims from the oldest messages first while always preserving the system prompt. Track token counts per message so you can calculate the window without re-tokenizing.
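That trimming strategy can be sketched in a few lines, assuming each stored message dict carries the precomputed tokens count from the schema above:

```python
def build_context(messages: list[dict], max_tokens: int) -> list[dict]:
    """Trim oldest messages first while always keeping system prompts."""
    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]

    # Budget left after reserving room for the system prompt(s).
    budget = max_tokens - sum(m["tokens"] for m in system)

    kept: list[dict] = []
    for m in reversed(rest):  # walk newest-to-oldest
        if m["tokens"] > budget:
            break  # oldest remaining messages are dropped
        kept.append(m)
        budget -= m["tokens"]

    return system + list(reversed(kept))
```

Because token counts are stored per message, this runs without re-tokenizing anything.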


Should I use UUIDs or auto-increment integers for conversation IDs?

Use UUIDs for external-facing IDs. They are non-guessable (important for security), globally unique (simplifies distributed systems), and do not leak information about the total number of conversations. Use auto-increment integers internally if you need efficient keyset pagination. You can expose the UUID and use the integer for internal ordering.

How do I handle concurrent writes to the same conversation?

Use database-level ordering by relying on created_at timestamps with sufficient precision (microseconds) combined with the message UUID as a tiebreaker. For the conversation's updated_at field, use the database's NOW() function rather than application time to avoid clock skew. If multiple agents write to the same conversation, consider optimistic locking with a version column to detect conflicts.
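The version-column approach can be demonstrated with plain SQL. This is a sqlite sketch; the conversations table here is a simplified stand-in with a hypothetical version column.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE conversations (id TEXT PRIMARY KEY, title TEXT, version INTEGER)"
)
conn.execute("INSERT INTO conversations VALUES ('c1', 'old title', 1)")

def update_title(conn, conv_id: str, new_title: str, expected_version: int) -> bool:
    # The UPDATE matches only if no one else bumped the version since we read it.
    cur = conn.execute(
        "UPDATE conversations SET title = ?, version = version + 1 "
        "WHERE id = ? AND version = ?",
        (new_title, conv_id, expected_version),
    )
    return cur.rowcount == 1  # False means a conflict: re-read and retry

ok = update_title(conn, "c1", "new title", expected_version=1)
stale = update_title(conn, "c1", "other title", expected_version=1)  # version is now 2
```

A writer that loses the race gets False instead of silently overwriting the other writer's change.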


#ConversationsAPI #CRUD #ChatSessions #FastAPI #APIDesign #AgenticAI #LearnAI #AIEngineering
