Learn Agentic AI

ETL for AI Agent Training Data: Extracting and Transforming Conversation Logs

Build an ETL pipeline that extracts conversation logs from AI agent systems, anonymizes PII, transforms them into training-ready formats, and filters for quality to improve agent performance.

Why Conversation Logs Are Your Most Valuable Data

Every conversation your AI agent handles is a data point about what users actually ask, how the agent responds, and where it fails. This data is far more valuable than synthetic training sets because it reflects real user language, real edge cases, and real failure modes specific to your domain.

But raw conversation logs are messy. They contain PII that cannot be stored in training sets, they include failed conversations that would teach the model bad habits, and they are in whatever format your logging system uses rather than the format your training pipeline needs. An ETL pipeline transforms raw logs into clean, anonymized, quality-filtered training data.

Extracting Logs from Multiple Sources

Agent conversation logs typically live in multiple places: database tables, JSON log files, and third-party platforms. The extraction layer normalizes all sources into a common format.

flowchart LR
    LOG[("Conversation logs")]
    PII["PII redaction<br/>regex plus ML"]
    LABEL["Labeling pipeline<br/>rubric plus reviewers"]
    DEDUP["Dedup near<br/>duplicates"]
    SPLIT{"Train, dev,<br/>test split"}
    TRAIN[("Train set")]
    DEV[("Dev set")]
    TEST[("Held out test")]
    EVAL["Eval harness"]
    LOG --> PII --> LABEL --> DEDUP --> SPLIT
    SPLIT --> TRAIN
    SPLIT --> DEV
    SPLIT --> TEST --> EVAL
    style LABEL fill:#4f46e5,stroke:#4338ca,color:#fff
    style EVAL fill:#f59e0b,stroke:#d97706,color:#1f2937
    style TEST fill:#059669,stroke:#047857,color:#fff

from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime
from enum import Enum
import json

class MessageRole(str, Enum):
    USER = "user"
    ASSISTANT = "assistant"
    SYSTEM = "system"
    TOOL = "tool"

@dataclass
class Message:
    role: MessageRole
    content: str
    timestamp: Optional[datetime] = None
    tool_name: Optional[str] = None
    tool_args: Optional[dict] = None

@dataclass
class Conversation:
    id: str
    messages: List[Message]
    metadata: dict
    source: str

class LogExtractor:
    async def extract_from_db(self, db_pool) -> List[Conversation]:
        async with db_pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT
                    c.id,
                    c.created_at,
                    c.metadata,
                    json_agg(
                        json_build_object(
                            'role', m.role,
                            'content', m.content,
                            'timestamp', m.created_at,
                            'tool_name', m.tool_name,
                            'tool_args', m.tool_args
                        ) ORDER BY m.created_at
                    ) AS messages
                FROM conversations c
                JOIN messages m ON m.conversation_id = c.id
                WHERE c.created_at >= NOW() - INTERVAL '7 days'
                GROUP BY c.id, c.created_at, c.metadata
            """)

        conversations = []
        for row in rows:
            # asyncpg returns json_agg output as a JSON string unless a
            # codec is registered on the pool, so parse it defensively.
            raw_messages = row["messages"]
            if isinstance(raw_messages, str):
                raw_messages = json.loads(raw_messages)
            messages = [
                Message(
                    role=MessageRole(m["role"]),
                    content=m["content"],
                    # after JSON round-tripping this is an ISO string,
                    # not a datetime; parse it downstream if needed
                    timestamp=m.get("timestamp"),
                    tool_name=m.get("tool_name"),
                    tool_args=m.get("tool_args"),
                )
                for m in raw_messages
            ]
            conversations.append(Conversation(
                id=str(row["id"]),
                messages=messages,
                metadata=dict(row["metadata"]) if row["metadata"] else {},
                source="database",
            ))
        return conversations
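Database tables are only one source. For JSON log files, a parallel extractor normalizes each line into the same shape. Below is a minimal sketch assuming one conversation per line with `id`, `metadata`, and `messages` fields (adjust the field names to whatever your logging format actually emits); it inlines simplified copies of the dataclasses so it stands alone:

```python
import json
from dataclasses import dataclass
from typing import List, Optional

# Simplified inline copies of Message and Conversation from above
# (string roles, no timestamps) so this sketch is self-contained.
@dataclass
class Message:
    role: str
    content: str
    tool_name: Optional[str] = None
    tool_args: Optional[dict] = None

@dataclass
class Conversation:
    id: str
    messages: List[Message]
    metadata: dict
    source: str

def extract_from_jsonl(path: str) -> List[Conversation]:
    conversations = []
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # tolerate blank lines in the log file
            record = json.loads(line)
            messages = [
                Message(
                    role=m["role"],
                    content=m["content"],
                    tool_name=m.get("tool_name"),
                    tool_args=m.get("tool_args"),
                )
                for m in record["messages"]
            ]
            conversations.append(Conversation(
                id=str(record["id"]),
                messages=messages,
                metadata=record.get("metadata", {}),
                source="jsonl",
            ))
    return conversations
```

Because both extractors emit the same `Conversation` shape, everything downstream (anonymization, filtering, export) is source-agnostic.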

PII Anonymization

Training data must never contain personally identifiable information. Build a pipeline that detects and replaces PII before any data leaves the extraction stage.

import re
from typing import Dict, Optional

class PIIAnonymizer:
    PATTERNS = {
        "email": (
            r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
            "[EMAIL_REDACTED]"
        ),
        "phone": (
            r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
            "[PHONE_REDACTED]"
        ),
        "ssn": (
            r"\b\d{3}-\d{2}-\d{4}\b",
            "[SSN_REDACTED]"
        ),
        "credit_card": (
            r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b",
            "[CC_REDACTED]"
        ),
        "ip_address": (
            r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
            "[IP_REDACTED]"
        ),
    }

    def __init__(self, custom_patterns: Optional[Dict[str, tuple]] = None):
        self.patterns = {**self.PATTERNS}
        if custom_patterns:
            self.patterns.update(custom_patterns)
        self.stats = {key: 0 for key in self.patterns}

    def anonymize_text(self, text: str) -> str:
        for name, (pattern, replacement) in self.patterns.items():
            matches = re.findall(pattern, text)
            self.stats[name] += len(matches)
            text = re.sub(pattern, replacement, text)
        return text

    def anonymize_conversation(
        self, conv: Conversation
    ) -> Conversation:
        clean_messages = []
        for msg in conv.messages:
            clean_messages.append(Message(
                role=msg.role,
                content=self.anonymize_text(msg.content),
                timestamp=msg.timestamp,
                tool_name=msg.tool_name,
                tool_args=(
                    self._anonymize_dict(msg.tool_args)
                    if msg.tool_args else None
                ),
            ))
        return Conversation(
            id=conv.id,
            messages=clean_messages,
            metadata={},  # strip metadata entirely
            source=conv.source,
        )

    def _anonymize_dict(self, d: dict) -> dict:
        result = {}
        for k, v in d.items():
            if isinstance(v, str):
                result[k] = self.anonymize_text(v)
            elif isinstance(v, dict):
                result[k] = self._anonymize_dict(v)
            else:
                result[k] = v
        return result
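For a quick standalone illustration of the regex pass, here is the same logic condensed to just the email and phone patterns from the table above:

```python
import re

# Condensed version of the PIIAnonymizer regex pass, for illustration only.
PATTERNS = {
    "email": (r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", "[EMAIL_REDACTED]"),
    "phone": (r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b", "[PHONE_REDACTED]"),
}

def anonymize_text(text: str) -> str:
    # Apply each pattern in turn; order matters if patterns can overlap.
    for pattern, replacement in PATTERNS.values():
        text = re.sub(pattern, replacement, text)
    return text

print(anonymize_text("Reach me at jane.doe@example.com or 555-123-4567."))
# -> Reach me at [EMAIL_REDACTED] or [PHONE_REDACTED].
```

Note that the phone pattern above only matches bare ten-digit formats; numbers written with parentheses or country codes need additional patterns.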

Quality Filtering

Not every conversation should become training data. Filter out conversations that are too short, contain errors, or represent edge cases that would confuse the model.

@dataclass
class QualityScore:
    conversation_id: str
    turn_count: int
    avg_response_length: int
    has_tool_use: bool
    has_error: bool
    user_satisfaction: Optional[float]
    passes: bool
    rejection_reason: Optional[str] = None

class QualityFilter:
    def __init__(
        self,
        min_turns: int = 3,
        min_avg_response_length: int = 50,
        max_turns: int = 50,
    ):
        self.min_turns = min_turns
        self.min_avg_response_length = min_avg_response_length
        self.max_turns = max_turns

    def evaluate(self, conv: Conversation) -> QualityScore:
        user_msgs = [m for m in conv.messages if m.role == MessageRole.USER]
        asst_msgs = [m for m in conv.messages if m.role == MessageRole.ASSISTANT]
        turn_count = len(user_msgs)

        avg_length = 0
        if asst_msgs:
            avg_length = sum(len(m.content) for m in asst_msgs) // len(asst_msgs)

        has_tool = any(m.role == MessageRole.TOOL for m in conv.messages)

        error_indicators = [
            "error", "sorry, i cannot", "i don't have access",
            "something went wrong",
        ]
        has_error = any(
            any(ind in m.content.lower() for ind in error_indicators)
            for m in asst_msgs
        )

        passes = True
        reason = None
        if turn_count < self.min_turns:
            passes, reason = False, f"Too few turns: {turn_count}"
        elif turn_count > self.max_turns:
            passes, reason = False, f"Too many turns: {turn_count}"
        elif avg_length < self.min_avg_response_length:
            passes, reason = False, f"Responses too short: {avg_length}"
        elif has_error:
            passes, reason = False, "Contains error responses"

        return QualityScore(
            conversation_id=conv.id,
            turn_count=turn_count,
            avg_response_length=avg_length,
            has_tool_use=has_tool,
            has_error=has_error,
            user_satisfaction=None,
            passes=passes,
            rejection_reason=reason,
        )

Format Conversion for Fine-Tuning

Convert filtered conversations to the JSONL format expected by training APIs.

def to_openai_format(conv: Conversation) -> dict:
    messages = []
    for msg in conv.messages:
        if msg.role == MessageRole.TOOL:
            messages.append({
                "role": "tool",
                "content": msg.content,
                # These logs carry no real call IDs, so the tool name
                # stands in; use the actual tool_call_id if you log one.
                "tool_call_id": msg.tool_name,
            })
        else:
            messages.append({
                "role": msg.role.value,
                "content": msg.content,
            })
    return {"messages": messages}

def export_training_data(
    conversations: List[Conversation],
    output_path: str,
):
    with open(output_path, "w") as f:
        for conv in conversations:
            line = json.dumps(to_openai_format(conv))
            f.write(line + "\n")
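Before uploading, it is worth a cheap sanity pass over the exported file, since malformed lines fail fine-tuning jobs late and expensively. A minimal validator for the format produced above:

```python
import json

def validate_jsonl(path: str) -> int:
    """Check that every line parses and carries a non-empty messages list.
    Returns the number of valid examples; raises on the first bad line."""
    count = 0
    with open(path) as f:
        for i, line in enumerate(f, 1):
            record = json.loads(line)  # raises on malformed JSON
            messages = record.get("messages", [])
            assert messages, f"line {i}: empty messages list"
            for m in messages:
                # every message needs a role; content may be absent only
                # on assistant tool-call turns, so check role strictly
                assert "role" in m, f"line {i}: message missing role"
            count += 1
    return count
```

Run it right after `export_training_data` and log the count; a sudden drop in valid examples is an early signal that an upstream stage broke.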

FAQ

How do I handle PII that regex patterns miss, like names and addresses?

Regex catches structured PII like emails and phone numbers. For unstructured PII like names and addresses, use a named entity recognition model such as spaCy's en_core_web_lg or a dedicated PII detection service. Run NER as a second pass after regex replacement, and replace detected PERSON, GPE, and ADDRESS entities with placeholders.
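Structurally, the second pass is just span replacement. Here is a sketch with a plain span list standing in for the NER model; with spaCy you would build the spans from `[(e.start_char, e.end_char, e.label_) for e in nlp(text).ents]`:

```python
from typing import List, Tuple

# Entity labels to redact, mapped to placeholders. Label names follow
# spaCy's conventions (PERSON, GPE); extend for your model's label set.
REDACT_LABELS = {"PERSON": "[NAME_REDACTED]", "GPE": "[LOCATION_REDACTED]"}

def redact_entities(text: str, entities: List[Tuple[int, int, str]]) -> str:
    """Replace detected entity spans (start, end, label) with placeholders."""
    # Apply replacements right-to-left so earlier offsets stay valid
    # after the string length changes.
    for start, end, label in sorted(entities, reverse=True):
        placeholder = REDACT_LABELS.get(label)
        if placeholder:
            text = text[:start] + placeholder + text[end:]
    return text
```

Running this after the regex pass keeps the two concerns separate: regex handles structured PII deterministically, and the NER pass catches free-text names and places.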


How many conversations do I need for effective fine-tuning?

OpenAI's fine-tuning API accepts as few as 10 examples, and its guide suggests 50 to 100 for clear improvements, but meaningful gains on agent tasks typically require 500 to 1,000 high-quality conversations. Quality matters more than quantity: 200 well-filtered conversations outperform 2,000 noisy ones. Start with a small dataset, evaluate the fine-tuned model, and add more data where you see gaps.

Should I include conversations where the agent used tools?

Yes, including tool-use conversations is especially valuable because tool calling is one of the hardest skills for agents to learn. Keep the tool call messages and tool response messages in the training data. This teaches the model when to invoke tools, how to format arguments, and how to synthesize tool outputs into natural responses.
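For reference, a tool-use training example in OpenAI's chat fine-tuning format looks roughly like this (the tool name, arguments, and values are illustrative):

```python
import json

# Illustrative training example with a tool-use turn. The assistant emits
# a tool_calls entry; the matching tool message echoes its tool_call_id.
example = {
    "messages": [
        {"role": "user", "content": "What's the weather in Austin?"},
        {
            "role": "assistant",
            "tool_calls": [{
                "id": "call_1",
                "type": "function",
                "function": {
                    "name": "get_weather",
                    "arguments": json.dumps({"city": "Austin"}),
                },
            }],
        },
        {"role": "tool", "tool_call_id": "call_1", "content": '{"temp_f": 87}'},
        {"role": "assistant", "content": "It's currently 87°F in Austin."},
    ]
}
print(json.dumps(example))  # one JSONL line
```

The final assistant turn is what teaches synthesis: the model sees the raw tool output and the natural-language answer it should produce from it.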


#ETL #TrainingData #ConversationLogs #DataPipelines #PIIAnonymization #AgenticAI #LearnAI #AIEngineering
