
Memory-Efficient Agent Design: Handling Long Conversations Without OOM

Design AI agents that handle long conversations gracefully by streaming responses, compacting conversation state into summaries, truncating tool outputs, and enforcing session memory limits with eviction to prevent out-of-memory crashes.

How Agent Memory Grows Out of Control

An AI agent conversation is not just a list of strings. Each turn includes the user message, assistant response, tool calls, tool results, and metadata. A single tool result can be 10KB of JSON. Over a 50-turn conversation with 3-5 tool calls per turn, the in-memory conversation state can exceed 500KB — per session.

Multiply that by hundreds of concurrent sessions and you have a server consuming gigabytes of RAM just for conversation state. Add in embedding vectors, cached results, and intermediate processing buffers, and out-of-memory (OOM) crashes become a real production risk.
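A quick back-of-envelope estimate makes the scale concrete. The sizes below are illustrative assumptions, not measurements:

# Rough, illustrative estimate of conversation-state growth (assumed sizes)
TURNS = 50
TOOL_CALLS_PER_TURN = 4            # midpoint of the 3-5 range
AVG_TOOL_RESULT_BYTES = 3 * 1024   # not every result hits the 10KB worst case
AVG_MESSAGE_BYTES = 500            # user message or assistant response + metadata

per_session = TURNS * (2 * AVG_MESSAGE_BYTES + TOOL_CALLS_PER_TURN * AVG_TOOL_RESULT_BYTES)
print(f"Per session: ~{per_session / 1024:.0f} KB")               # ~649 KB
print(f"1,000 sessions: ~{1000 * per_session / 1024**3:.2f} GB")  # ~0.62 GB, before caches and buffers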

Streaming Processing: Never Hold the Full Response

When processing LLM responses, stream them to the caller as they arrive instead of accumulating the entire response in memory before returning it. The sequence diagram below shows the end-to-end streaming path from client to LLM provider and back:

sequenceDiagram
    autonumber
    participant Client
    participant Edge as Edge Worker
    participant LLM as LLM Provider
    participant DB as Logs and Trace
    Client->>Edge: POST /chat (stream=true)
    Edge->>LLM: messages.create(stream=true)
    loop Each token
        LLM-->>Edge: SSE chunk delta
        Edge-->>Client: SSE chunk delta
        Edge->>DB: append token to span
    end
    LLM-->>Edge: stop_reason=end_turn
    Edge-->>Client: event: done
    Edge->>DB: finalize trace
In code, the difference between accumulating and streaming looks like this:

from openai import AsyncOpenAI

client = AsyncOpenAI()

# BAD: Accumulates the entire response in memory
async def generate_full(messages: list[dict]) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o", messages=messages,
    )
    return response.choices[0].message.content  # Full string in memory

# GOOD: Stream chunks to the client as they arrive
async def generate_streamed(messages: list[dict]):
    stream = await client.chat.completions.create(
        model="gpt-4o", messages=messages, stream=True,
    )
    async for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            yield delta  # Yield each chunk, never hold the full response

For FastAPI, combine this with StreamingResponse:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class ChatRequest(BaseModel):
    messages: list[dict]

@app.post("/chat")
async def chat(request: ChatRequest):
    async def stream_generator():
        async for chunk in generate_streamed(request.messages):
            yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        stream_generator(),
        media_type="text/event-stream",
    )

Incremental State: Store Summaries, Not Full History

Instead of keeping every message in memory, maintain an incremental state that compresses old messages into summaries.

from dataclasses import dataclass, field

@dataclass
class ConversationState:
    session_id: str
    summary: str = ""
    recent_messages: list[dict] = field(default_factory=list)
    max_recent: int = 10
    _total_turns: int = 0

    def add_message(self, message: dict):
        self.recent_messages.append(message)
        self._total_turns += 1

    def needs_compaction(self) -> bool:
        return len(self.recent_messages) > self.max_recent * 2

    async def compact(self, summarizer):
        """Compress old messages into the summary."""
        if not self.needs_compaction():
            return

        # Keep the last max_recent messages
        to_summarize = self.recent_messages[:-self.max_recent]
        self.recent_messages = self.recent_messages[-self.max_recent:]

        # Add to running summary
        new_summary = await summarizer.summarize(to_summarize)
        self.summary = f"{self.summary} {new_summary}".strip()

    def get_context(self) -> list[dict]:
        """Build the context for the LLM call."""
        context = []
        if self.summary:
            context.append({
                "role": "system",
                "content": f"Previous conversation summary: {self.summary}",
            })
        context.extend(self.recent_messages)
        return context

    @property
    def memory_estimate_bytes(self) -> int:
        """Rough estimate of memory consumed by this state."""
        summary_bytes = len(self.summary.encode("utf-8"))
        messages_bytes = sum(
            len(str(m).encode("utf-8")) for m in self.recent_messages
        )
        return summary_bytes + messages_bytes
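
The compact() method assumes a summarizer object that exposes an async summarize() method. One possible minimal implementation on top of the AsyncOpenAI client from earlier is sketched below; the model choice and prompt are assumptions, not a prescribed interface:

class LLMSummarizer:
    """Minimal summarizer sketch; model and prompt are illustrative."""

    def __init__(self, client: AsyncOpenAI, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model

    async def summarize(self, messages: list[dict]) -> str:
        # Flatten messages into a plain transcript, capping each entry
        transcript = "\n".join(
            f"{m.get('role', 'unknown')}: {str(m.get('content', ''))[:500]}"
            for m in messages
        )
        response = await self.client.chat.completions.create(
            model=self.model,
            max_tokens=200,
            messages=[
                {"role": "system", "content": (
                    "Summarize this conversation excerpt in 3-5 sentences, "
                    "keeping decisions, facts, and open questions."
                )},
                {"role": "user", "content": transcript},
            ],
        )
        return response.choices[0].message.content or ""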

Session Memory Limits and Eviction

For multi-session servers, enforce per-session and global memory limits.

import asyncio
from collections import OrderedDict

class SessionManager:
    def __init__(
        self,
        max_sessions: int = 1000,
        max_memory_bytes: int = 500 * 1024 * 1024,  # 500MB
    ):
        self.max_sessions = max_sessions
        self.max_memory_bytes = max_memory_bytes
        self._sessions: OrderedDict[str, ConversationState] = OrderedDict()
        self._lock = asyncio.Lock()

    async def get_or_create(self, session_id: str) -> ConversationState:
        async with self._lock:
            if session_id in self._sessions:
                self._sessions.move_to_end(session_id)
                return self._sessions[session_id]

            # Evict if at capacity
            await self._evict_if_needed()

            state = ConversationState(session_id=session_id)
            self._sessions[session_id] = state
            return state

    async def _evict_if_needed(self):
        # Evict by count
        while len(self._sessions) >= self.max_sessions:
            evicted_id, evicted_state = self._sessions.popitem(last=False)
            await self._persist_to_disk(evicted_id, evicted_state)

        # Evict by memory
        total_memory = sum(
            s.memory_estimate_bytes for s in self._sessions.values()
        )
        while total_memory > self.max_memory_bytes and self._sessions:
            evicted_id, evicted_state = self._sessions.popitem(last=False)
            total_memory -= evicted_state.memory_estimate_bytes
            await self._persist_to_disk(evicted_id, evicted_state)

    async def _persist_to_disk(self, session_id: str, state: ConversationState):
        """Save evicted session to database for later retrieval."""
        # Implementation: write to PostgreSQL, Redis, or file
        pass
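
Wiring the session manager into the streaming endpoint might look like the sketch below. The route shape, the summarizer instance, and storing only the final assistant message are assumptions layered on the pieces defined above:

sessions = SessionManager(max_sessions=1000)
summarizer = LLMSummarizer(client)  # sketch from the previous section

@app.post("/chat/{session_id}")
async def chat_with_session(session_id: str, request: ChatRequest):
    state = await sessions.get_or_create(session_id)
    for message in request.messages:
        state.add_message(message)

    # Compress old turns before building the prompt
    if state.needs_compaction():
        await state.compact(summarizer)

    async def stream_generator():
        parts: list[str] = []
        async for chunk in generate_streamed(state.get_context()):
            parts.append(chunk)  # keep only what is needed to store the final turn
            yield f"data: {chunk}\n\n"
        state.add_message({"role": "assistant", "content": "".join(parts)})
        yield "data: [DONE]\n\n"

    return StreamingResponse(stream_generator(), media_type="text/event-stream")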

Truncating Tool Outputs Before Storage

Tool outputs are the single largest memory consumer. Truncate them before adding to conversation state.

import json

class ToolOutputTruncator:
    def __init__(self, max_chars: int = 2000):
        self.max_chars = max_chars

    def truncate(self, output: str) -> str:
        if len(output) <= self.max_chars:
            return output

        try:
            data = json.loads(output)
            return self._truncate_json(data)
        except (json.JSONDecodeError, TypeError):
            return output[:self.max_chars] + "\n...(truncated)"

    def _truncate_json(self, data) -> str:
        if isinstance(data, list) and len(data) > 5:
            # Keep the first few items and note how many were dropped
            result = json.dumps(data[:5], default=str)
            result += f"\n...({len(data) - 5} more items)"
        elif isinstance(data, dict):
            # Keep only the first 10 fields
            essential = dict(list(data.items())[:10])
            result = json.dumps(essential, default=str)
        else:
            result = json.dumps(data, default=str)

        # Structured output can still be large, so enforce the hard cap too
        if len(result) > self.max_chars:
            result = result[:self.max_chars] + "\n...(truncated)"
        return result
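
The truncator sits between tool execution and conversation state, so oversized outputs never enter memory in the first place. A minimal sketch; the tool-message shape is an assumption:

truncator = ToolOutputTruncator(max_chars=2000)

def record_tool_result(state: ConversationState, tool_name: str, raw_output: str):
    # Truncate before the output ever reaches conversation state
    state.add_message({
        "role": "tool",
        "name": tool_name,
        "content": truncator.truncate(raw_output),
    })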

Monitoring Memory Usage

Add memory monitoring to detect leaks before they cause OOM crashes.


import psutil
import os
import logging

logger = logging.getLogger(__name__)

class MemoryMonitor:
    def __init__(self, warning_pct: float = 75.0, critical_pct: float = 90.0):
        self.warning_pct = warning_pct
        self.critical_pct = critical_pct
        self.process = psutil.Process(os.getpid())

    def check(self) -> dict:
        mem = self.process.memory_info()
        system_mem = psutil.virtual_memory()

        usage_pct = (mem.rss / system_mem.total) * 100

        status = {
            "rss_mb": mem.rss / (1024 * 1024),
            "usage_pct": usage_pct,
            "status": "ok",
        }

        if usage_pct > self.critical_pct:
            status["status"] = "critical"
            logger.critical(f"Memory critical: {usage_pct:.1f}% of system RAM")
        elif usage_pct > self.warning_pct:
            status["status"] = "warning"
            logger.warning(f"Memory warning: {usage_pct:.1f}% of system RAM")

        return status
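
One way to act on these readings is a background watchdog that checks memory periodically and, in the critical band, sheds idle sessions. The interval and the direct call into SessionManager internals are shortcuts for illustration only:

monitor = MemoryMonitor()

async def memory_watchdog(interval_seconds: int = 30):
    while True:
        status = monitor.check()
        if status["status"] == "critical":
            # Shed load by pushing the least recently used sessions to disk
            async with sessions._lock:
                await sessions._evict_if_needed()
        await asyncio.sleep(interval_seconds)

@app.on_event("startup")
async def start_watchdog():
    asyncio.create_task(memory_watchdog())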

FAQ

How many concurrent agent sessions can a typical server handle?

With efficient memory management, a server with 4GB of RAM can handle 1,000-5,000 concurrent sessions depending on conversation length. Without optimization, the same server might OOM at 200 sessions. The key is keeping per-session memory under 500KB through summarization and tool output truncation.

Should I use Redis or in-process memory for conversation state?

Use in-process memory for active sessions (fastest access) and Redis for idle sessions (shared across server instances). Implement an LRU eviction policy that moves inactive sessions from memory to Redis after a configurable idle timeout, typically 5-15 minutes.
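
A sketch of that offload path, assuming redis-py's asyncio client, JSON-serializable state, and that last-activity timestamps are tracked elsewhere; key names and TTLs are illustrative:

import json
import time
import redis.asyncio as redis

r = redis.from_url("redis://localhost:6379")
IDLE_TIMEOUT_SECONDS = 10 * 60  # within the typical 5-15 minute range

async def offload_idle_sessions(active: dict[str, ConversationState], last_seen: dict[str, float]):
    now = time.monotonic()
    for session_id in list(active):
        if now - last_seen.get(session_id, now) > IDLE_TIMEOUT_SECONDS:
            state = active.pop(session_id)
            payload = json.dumps({
                "summary": state.summary,
                "recent_messages": state.recent_messages,
            })
            # Keep offloaded sessions for 24 hours, then let them expire
            await r.set(f"session:{session_id}", payload, ex=24 * 3600)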

How do I detect memory leaks in a long-running agent service?

Track RSS (Resident Set Size) over time using psutil. If RSS grows monotonically even when session counts are stable, you have a leak. Common culprits include references accumulating in global lists, HTTP clients that are never closed, and reference cycles in tool result objects that keep large payloads alive longer than expected.
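
Beyond watching RSS, Python's built-in tracemalloc can point at the allocation sites behind the growth. A minimal sketch; when to take snapshots is up to you:

import tracemalloc

tracemalloc.start()
baseline = tracemalloc.take_snapshot()

# ...let the service run under load for a while, then compare...
current = tracemalloc.take_snapshot()
for stat in current.compare_to(baseline, "lineno")[:10]:
    print(stat)  # the lines that grew the most are the leak suspects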


#MemoryManagement #Streaming #Scalability #Production #Python #AgenticAI #LearnAI #AIEngineering

