
Custom Session Implementations: The SessionABC Protocol

Build custom session backends for the OpenAI Agents SDK by implementing the SessionABC protocol with complete DynamoDB and MongoDB examples and testing strategies.

When Built-In Sessions Are Not Enough

The OpenAI Agents SDK ships with SQLite, Redis, and SQLAlchemy sessions. For most applications, one of these fits. But sometimes you need something different:

  • Your organization standardizes on DynamoDB and does not run Redis or PostgreSQL
  • You want to store sessions in MongoDB alongside your document-oriented data
  • You need a session that integrates with a proprietary storage system
  • You want to add custom middleware (logging, metrics, validation) at the session layer

The SDK defines a clear protocol — SessionABC — that any custom session must implement. Build to this interface and your custom session works with every feature of the SDK: runners, compaction, encryption wrappers, and multi-agent handoffs.

The SessionABC Interface

SessionABC is an abstract base class with four required methods:

from abc import ABC, abstractmethod
from typing import Any

class SessionABC(ABC):
    @abstractmethod
    async def get_items(self, session_id: str, limit: int | None = None) -> list[Any]:
        """Retrieve conversation items for a session."""
        ...

    @abstractmethod
    async def add_items(self, session_id: str, items: list[Any]) -> None:
        """Append new items to a session."""
        ...

    @abstractmethod
    async def pop_item(self, session_id: str) -> Any | None:
        """Remove and return the last item from a session."""
        ...

    @abstractmethod
    async def clear_session(self, session_id: str) -> None:
        """Delete all items for a session."""
        ...

Method Responsibilities

Method            Purpose                               Called by
get_items()       Load history before each run          Runner (pre-run)
add_items()       Persist new turns after each run      Runner (post-run)
pop_item()        Remove last item (undo/correction)    Manual or compaction
clear_session()   Wipe entire session                   Manual cleanup

Building a DynamoDB Session

Let us build a complete DynamoDB-backed session. DynamoDB is a natural fit for session data: it scales automatically, supports TTL, and is serverless.

Table Design

DynamoDB table: agent_sessions

  • Partition key: session_id (String)
  • Sort key: item_order (Number)
  • TTL attribute: expires_at (Number — Unix timestamp)
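
The table design above can be expressed as arguments to DynamoDB's CreateTable and UpdateTimeToLive operations. The following is a sketch under a few assumptions: on-demand billing (`PAY_PER_REQUEST`) is my choice rather than something the design specifies, and the helper names `table_definition`, `ttl_specification`, and `create_table` are hypothetical. Note that only key attributes are declared in `AttributeDefinitions`; the TTL attribute is enabled separately.

```python
from typing import Any


def table_definition(table_name: str = "agent_sessions") -> dict[str, Any]:
    """Keyword arguments for the DynamoDB CreateTable call."""
    return {
        "TableName": table_name,
        "KeySchema": [
            {"AttributeName": "session_id", "KeyType": "HASH"},   # partition key
            {"AttributeName": "item_order", "KeyType": "RANGE"},  # sort key
        ],
        "AttributeDefinitions": [
            {"AttributeName": "session_id", "AttributeType": "S"},
            {"AttributeName": "item_order", "AttributeType": "N"},
        ],
        "BillingMode": "PAY_PER_REQUEST",  # assumption: on-demand capacity
    }


def ttl_specification() -> dict[str, Any]:
    """TTL settings pointing at the expires_at attribute (Unix timestamp)."""
    return {"Enabled": True, "AttributeName": "expires_at"}


def create_table(region: str = "us-east-1") -> None:
    """Create the table and enable TTL. Requires AWS credentials."""
    import boto3  # imported lazily so the helpers above work without boto3

    client = boto3.client("dynamodb", region_name=region)
    definition = table_definition()
    client.create_table(**definition)
    # TTL can only be enabled once the table exists and is active.
    client.get_waiter("table_exists").wait(TableName=definition["TableName"])
    client.update_time_to_live(
        TableName=definition["TableName"],
        TimeToLiveSpecification=ttl_specification(),
    )
```

In practice many teams would define the same table in infrastructure-as-code tooling instead; the dictionaries above map directly onto those templates.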

Implementation

import json
import time
from typing import Any
import boto3
from botocore.config import Config
from agents.extensions.sessions import SessionABC

class DynamoDBSession(SessionABC):
    """DynamoDB-backed session for the OpenAI Agents SDK."""

    def __init__(
        self,
        table_name: str = "agent_sessions",
        region: str = "us-east-1",
        ttl_seconds: int | None = None,
    ):
        config = Config(region_name=region)
        self.dynamodb = boto3.resource("dynamodb", config=config)
        self.table = self.dynamodb.Table(table_name)
        self.ttl_seconds = ttl_seconds

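    async def get_items(
        self, session_id: str, limit: int | None = None
    ) -> list[Any]:
        """Retrieve items from DynamoDB, ordered by item_order."""
        if limit is not None and limit <= 0:
            return []  # DynamoDB rejects Limit values below 1

        kwargs = {
            "KeyConditionExpression": "session_id = :sid",
            "ExpressionAttributeValues": {":sid": session_id},
            "ScanIndexForward": True,  # Ascending order
        }

        items: list[dict] = []
        while True:
            if limit is not None:
                kwargs["Limit"] = limit - len(items)

            response = self.table.query(**kwargs)
            items.extend(response.get("Items", []))

            # A single Query returns at most 1 MB; follow LastEvaluatedKey
            # until the session is exhausted or the limit is reached.
            last_key = response.get("LastEvaluatedKey")
            if last_key is None or (limit is not None and len(items) >= limit):
                break
            kwargs["ExclusiveStartKey"] = last_key

        return [json.loads(item["item_data"]) for item in items]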

    async def add_items(self, session_id: str, items: list[Any]) -> None:
        """Append items to the session in DynamoDB."""
        # Get current max item_order
        response = self.table.query(
            KeyConditionExpression="session_id = :sid",
            ExpressionAttributeValues={":sid": session_id},
            ScanIndexForward=False,
            Limit=1,
            ProjectionExpression="item_order",
        )

        existing = response.get("Items", [])
        next_order = (existing[0]["item_order"] + 1) if existing else 0

        # Batch write new items
        with self.table.batch_writer() as batch:
            for i, item in enumerate(items):
                record = {
                    "session_id": session_id,
                    "item_order": next_order + i,
                    "item_data": json.dumps(item),
                    "created_at": int(time.time()),
                }

                if self.ttl_seconds:
                    record["expires_at"] = int(time.time()) + self.ttl_seconds

                batch.put_item(Item=record)

    async def pop_item(self, session_id: str) -> Any | None:
        """Remove and return the last item."""
        response = self.table.query(
            KeyConditionExpression="session_id = :sid",
            ExpressionAttributeValues={":sid": session_id},
            ScanIndexForward=False,
            Limit=1,
        )

        items = response.get("Items", [])
        if not items:
            return None

        last_item = items[0]
        self.table.delete_item(
            Key={
                "session_id": session_id,
                "item_order": last_item["item_order"],
            }
        )

        return json.loads(last_item["item_data"])

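    async def clear_session(self, session_id: str) -> None:
        """Delete all items for a session."""
        kwargs = {
            "KeyConditionExpression": "session_id = :sid",
            "ExpressionAttributeValues": {":sid": session_id},
            "ProjectionExpression": "session_id, item_order",
        }

        with self.table.batch_writer() as batch:
            while True:
                # A single Query returns at most 1 MB, so keep paging
                # until DynamoDB stops returning LastEvaluatedKey.
                response = self.table.query(**kwargs)
                for item in response.get("Items", []):
                    batch.delete_item(
                        Key={
                            "session_id": item["session_id"],
                            "item_order": item["item_order"],
                        }
                    )

                last_key = response.get("LastEvaluatedKey")
                if last_key is None:
                    break
                kwargs["ExclusiveStartKey"] = last_key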
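
One caveat about the class above: boto3 calls are synchronous, so invoking them directly inside `async def` methods blocks the event loop while each request is in flight. One possible remedy is to push each blocking call onto a worker thread with `asyncio.to_thread`. The sketch below shows the pattern in isolation; `slow_query` is a hypothetical stand-in for a call such as `self.table.query(...)`, and `query_off_loop` is an illustrative name.

```python
import asyncio
import time
from typing import Any


def slow_query(session_id: str) -> dict[str, Any]:
    """Hypothetical stand-in for a blocking call such as table.query(...)."""
    time.sleep(0.1)  # simulates network latency
    return {"Items": [{"session_id": session_id, "item_order": 0}]}


async def query_off_loop(session_id: str) -> dict[str, Any]:
    # Run the blocking function in a worker thread so other coroutines
    # can make progress while the request is in flight.
    return await asyncio.to_thread(slow_query, session_id)


async def main() -> list[dict[str, Any]]:
    # Three lookups overlap instead of running back to back.
    return await asyncio.gather(*(query_off_loop(f"s{i}") for i in range(3)))
```

Inside the session class, the equivalent change would be along the lines of `response = await asyncio.to_thread(self.table.query, **kwargs)`. An async AWS client library is another option, at the cost of an extra dependency.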

Using the DynamoDB Session

import asyncio
from agents import Agent, Runner

async def main():
    session = DynamoDBSession(
        table_name="agent_sessions",
        region="us-east-1",
        ttl_seconds=60 * 60 * 24 * 7,  # 7-day TTL
    )

    agent = Agent(name="DynamoAgent", instructions="You remember conversations.")

    # await is only valid inside a coroutine, so the call lives in main()
    result = await Runner.run(
        agent, "Hello!", session=session, session_id="user-abc-conv-1"
    )
    print(result.final_output)

asyncio.run(main())

Building a MongoDB Session

MongoDB is another popular choice, especially for teams already using it as their primary datastore.


import time  # needed for the created_at timestamp in add_items
from typing import Any
from motor.motor_asyncio import AsyncIOMotorClient
from agents.extensions.sessions import SessionABC

class MongoDBSession(SessionABC):
    """MongoDB-backed session using Motor async driver."""

    def __init__(self, mongo_url: str, database: str = "agents", collection: str = "sessions"):
        self.client = AsyncIOMotorClient(mongo_url)
        self.collection = self.client[database][collection]

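    async def initialize(self):
        """Create indexes for efficient queries."""
        # The compound index also serves queries on session_id alone,
        # because session_id is a prefix of the index, so a separate
        # single-field index is not needed.
        await self.collection.create_index(
            [("session_id", 1), ("item_order", 1)],
            unique=True,
        )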

    async def get_items(
        self, session_id: str, limit: int | None = None
    ) -> list[Any]:
        cursor = self.collection.find(
            {"session_id": session_id},
            sort=[("item_order", 1)],
        )

        if limit is not None:
            cursor = cursor.limit(limit)

        items = []
        async for doc in cursor:
            items.append(doc["item_data"])

        return items

    async def add_items(self, session_id: str, items: list[Any]) -> None:
        # Get current max order
        last_doc = await self.collection.find_one(
            {"session_id": session_id},
            sort=[("item_order", -1)],
            projection={"item_order": 1},
        )

        next_order = (last_doc["item_order"] + 1) if last_doc else 0

        documents = [
            {
                "session_id": session_id,
                "item_order": next_order + i,
                "item_data": item,
                "created_at": time.time(),
            }
            for i, item in enumerate(items)
        ]

        if documents:
            await self.collection.insert_many(documents)

    async def pop_item(self, session_id: str) -> Any | None:
        last_doc = await self.collection.find_one_and_delete(
            {"session_id": session_id},
            sort=[("item_order", -1)],
        )

        if last_doc:
            return last_doc["item_data"]
        return None

    async def clear_session(self, session_id: str) -> None:
        await self.collection.delete_many({"session_id": session_id})

Using the MongoDB Session

import asyncio
from agents import Agent, Runner

async def main():
    session = MongoDBSession("mongodb://localhost:27017")
    await session.initialize()

    agent = Agent(name="MongoAgent", instructions="You are helpful.")

    result = await Runner.run(
        agent, "Remember: my API key rotates on Fridays.",
        session=session, session_id="ops-team-conv"
    )
    print(result.final_output)

asyncio.run(main())

Testing Custom Sessions

Every custom session should pass a standard test suite that validates the SessionABC contract:

import asyncio

async def session_contract_tests(session):
    """Test suite that validates any SessionABC implementation."""
    sid = "test-session-001"

    # Start clean
    await session.clear_session(sid)

    # Test empty session
    items = await session.get_items(sid)
    assert items == [], "Empty session should return empty list"

    # Test add and retrieve
    await session.add_items(sid, [{"role": "user", "content": "Hello"}])
    items = await session.get_items(sid)
    assert len(items) == 1
    assert items[0]["content"] == "Hello"

    # Test multiple adds
    await session.add_items(sid, [
        {"role": "assistant", "content": "Hi there"},
        {"role": "user", "content": "How are you?"},
    ])
    items = await session.get_items(sid)
    assert len(items) == 3

    # Test ordering
    assert items[0]["content"] == "Hello"
    assert items[1]["content"] == "Hi there"
    assert items[2]["content"] == "How are you?"

    # Test limit
    items = await session.get_items(sid, limit=2)
    assert len(items) == 2

    # Test pop_item
    popped = await session.pop_item(sid)
    assert popped["content"] == "How are you?"
    items = await session.get_items(sid)
    assert len(items) == 2

    # Test pop from empty
    await session.clear_session(sid)
    popped = await session.pop_item(sid)
    assert popped is None

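    # Test session isolation (clear first so leftovers from earlier runs cannot leak in)
    await session.clear_session("session-a")
    await session.clear_session("session-b")
    await session.add_items("session-a", [{"data": "a"}])
    await session.add_items("session-b", [{"data": "b"}])
    assert (await session.get_items("session-a"))[0]["data"] == "a"
    assert (await session.get_items("session-b"))[0]["data"] == "b"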

    # Cleanup
    await session.clear_session("session-a")
    await session.clear_session("session-b")

    print("All contract tests passed!")

# Run against your implementation
# asyncio.run(session_contract_tests(DynamoDBSession()))
# asyncio.run(session_contract_tests(MongoDBSession("mongodb://localhost")))

Building to the SessionABC protocol means your custom session integrates seamlessly with all SDK features — compaction, encryption, session sharing, and multi-agent handoffs work out of the box.
