Migrating Agent Data: Moving Conversations, Sessions, and Memory Between Systems

Learn how to migrate conversations, sessions, and agent memory between AI systems with zero downtime. Covers data export, transformation, import validation, and cutover strategies.

Why Agent Data Migration Is Harder Than Regular Data Migration

Agent data has characteristics that make migration harder than moving ordinary records. Conversations have temporal ordering that must be preserved. Session state references tool call IDs and function outputs that are framework-specific. Memory stores may contain embeddings tied to a particular model version. Users also expect continuity: they do not want to re-explain context after a system change.

A well-planned migration preserves all of this while the system stays online.

Step 1: Define a Canonical Data Format

Before exporting anything, define a framework-agnostic format that captures all the information you need.

flowchart TD
    MSG(["New message"])
    WORKING["Working memory<br/>rolling window"]
    EPISODIC[("Episodic memory<br/>past sessions")]
    SEMANTIC[("Semantic memory<br/>facts and preferences")]
    SUM["Summarizer<br/>compresses old turns"]
    ROUTER{"Retrieve<br/>needed memories"}
    PROMPT["Assembled context"]
    LLM["LLM"]
    UPD["Memory updater<br/>writes new facts"]
    MSG --> WORKING --> ROUTER
    ROUTER -->|Past sessions| EPISODIC
    ROUTER -->|User facts| SEMANTIC
    EPISODIC --> SUM --> PROMPT
    SEMANTIC --> PROMPT
    WORKING --> PROMPT --> LLM --> UPD
    UPD --> EPISODIC
    UPD --> SEMANTIC
    style ROUTER fill:#4f46e5,stroke:#4338ca,color:#fff
    style LLM fill:#f59e0b,stroke:#d97706,color:#1f2937
    style EPISODIC fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style SEMANTIC fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import json

@dataclass
class CanonicalMessage:
    role: str  # "user", "assistant", "system", "tool"
    content: str
    timestamp: datetime
    tool_call_id: Optional[str] = None
    tool_name: Optional[str] = None
    metadata: dict = field(default_factory=dict)

@dataclass
class CanonicalSession:
    session_id: str
    user_id: str
    messages: list[CanonicalMessage]
    created_at: datetime
    updated_at: datetime
    agent_name: str
    metadata: dict = field(default_factory=dict)

def serialize_session(session: CanonicalSession) -> str:
    """Serialize to JSON for transport."""
    return json.dumps({
        "session_id": session.session_id,
        "user_id": session.user_id,
        "messages": [
            {
                "role": m.role,
                "content": m.content,
                "timestamp": m.timestamp.isoformat(),
                "tool_call_id": m.tool_call_id,
                "tool_name": m.tool_name,
                "metadata": m.metadata,
            }
            for m in session.messages
        ],
        "created_at": session.created_at.isoformat(),
        "updated_at": session.updated_at.isoformat(),
        "agent_name": session.agent_name,
        "metadata": session.metadata,
    }, indent=2)
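
A serializer is only half of a transport format. The sketch below shows one possible inverse, so that a round trip can be tested before any real data moves. `deserialize_session` is a hypothetical helper that does not appear in the original code, and the two dataclasses are repeated only so the snippet runs on its own.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

# Repeated from the canonical definitions so this sketch is self-contained.
@dataclass
class CanonicalMessage:
    role: str
    content: str
    timestamp: datetime
    tool_call_id: Optional[str] = None
    tool_name: Optional[str] = None
    metadata: dict = field(default_factory=dict)

@dataclass
class CanonicalSession:
    session_id: str
    user_id: str
    messages: list[CanonicalMessage]
    created_at: datetime
    updated_at: datetime
    agent_name: str
    metadata: dict = field(default_factory=dict)

def deserialize_session(raw: str) -> CanonicalSession:
    """Hypothetical inverse of serialize_session: JSON text -> CanonicalSession."""
    data = json.loads(raw)
    return CanonicalSession(
        session_id=data["session_id"],
        user_id=data["user_id"],
        messages=[
            CanonicalMessage(
                role=m["role"],
                content=m["content"],
                # fromisoformat() parses the strings produced by isoformat()
                timestamp=datetime.fromisoformat(m["timestamp"]),
                tool_call_id=m.get("tool_call_id"),
                tool_name=m.get("tool_name"),
                metadata=m.get("metadata") or {},
            )
            for m in data["messages"]
        ],
        created_at=datetime.fromisoformat(data["created_at"]),
        updated_at=datetime.fromisoformat(data["updated_at"]),
        agent_name=data["agent_name"],
        metadata=data.get("metadata") or {},
    )
```

Serializing a session, deserializing the result, and comparing the two objects is a cheap test to run before exporting anything.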

Step 2: Export from the Source System

Write an exporter that reads from your current storage and transforms to the canonical format.

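import json

import asyncpg

def _as_dict(value) -> dict:
    """asyncpg returns json/jsonb columns as str unless a codec is registered."""
    if not value:
        return {}
    return json.loads(value) if isinstance(value, str) else dict(value)

async def export_sessions(
    db_url: str,
    batch_size: int = 500,
) -> list[CanonicalSession]:
    """Export sessions from PostgreSQL in batches.

    All sessions are collected in memory; for very large tables, write each
    batch out instead of accumulating it.
    """
    conn = await asyncpg.connect(db_url)
    sessions = []
    offset = 0

    try:
        while True:
            # Ordering by (created_at, id) keeps pages stable when timestamps tie.
            rows = await conn.fetch(
                """
                SELECT s.id, s.user_id, s.created_at, s.updated_at,
                       s.agent_name, s.metadata
                FROM sessions s
                ORDER BY s.created_at, s.id
                LIMIT $1 OFFSET $2
                """,
                batch_size, offset,
            )
            if not rows:
                break

            for row in rows:
                # One query per session keeps the code simple; messages come
                # back in conversation order.
                messages = await conn.fetch(
                    """
                    SELECT role, content, created_at, tool_call_id,
                           tool_name, metadata
                    FROM messages
                    WHERE session_id = $1
                    ORDER BY created_at
                    """,
                    row["id"],
                )
                sessions.append(CanonicalSession(
                    session_id=str(row["id"]),
                    user_id=str(row["user_id"]),
                    messages=[
                        CanonicalMessage(
                            role=m["role"],
                            content=m["content"],
                            timestamp=m["created_at"],
                            tool_call_id=m.get("tool_call_id"),
                            tool_name=m.get("tool_name"),
                            metadata=_as_dict(m.get("metadata")),
                        )
                        for m in messages
                    ],
                    created_at=row["created_at"],
                    updated_at=row["updated_at"],
                    agent_name=row["agent_name"],
                    metadata=_as_dict(row.get("metadata")),
                ))
            offset += batch_size
    finally:
        # Release the connection even if a query fails mid-export.
        await conn.close()

    return sessions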
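
The exporter above assumes the source tables already look like the canonical format. When the source instead stores framework-specific messages, a transformation step is needed. The sketch below assumes messages are stored in the OpenAI chat-completions shape, where a tool result carries only a `tool_call_id` and the function name lives on the preceding assistant message. The row layout (`created_at` plus `message`) and the function name `to_canonical_messages` are hypothetical.

```python
from typing import Any

def to_canonical_messages(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Map chat-completions-style messages to canonical message dicts.

    Each row is assumed to look like {"created_at": <ISO string>, "message": {...}};
    that storage shape is an assumption for illustration.
    """
    tool_names: dict[str, str] = {}  # tool_call_id -> function name
    out = []
    for row in rows:
        msg = row["message"]
        metadata: dict[str, Any] = {}
        tool_call_id = None
        tool_name = None
        if msg["role"] == "assistant" and msg.get("tool_calls"):
            # Remember which function each call ID refers to, and keep the
            # raw calls in metadata so nothing is lost in translation.
            for call in msg["tool_calls"]:
                tool_names[call["id"]] = call["function"]["name"]
            metadata["tool_calls"] = msg["tool_calls"]
        elif msg["role"] == "tool":
            tool_call_id = msg["tool_call_id"]
            tool_name = tool_names.get(tool_call_id)
        out.append({
            "role": msg["role"],
            # Assistant messages that only call tools have content=None.
            "content": msg.get("content") or "",
            "timestamp": row["created_at"],
            "tool_call_id": tool_call_id,
            "tool_name": tool_name,
            "metadata": metadata,
        })
    return out
```

Keeping the original `tool_calls` payload in `metadata` means the target system can rebuild its own tool-call representation later without going back to the source.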

Step 3: Import and Validate

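Import into the target system one session at a time. Skip sessions that already exist so the import can be rerun safely, and wrap each session and its messages in a transaction so a failure never leaves a partial conversation behind.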

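import json

async def import_sessions(
    sessions: list[CanonicalSession],
    target_db_url: str,
) -> dict:
    """Import sessions with validation.

    Assumes the target tables mirror the source columns, including
    tool_call_id, tool_name, and json/jsonb metadata.
    """
    conn = await asyncpg.connect(target_db_url)
    stats = {"imported": 0, "skipped": 0, "errors": 0}

    try:
        for session in sessions:
            try:
                # Skip sessions that already exist so reruns are idempotent
                existing = await conn.fetchval(
                    "SELECT 1 FROM sessions WHERE id = $1",
                    session.session_id,
                )
                if existing:
                    stats["skipped"] += 1
                    continue

                # One transaction per session: a failure rolls back the
                # session row together with its messages.
                async with conn.transaction():
                    await conn.execute(
                        """INSERT INTO sessions
                           (id, user_id, agent_name, created_at, updated_at,
                            metadata)
                           VALUES ($1, $2, $3, $4, $5, $6)""",
                        session.session_id, session.user_id,
                        session.agent_name, session.created_at,
                        session.updated_at, json.dumps(session.metadata),
                    )
                    for msg in session.messages:
                        # Tool call fields travel with the text so the agent
                        # keeps its grounding after cutover.
                        await conn.execute(
                            """INSERT INTO messages
                               (session_id, role, content, created_at,
                                tool_call_id, tool_name, metadata)
                               VALUES ($1, $2, $3, $4, $5, $6, $7)""",
                            session.session_id, msg.role,
                            msg.content, msg.timestamp,
                            msg.tool_call_id, msg.tool_name,
                            json.dumps(msg.metadata),
                        )
                stats["imported"] += 1
            except Exception as e:
                stats["errors"] += 1
                print(f"Error importing {session.session_id}: {e}")
    finally:
        # Release the connection even if the loop is interrupted.
        await conn.close()

    return stats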
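
Structural checks can also run before a session ever reaches the database. The following is a minimal sketch, assuming sessions are available as the dicts produced by parsing the canonical JSON, and that all timestamps are either timezone-aware or all naive. `find_session_problems` and `VALID_ROLES` are hypothetical names, not part of the original code.

```python
from datetime import datetime

VALID_ROLES = {"user", "assistant", "system", "tool"}

def find_session_problems(session: dict) -> list[str]:
    """Return human-readable problems; an empty list means the session looks sane."""
    problems = []
    previous = None
    for i, msg in enumerate(session["messages"]):
        if msg["role"] not in VALID_ROLES:
            problems.append(f"message {i}: unknown role {msg['role']!r}")
        # A tool result that cannot be tied to a call is useless to the agent.
        if msg["role"] == "tool" and not msg.get("tool_call_id"):
            problems.append(f"message {i}: tool message without tool_call_id")
        # Temporal ordering must survive the migration.
        ts = datetime.fromisoformat(msg["timestamp"])
        if previous is not None and ts < previous:
            problems.append(f"message {i}: timestamp goes backwards")
        previous = ts
    return problems
```

Sessions that return a non-empty list can be set aside for manual review instead of being counted as import errors.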

Step 4: Validate Counts and Integrity

After import, run integrity checks to make sure nothing was lost.

async def validate_migration(source_url: str, target_url: str):
    src = await asyncpg.connect(source_url)
    tgt = await asyncpg.connect(target_url)
    try:
        src_sessions = await src.fetchval("SELECT count(*) FROM sessions")
        tgt_sessions = await tgt.fetchval("SELECT count(*) FROM sessions")
        src_messages = await src.fetchval("SELECT count(*) FROM messages")
        tgt_messages = await tgt.fetchval("SELECT count(*) FROM messages")
    finally:
        # Close both connections even if a count query fails
        await src.close()
        await tgt.close()

    print(f"Sessions: source={src_sessions}, target={tgt_sessions}")
    print(f"Messages: source={src_messages}, target={tgt_messages}")
    # Raise instead of assert: asserts are skipped when Python runs with -O
    if src_sessions != tgt_sessions:
        raise RuntimeError("Session count mismatch")
    if src_messages != tgt_messages:
        raise RuntimeError("Message count mismatch")
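
Matching counts show that nothing is missing, but not that the content is identical. One way to check content is to fingerprint each session on both sides and compare. The sketch below assumes both systems can produce sessions as canonical dicts with timestamps normalized to the same ISO string form. `session_fingerprint` and `find_mismatches` are hypothetical helpers.

```python
import hashlib
import json

def session_fingerprint(session: dict) -> str:
    """SHA-256 over a stable JSON encoding of the fields that must survive migration."""
    payload = {
        "session_id": session["session_id"],
        "messages": [
            [m["role"], m["content"], m["timestamp"], m.get("tool_call_id")]
            for m in session["messages"]
        ],
    }
    # sort_keys and fixed separators make the encoding deterministic.
    encoded = json.dumps(
        payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    )
    return hashlib.sha256(encoded.encode("utf-8")).hexdigest()

def find_mismatches(source: list[dict], target: list[dict]) -> list[str]:
    """Return IDs of sessions that are missing from the target or differ in content."""
    target_prints = {s["session_id"]: session_fingerprint(s) for s in target}
    return [
        s["session_id"]
        for s in source
        if target_prints.get(s["session_id"]) != session_fingerprint(s)
    ]
```

Because message order is part of the fingerprint, a session whose messages were reordered during import is reported as a mismatch too.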

FAQ

How do I handle active sessions during migration?

Use a snapshot-and-replay approach. Set a cutoff timestamp, export every session last updated at or before that point, then replay the writes that happened after the cutoff while the export was running. A CDC (Change Data Capture) stream from tools like Debezium can capture these delta writes automatically.
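
The cutoff logic can be sketched in a few lines of plain Python. This is a simplified illustration, assuming each session is a dict with a `session_id` and an `updated_at` datetime and that the target can be treated as a dict keyed by session ID. `split_by_cutoff` and `replay_delta` are hypothetical names, and a real CDC pipeline would deliver row-level changes instead of whole sessions.

```python
from datetime import datetime

def split_by_cutoff(
    sessions: list[dict], cutoff: datetime
) -> tuple[list[dict], list[dict]]:
    """Separate sessions into the bulk snapshot and the delta written after the cutoff."""
    snapshot = [s for s in sessions if s["updated_at"] <= cutoff]
    delta = [s for s in sessions if s["updated_at"] > cutoff]
    return snapshot, delta

def replay_delta(target: dict[str, dict], delta: list[dict]) -> None:
    """Upsert changed sessions into the target.

    The newest updated_at wins, so replaying the same delta twice, or
    replaying a stale copy, leaves the target unchanged.
    """
    for session in delta:
        current = target.get(session["session_id"])
        if current is None or session["updated_at"] >= current["updated_at"]:
            target[session["session_id"]] = session
```

Making the replay idempotent matters because delta streams are often delivered at least once, so the same change may arrive more than one time.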

Should I migrate tool call results or just the conversation text?

Migrate tool call results. They provide context that the agent used to formulate responses. Without them, resuming a conversation in the new system may produce inconsistent follow-ups because the agent loses the factual grounding from previous tool calls.

What about memory stores like vector databases?

Vector memory requires special handling because embeddings are model-specific. If you are changing embedding models, you must re-embed the source documents rather than copying vectors directly. Plan for the re-embedding compute cost.
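
A re-embedding pass can be sketched as a batched loop over the stored source text. The example below assumes each memory record is a dict with a `text` field, and that `embed` is a caller-supplied function wrapping whichever new embedding model you choose. Both the record shape and the `reembed` helper are hypothetical.

```python
from typing import Callable, Iterable, Iterator

# Takes a batch of texts, returns one vector per text (assumed interface).
Embedder = Callable[[list[str]], list[list[float]]]

def reembed(
    records: Iterable[dict], embed: Embedder, batch_size: int = 64
) -> Iterator[dict]:
    """Yield copies of memory records with vectors recomputed from the source text."""
    batch: list[dict] = []
    for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            yield from _embed_batch(batch, embed)
            batch = []
    if batch:  # final partial batch
        yield from _embed_batch(batch, embed)

def _embed_batch(batch: list[dict], embed: Embedder) -> Iterator[dict]:
    vectors = embed([r["text"] for r in batch])
    if len(vectors) != len(batch):
        raise ValueError("embedder returned a different number of vectors than texts")
    for record, vector in zip(batch, vectors):
        # Copy the record so the old vector stays available for rollback.
        yield {**record, "embedding": vector}
```

Batching keeps the number of model calls, and therefore the compute cost, proportional to the record count divided by `batch_size`.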

