Learn Agentic AI

AI Agent for Database Operations: Automated Query Optimization and Index Management

Build an AI agent that monitors PostgreSQL performance, detects slow queries, recommends optimal indexes, schedules vacuum operations, and plans for capacity growth.

Why Database Operations Need an Agent

Database performance degrades gradually. A query that ran in 5ms at launch takes 500ms with 10 million rows. An index that was perfect for v1 of your schema becomes a liability after v5. Dead tuples pile up. Connection pools saturate. By the time a human notices, users are already complaining. An AI database operations agent monitors these signals continuously and acts before problems become outages.

Slow Query Detection

The agent reads from PostgreSQL's pg_stat_statements extension, which tracks query execution statistics without modifying your application. The extension must be listed in shared_preload_libraries and enabled with CREATE EXTENSION pg_stat_statements before any statistics appear.

flowchart LR
    INPUT(["User intent"])
    PARSE["Parse plus<br/>classify"]
    PLAN["Plan and tool<br/>selection"]
    AGENT["Agent loop<br/>LLM plus tools"]
    GUARD{"Guardrails<br/>and policy"}
    EXEC["Execute and<br/>verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome plus<br/>next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
import asyncpg
from dataclasses import dataclass
from typing import Optional

@dataclass
class SlowQuery:
    query: str
    calls: int
    total_time_ms: float
    mean_time_ms: float
    rows_returned: int
    shared_blks_hit: int
    shared_blks_read: int
    cache_hit_ratio: float

class QueryMonitor:
    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool: Optional[asyncpg.Pool] = None

    async def connect(self):
        self.pool = await asyncpg.create_pool(self.dsn, min_size=2, max_size=5)

    async def get_slow_queries(
        self, min_mean_ms: float = 100, min_calls: int = 10, limit: int = 20
    ) -> list[SlowQuery]:
        rows = await self.pool.fetch("""
            SELECT
                query,
                calls,
                total_exec_time AS total_time_ms,
                mean_exec_time AS mean_time_ms,
                rows,
                shared_blks_hit,
                shared_blks_read,
                CASE WHEN shared_blks_hit + shared_blks_read = 0
                     THEN 1.0
                     ELSE shared_blks_hit::float /
                          (shared_blks_hit + shared_blks_read)
                END AS cache_hit_ratio
            FROM pg_stat_statements
            WHERE mean_exec_time > $1
              AND calls > $2
              AND query NOT LIKE '%pg_stat%'
            ORDER BY total_exec_time DESC
            LIMIT $3
        """, min_mean_ms, min_calls, limit)

        return [SlowQuery(
            query=r["query"],
            calls=r["calls"],
            total_time_ms=r["total_time_ms"],
            mean_time_ms=r["mean_time_ms"],
            rows_returned=r["rows"],
            shared_blks_hit=r["shared_blks_hit"],
            shared_blks_read=r["shared_blks_read"],
            cache_hit_ratio=r["cache_hit_ratio"],
        ) for r in rows]
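With the raw stats in hand, the agent still has to decide which query to tackle first. One possible ranking — a sketch with illustrative weights, not something pg_stat_statements provides — multiplies total time spent by an I/O penalty so that disk-bound queries, which benefit most from indexes, float to the top:

```python
from dataclasses import dataclass

@dataclass
class QueryStats:
    # The subset of SlowQuery fields the score needs
    mean_time_ms: float
    calls: int
    cache_hit_ratio: float

def priority_score(q: QueryStats) -> float:
    """Total time spent, inflated when the cache hit ratio is poor:
    disk-bound queries gain the most from a new index."""
    total_ms = q.mean_time_ms * q.calls
    io_penalty = 2.0 - q.cache_hit_ratio  # 1.0 (all cached) .. 2.0 (all disk)
    return total_ms * io_penalty

queries = [
    QueryStats(mean_time_ms=500, calls=10, cache_hit_ratio=0.99),
    QueryStats(mean_time_ms=20, calls=5000, cache_hit_ratio=0.40),
]
ranked = sorted(queries, key=priority_score, reverse=True)
# The cheap-but-hot, disk-bound query outranks the rare expensive one
```

The exact weights matter less than having an explicit, tunable policy instead of always chasing the single slowest query.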

Index Recommendation Engine

The agent analyzes query plans and existing indexes to recommend new indexes or identify unused ones.

import openai
import json

async def get_query_plan(pool: asyncpg.Pool, query: str) -> str:
    # Note: EXPLAIN ANALYZE actually executes the query, so only pass
    # read-only statements here
    row = await pool.fetchrow(f"EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) {query}")
    # asyncpg returns the json-typed plan column as text by default
    return json.dumps(json.loads(row["QUERY PLAN"]), indent=2)

async def get_existing_indexes(pool: asyncpg.Pool, table: str) -> list[dict]:
    # Alias both relations: pg_stat_user_indexes and pg_indexes share
    # column names (schemaname, etc.), so an unqualified WHERE is ambiguous
    rows = await pool.fetch("""
        SELECT
            i.indexname, i.indexdef,
            s.idx_scan, s.idx_tup_read, s.idx_tup_fetch,
            pg_size_pretty(pg_relation_size(s.indexrelid)) AS size
        FROM pg_stat_user_indexes s
        JOIN pg_indexes i
          ON i.indexname = s.indexrelname
         AND i.schemaname = s.schemaname
         AND i.tablename = s.relname
        WHERE s.schemaname = 'public' AND s.relname = $1
        ORDER BY s.idx_scan DESC
    """, table)
    return [dict(r) for r in rows]

async def recommend_indexes(
    pool: asyncpg.Pool, slow_query: SlowQuery
) -> dict:
    plan = await get_query_plan(pool, slow_query.query)

    # Crude heuristic: list every table in the public schema, then
    # substring-match against the normalized query text to find the
    # tables the query touches
    tables = await pool.fetch("""
        SELECT DISTINCT tablename FROM pg_tables
        WHERE schemaname = 'public'
    """)
    table_names = [t["tablename"] for t in tables]

    mentioned_tables = [
        t for t in table_names if t in slow_query.query.lower()
    ]

    existing = {}
    for table in mentioned_tables:
        existing[table] = await get_existing_indexes(pool, table)

    client = openai.AsyncOpenAI()
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": f"""Analyze this slow PostgreSQL query and recommend indexes.

Query: {slow_query.query}
Mean execution time: {slow_query.mean_time_ms:.1f}ms
Total calls: {slow_query.calls}
Cache hit ratio: {slow_query.cache_hit_ratio:.3f}

Query plan:
{plan}

Existing indexes on mentioned tables:
{json.dumps(existing, indent=2, default=str)}

Return JSON with:
- recommended_indexes: list of CREATE INDEX statements
- unused_indexes: list of indexes with zero scans that could be dropped
- explanation: why these indexes help
- estimated_improvement: percentage speed improvement estimate"""
        }],
        response_format={"type": "json_object"},
        temperature=0.0,
    )
    return json.loads(response.choices[0].message.content)
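Since the model's recommendations are DDL destined for a live database, it is worth gating them before anything executes. A minimal allow-list filter — a sketch, not a full SQL parser, and `filter_safe_ddl` is a hypothetical helper — accepts only single CREATE INDEX statements and drops everything else:

```python
import re

_ALLOWED = re.compile(
    r"^\s*CREATE\s+(UNIQUE\s+)?INDEX\s+(CONCURRENTLY\s+)?", re.IGNORECASE
)

def filter_safe_ddl(statements: list[str]) -> list[str]:
    """Keep only single CREATE INDEX statements; reject anything
    that chains extra commands or is not an index creation at all."""
    safe = []
    for stmt in statements:
        body = stmt.strip().rstrip(";")
        # A remaining semicolon means a second statement is smuggled in
        if _ALLOWED.match(body) and ";" not in body:
            safe.append(body)
    return safe
```

Anything the filter rejects should be surfaced for human review rather than silently discarded.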

Vacuum Scheduling

The agent monitors dead tuple counts and schedules vacuum operations during low-traffic windows.

from datetime import datetime, timezone

@dataclass
class TableBloat:
    table_name: str
    live_tuples: int
    dead_tuples: int
    dead_ratio: float
    last_vacuum: Optional[datetime]
    last_autovacuum: Optional[datetime]
    table_size: str

async def check_vacuum_needs(pool: asyncpg.Pool) -> list[TableBloat]:
    rows = await pool.fetch("""
        SELECT
            relname AS table_name,
            n_live_tup AS live_tuples,
            n_dead_tup AS dead_tuples,
            CASE WHEN n_live_tup = 0 THEN 0
                 ELSE n_dead_tup::float / n_live_tup
            END AS dead_ratio,
            last_vacuum,
            last_autovacuum,
            pg_size_pretty(pg_total_relation_size(relid)) AS table_size
        FROM pg_stat_user_tables
        WHERE n_dead_tup > 1000
        ORDER BY n_dead_tup DESC
    """)
    return [TableBloat(**dict(r)) for r in rows]

async def schedule_vacuum(
    pool: asyncpg.Pool, table: str, is_low_traffic: bool
) -> str:
    if not is_low_traffic:
        return f"Skipping vacuum for {table}: not in low-traffic window"

    # VACUUM cannot take bind parameters, so quote the identifier by
    # hand rather than interpolating the raw table name
    quoted = '"' + table.replace('"', '""') + '"'
    await pool.execute(f"VACUUM (VERBOSE, ANALYZE) {quoted}")
    return f"Vacuum completed for {table}"

def is_low_traffic_window() -> bool:
    # Aware UTC timestamp; datetime.utcnow() is deprecated in Python 3.12+
    hour = datetime.now(timezone.utc).hour
    return 2 <= hour <= 6  # 02:00-06:00 UTC
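Dead-tuple thresholds deserve the same explicit treatment as the traffic window. One possible policy — the thresholds here are illustrative assumptions, not PostgreSQL defaults, and `should_vacuum` is a hypothetical helper — vacuums when the dead ratio is high, or when a large table has accumulated many dead tuples even at a modest ratio:

```python
def should_vacuum(live_tuples: int, dead_tuples: int,
                  ratio_threshold: float = 0.2,
                  absolute_threshold: int = 1_000_000) -> bool:
    """Vacuum when dead tuples dominate, or when a big table has
    piled up many dead tuples even at a low dead ratio."""
    if dead_tuples >= absolute_threshold:
        return True
    if live_tuples == 0:
        # Fully-dead or empty table: vacuum only if there is bloat
        return dead_tuples > 0
    return dead_tuples / live_tuples >= ratio_threshold
```

Filtering the results of check_vacuum_needs through a policy like this keeps the agent from vacuuming tables that autovacuum would handle fine on its own.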

Capacity Planning

The agent tracks database growth trends and projects when you will hit capacity limits.

async def analyze_growth_trend(
    pool: asyncpg.Pool, table: str, days_history: int = 90
) -> dict:
    """Analyze table growth and project when limits will be reached."""
    # pg_total_relation_size expects a regclass, so cast the bound
    # text parameter explicitly
    current_size = await pool.fetchval(
        "SELECT pg_total_relation_size($1::regclass)", table
    )

    # Simulate historical growth (in production, store daily snapshots)
    daily_growth_rate = current_size * 0.02  # 2% daily growth estimate
    days_to_100gb = (100 * 1024**3 - current_size) / daily_growth_rate

    return {
        "table": table,
        "current_size_gb": current_size / (1024**3),
        "daily_growth_gb": daily_growth_rate / (1024**3),
        "days_to_100gb": max(0, int(days_to_100gb)),
        "recommendation": (
            "Consider partitioning" if days_to_100gb < 90
            else "Growth is manageable"
        ),
    }

FAQ

How do I safely apply index recommendations in production?

Always create indexes with CREATE INDEX CONCURRENTLY to avoid locking writes. The agent should generate the concurrent version of the DDL. Test the index on a replica first by running the slow query before and after index creation. Only apply to production after confirming improvement on the replica.
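The rewrite described above is mechanical, so the agent can apply it before any DDL reaches review. A sketch, where `make_concurrent` is a hypothetical helper:

```python
import re

def make_concurrent(create_index_sql: str) -> str:
    """Rewrite CREATE [UNIQUE] INDEX as CREATE [UNIQUE] INDEX
    CONCURRENTLY, leaving statements that already have it untouched."""
    if re.search(r"\bCONCURRENTLY\b", create_index_sql, re.IGNORECASE):
        return create_index_sql
    return re.sub(
        r"^(\s*CREATE\s+(?:UNIQUE\s+)?INDEX)\b",
        r"\1 CONCURRENTLY",
        create_index_sql,
        flags=re.IGNORECASE,
    )
```

Note that CREATE INDEX CONCURRENTLY cannot run inside a transaction block, so the agent must execute each statement on its own, outside any explicit transaction.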


Should the agent run VACUUM FULL automatically?

Never. VACUUM FULL rewrites the entire table and takes an exclusive lock, blocking all reads and writes. The regular VACUUM (which the agent schedules) is safe for online operation. If VACUUM FULL is needed, the agent should flag it as a manual action requiring a maintenance window.

How does the agent handle parameterized queries in pg_stat_statements?

PostgreSQL normalizes queries in pg_stat_statements by replacing literal values with $1, $2, etc. The agent works with these normalized forms since that is what matters for index recommendations. When generating EXPLAIN plans, it substitutes reasonable sample values for the parameters.
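Substituting sample values can be as simple as a regex pass over the normalized text; the resulting plan is only indicative, since real parameter values can change it. A sketch, where `substitute_params` is a hypothetical helper:

```python
import re

def substitute_params(normalized_query: str, sample: str = "1") -> str:
    """Replace pg_stat_statements placeholders ($1, $2, ...) with a
    sample literal so EXPLAIN can parse and plan the statement."""
    return re.sub(r"\$\d+", sample, normalized_query)
```

A type-aware version would inspect the target columns' types and pick a sample of the right kind (a string for text columns, a date for timestamps), since a bare integer literal fails to cast against many column types.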


#Database #PostgreSQL #QueryOptimization #DevOps #Python #AgenticAI #LearnAI #AIEngineering
