
AI Agent for Log Analysis: Automated Error Detection and Root Cause Analysis

Build an AI agent that parses application logs, detects error patterns, identifies anomalies, correlates events across services, and generates root cause analysis reports automatically.

The Log Analysis Challenge

Modern applications generate enormous volumes of logs across multiple services. When an incident occurs, engineers often spend much of their time searching through logs, correlating timestamps, and piecing together what happened. An AI log analysis agent automates this process: it ingests logs, detects anomalies, clusters related errors, and produces a root cause analysis that points engineers to the sequence of events that most likely triggered the problem.

Structured Log Parsing

The first step is converting raw log lines into structured data the agent can reason about.

flowchart LR
    APP(["Agent or API"])
    SDK["OTel SDK<br/>GenAI conventions"]
    COL["OTel Collector"]
    subgraph BACKENDS["Backends"]
        TR[("Traces<br/>Tempo or Honeycomb")]
        MET[("Metrics<br/>Prometheus")]
        LOG[("Logs<br/>Loki or ELK")]
    end
    DASH["Grafana plus alerts"]
    PAGE(["Pager"])
    APP --> SDK --> COL
    COL --> TR
    COL --> MET
    COL --> LOG
    TR --> DASH
    MET --> DASH
    LOG --> DASH
    DASH --> PAGE
    style SDK fill:#4f46e5,stroke:#4338ca,color:#fff
    style DASH fill:#f59e0b,stroke:#d97706,color:#1f2937
    style PAGE fill:#dc2626,stroke:#b91c1c,color:#fff
import re
from datetime import datetime
from dataclasses import dataclass, field
from collections import Counter
from openai import OpenAI

client = OpenAI()

@dataclass
class LogEntry:
    timestamp: datetime
    level: str
    service: str
    message: str
    raw: str
    metadata: dict = field(default_factory=dict)

class LogParser:
    PATTERNS = [
        re.compile(
            r"(?P<timestamp>[\d-]+ [\d:,.]+)\s+"
            r"(?P<level>DEBUG|INFO|WARNING|ERROR|CRITICAL)\s+"
            r"\[(?P<service>[^\]]+)\]\s+"
            r"(?P<message>.+)"
        ),
        re.compile(
            r"(?P<timestamp>[\d-]+T[\d:.]+Z?)\s+"
            r"(?P<level>\w+)\s+"
            r"(?P<service>[\w.-]+):\s+"
            r"(?P<message>.+)"
        ),
    ]

    def parse(self, raw_logs: str) -> list[LogEntry]:
        entries = []
        for line in raw_logs.strip().split("\n"):
            if not line.strip():
                continue
            entry = self._parse_line(line)
            if entry:
                entries.append(entry)
        entries.sort(key=lambda e: e.timestamp)
        return entries

    def _parse_line(self, line: str) -> LogEntry | None:
        for pattern in self.PATTERNS:
            match = pattern.match(line)
            if match:
                groups = match.groupdict()
                ts_str = groups["timestamp"]
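                for fmt in [
                    "%Y-%m-%d %H:%M:%S,%f",
                    "%Y-%m-%d %H:%M:%S.%f",
                    "%Y-%m-%dT%H:%M:%S.%fZ",
                    "%Y-%m-%dT%H:%M:%S.%f",
                    "%Y-%m-%dT%H:%M:%SZ",
                    "%Y-%m-%dT%H:%M:%S",
                    "%Y-%m-%d %H:%M:%S",
                ]:
                    try:
                        ts = datetime.strptime(ts_str, fmt)
                        break
                    except ValueError:
                        continue
                else:
                    # No format matched: fall back to the current time.
                    # This keeps the entry but can misplace it in the
                    # sorted timeline.
                    ts = datetime.now()
                return LogEntry(
                    # Uppercase the level so "error" and "ERROR" filter alike.
                    timestamp=ts, level=groups["level"].upper(),
                    service=groups["service"],
                    message=groups["message"], raw=line,
                )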
        return None
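
To make the expected input concrete, here is a minimal sketch that applies a regex of the same shape as the first pattern to a single line. The sample line and the `payment-service` name are invented for illustration; they are not taken from a real system.

```python
import re
from datetime import datetime

# Same shape as the first pattern in LogParser.PATTERNS.
PATTERN = re.compile(
    r"(?P<timestamp>[\d-]+ [\d:,.]+)\s+"
    r"(?P<level>DEBUG|INFO|WARNING|ERROR|CRITICAL)\s+"
    r"\[(?P<service>[^\]]+)\]\s+"
    r"(?P<message>.+)"
)

# Hypothetical sample line using Python's default logging timestamp style.
sample = (
    "2024-05-01 12:00:03,451 ERROR [payment-service] "
    "Connection refused to db-primary:5432"
)

match = PATTERN.match(sample)
fields = match.groupdict()

# The comma-separated milliseconds are read by %f as a fraction of a second.
parsed_ts = datetime.strptime(fields["timestamp"], "%Y-%m-%d %H:%M:%S,%f")
```

Each named group maps directly onto a `LogEntry` field, which is why the parser can build the dataclass straight from `groupdict()`.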

Error Pattern Detection

The agent groups errors by message pattern and identifies the most common failures.

class LogAnalysisAgent:
    def __init__(self, model: str = "gpt-4o"):
        self.model = model
        self.parser = LogParser()

    def detect_error_patterns(
        self, entries: list[LogEntry]
    ) -> list[dict]:
        errors = [e for e in entries if e.level in ("ERROR", "CRITICAL")]
        if not errors:
            return []

        # Group entries by normalized message so each pattern keeps
        # every matching entry, not just a count.
        groups: dict[str, list[LogEntry]] = {}
        for entry in errors:
            msg = re.sub(r"\b[0-9a-f-]{36}\b", "<UUID>", entry.message)
            msg = re.sub(r"\b\d+\b", "<NUM>", msg)
            msg = re.sub(r"'[^']*'", "'<STR>'", msg)
            groups.setdefault(msg, []).append(entry)

        counter = Counter({p: len(matched) for p, matched in groups.items()})
        patterns = []
        for pattern, count in counter.most_common(20):
            matched = groups[pattern]
            patterns.append({
                "pattern": pattern,
                "count": count,
                # Use every matching entry so the time range is accurate.
                "first_seen": min(e.timestamp for e in matched).isoformat(),
                "last_seen": max(e.timestamp for e in matched).isoformat(),
                "services": sorted({e.service for e in matched}),
                # Keep only a few raw lines to limit prompt size.
                "examples": [e.raw for e in matched[:3]],
            })
        return patterns

The normalization step replaces UUIDs, numbers, and string literals with placeholders, allowing the agent to group errors that differ only in their specific values.
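
The effect of normalization is easiest to see on a few messages. The sketch below pulls the three substitutions into a standalone helper (the name `normalize` and the sample messages are assumptions made for illustration) and counts the resulting patterns.

```python
import re
from collections import Counter

def normalize(message: str) -> str:
    """Replace variable parts of a log message with placeholders."""
    # UUIDs go first so their digits are not turned into <NUM> pieces.
    msg = re.sub(r"\b[0-9a-f-]{36}\b", "<UUID>", message)
    msg = re.sub(r"\b\d+\b", "<NUM>", msg)
    msg = re.sub(r"'[^']*'", "'<STR>'", msg)
    return msg

# Invented sample messages.
messages = [
    "Timeout after 30 seconds calling 'inventory-api'",
    "Timeout after 45 seconds calling 'pricing-api'",
    "Order 3f2b8c1e-9d4a-4e7b-8a6f-1c2d3e4f5a6b failed with status 502",
]

counts = Counter(normalize(m) for m in messages)
```

The two timeout messages collapse into one pattern with a count of 2, while the order failure stays separate. The order of the substitutions matters: running the number rule before the UUID rule would break UUIDs apart before they could be recognized.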

Root Cause Analysis with the LLM

With structured error patterns, the agent uses the LLM to perform root cause analysis.

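import json

# analyze_root_cause and _build_timeline are methods of LogAnalysisAgent;
# indent them inside the class body when assembling the full file.
def analyze_root_cause(self, raw_logs: str) -> dict: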
    entries = self.parser.parse(raw_logs)
    patterns = self.detect_error_patterns(entries)
    timeline = self._build_timeline(entries)

    response = client.chat.completions.create(
        model=self.model,
        messages=[
            {"role": "system", "content": """You are a senior SRE
performing incident root cause analysis. Given error patterns
and a timeline, determine:
1. The primary root cause
2. The chain of events that led to the failure
3. Which services were affected and in what order
4. Recommended immediate actions
5. Long-term fixes to prevent recurrence

Return JSON with:
- "root_cause": one-sentence summary
- "event_chain": ordered list of events
- "affected_services": list with impact description
- "immediate_actions": list of steps to take now
- "prevention": list of long-term improvements
- "severity": "critical", "high", "medium", or "low"
"""},
            {"role": "user", "content": (
                f"Error patterns:\n{json.dumps(patterns, indent=2)}\n\n"
                f"Event timeline:\n{timeline}"
            )},
        ],
        temperature=0,
        response_format={"type": "json_object"},
    )

    return json.loads(response.choices[0].message.content)

def _build_timeline(self, entries: list[LogEntry]) -> str:
    significant = [
        e for e in entries
        if e.level in ("ERROR", "CRITICAL", "WARNING")
    ]
    lines = []
    for entry in significant[:100]:
        lines.append(
            f"[{entry.timestamp.isoformat()}] "
            f"{entry.level} [{entry.service}] {entry.message}"
        )
    return "\n".join(lines)
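
The prompt asks for specific keys, but requesting JSON output does not by itself guarantee that every key is present or that `severity` holds one of the four listed values. A small check before the result is used can catch that early. This is a sketch only: `validate_analysis` and its rules are assumptions layered on top of the agent, not part of the original code.

```python
import json

# Keys requested in the system prompt.
REQUIRED_KEYS = {
    "root_cause", "event_chain", "affected_services",
    "immediate_actions", "prevention", "severity",
}
VALID_SEVERITIES = {"critical", "high", "medium", "low"}

def validate_analysis(raw: str) -> dict:
    """Parse the model's reply and check it has the requested shape."""
    data = json.loads(raw)  # raises json.JSONDecodeError on invalid JSON
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    missing = REQUIRED_KEYS - set(data)
    if missing:
        raise ValueError(f"missing keys: {sorted(missing)}")
    severity = str(data["severity"]).lower()
    if severity not in VALID_SEVERITIES:
        raise ValueError(f"unexpected severity: {data['severity']!r}")
    data["severity"] = severity
    return data

# Invented reply standing in for response.choices[0].message.content.
sample_reply = json.dumps({
    "root_cause": "Database connection pool exhausted",
    "event_chain": ["pool saturation", "request timeouts"],
    "affected_services": ["payment-service"],
    "immediate_actions": ["restart the pool"],
    "prevention": ["add pool size alerts"],
    "severity": "HIGH",
})
analysis = validate_analysis(sample_reply)
```

Since `json.JSONDecodeError` is a subclass of `ValueError`, a caller can handle both malformed JSON and missing fields with a single `except ValueError`.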

Generating an Incident Report

# Also a LogAnalysisAgent method; place it inside the class body.
def generate_report(self, raw_logs: str) -> str:
    analysis = self.analyze_root_cause(raw_logs)
    entries = self.parser.parse(raw_logs)
    total = len(entries)
    errors = len([e for e in entries if e.level in ("ERROR", "CRITICAL")])

    report = f"""# Incident Analysis Report

## Summary
**Root Cause:** {analysis['root_cause']}
**Severity:** {analysis['severity'].upper()}
**Total log entries analyzed:** {total}
**Error entries:** {errors}

## Event Chain
"""
    for i, event in enumerate(analysis["event_chain"], 1):
        report += f"{i}. {event}\n"

    report += "\n## Affected Services\n"
    for svc in analysis["affected_services"]:
        report += f"- {svc}\n"

    report += "\n## Immediate Actions\n"
    for action in analysis["immediate_actions"]:
        report += f"- [ ] {action}\n"

    return report

FAQ

How does the agent handle logs from multiple services with different formats?

The parser supports multiple format patterns and tries each one in order. For services with completely custom formats, you add a new regex pattern to the PATTERNS list. The service name extracted from each log line allows the agent to correlate events across services by timestamp.
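
As an illustration of adding a format, the sketch below puts a second regex next to one shaped like the existing bracketed pattern and tries them in order. The pipe-delimited layout, the `match_any` helper, and the sample lines are hypothetical; a real service's format would need its own regex.

```python
import re
from typing import Optional

# Existing bracketed format, same shape as the first entry in PATTERNS.
BRACKET_PATTERN = re.compile(
    r"(?P<timestamp>[\d-]+ [\d:,.]+)\s+"
    r"(?P<level>DEBUG|INFO|WARNING|ERROR|CRITICAL)\s+"
    r"\[(?P<service>[^\]]+)\]\s+"
    r"(?P<message>.+)"
)

# Hypothetical custom format: "<timestamp> | <service> | <level> | <message>".
PIPE_PATTERN = re.compile(
    r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s*\|\s*"
    r"(?P<service>[\w.-]+)\s*\|\s*"
    r"(?P<level>[A-Z]+)\s*\|\s*"
    r"(?P<message>.+)"
)

PATTERNS = [BRACKET_PATTERN, PIPE_PATTERN]

def match_any(line: str) -> Optional[dict]:
    """Return the named groups from the first pattern that matches."""
    for pattern in PATTERNS:
        match = pattern.match(line)
        if match:
            return match.groupdict()
    return None

# Invented sample lines, one per format.
bracket_line = "2024-05-01 12:00:03,451 ERROR [payment-service] Connection refused"
pipe_line = "2024-05-01 12:00:04 | auth-service | ERROR | Token validation failed"

bracket_fields = match_any(bracket_line)
pipe_fields = match_any(pipe_line)
```

As long as a new pattern uses the same four group names (`timestamp`, `level`, `service`, `message`) and a timestamp layout the parser already knows, the rest of the pipeline should not need to change.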

Can the agent process logs in real time?

Yes, by wrapping the parser in a streaming consumer that reads from a log aggregation system like Kafka or a file tail. Buffer entries for a time window (for example 60 seconds), run pattern detection on each window, and trigger a full root cause analysis when error rates spike above a threshold.
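
One way to sketch the spike trigger is a sliding window over error timestamps. Note that this is a sliding window rather than the fixed 60-second buffers described above, and the class name, default window, and threshold are all illustrative assumptions. It also assumes entries arrive roughly in timestamp order.

```python
from collections import deque
from datetime import datetime, timedelta

class ErrorRateMonitor:
    """Sliding-window error counter; thresholds here are illustrative."""

    def __init__(self, window_seconds: int = 60, max_errors: int = 5):
        self.window = timedelta(seconds=window_seconds)
        self.max_errors = max_errors
        self.error_times = deque()

    def observe(self, timestamp: datetime, level: str) -> bool:
        """Record one entry; return True when the window exceeds the threshold."""
        if level in ("ERROR", "CRITICAL"):
            self.error_times.append(timestamp)
        # Drop errors that have fallen out of the window.
        cutoff = timestamp - self.window
        while self.error_times and self.error_times[0] < cutoff:
            self.error_times.popleft()
        return len(self.error_times) > self.max_errors

# Simulated stream: one error every 10 seconds.
monitor = ErrorRateMonitor(window_seconds=60, max_errors=2)
start = datetime(2024, 5, 1, 12, 0, 0)
alerts = [
    monitor.observe(start + timedelta(seconds=10 * i), "ERROR")
    for i in range(4)
]

# A quiet entry well after the burst clears the window.
later = monitor.observe(start + timedelta(seconds=200), "INFO")
```

In a streaming consumer, a `True` result would be the point at which the buffered window is handed to the full root cause analysis, so the LLM is called only during suspected incidents.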

How do I handle log volumes that exceed the LLM context window?

Summarize before sending to the LLM. The pattern detection step reduces thousands of error lines to at most 20 patterns with counts. The timeline is capped at 100 significant events (the first 100 WARNING, ERROR, and CRITICAL entries in chronological order). For very large volumes, add a pre-filtering step that focuses on the time window around the incident.
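
A pre-filter of this kind can be as simple as a timestamp range check. The sketch below works on `(timestamp, raw_line)` tuples to stay self-contained; with the article's parser the same comparison would apply to `LogEntry.timestamp`. The function name, the default window sizes, and the sample entries are assumptions.

```python
from datetime import datetime, timedelta

def filter_incident_window(entries, incident_time,
                           before=timedelta(minutes=10),
                           after=timedelta(minutes=2)):
    """Keep (timestamp, raw_line) pairs inside the window around the incident."""
    start = incident_time - before
    end = incident_time + after
    return [(ts, line) for ts, line in entries if start <= ts <= end]

# Invented entries spread over 40 minutes.
entries = [
    (datetime(2024, 5, 1, 12, 0), "INFO [api] started"),
    (datetime(2024, 5, 1, 12, 25), "WARNING [db] pool at 90%"),
    (datetime(2024, 5, 1, 12, 29), "ERROR [db] pool exhausted"),
    (datetime(2024, 5, 1, 12, 31), "ERROR [api] request timeout"),
    (datetime(2024, 5, 1, 12, 40), "INFO [api] recovered"),
]

incident = datetime(2024, 5, 1, 12, 30)
kept = filter_incident_window(entries, incident)
```

The window is asymmetric on purpose: the lead-up to an incident usually carries more diagnostic value than what follows, so the sketch keeps more history before the incident time than after it.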

