Custom Session Implementations: The SessionABC Protocol
Build custom session backends for the OpenAI Agents SDK by implementing the SessionABC protocol with complete DynamoDB and MongoDB examples and testing strategies.
When Built-In Sessions Are Not Enough
The OpenAI Agents SDK ships with SQLite, Redis, and SQLAlchemy sessions. For most applications, one of these fits. But sometimes you need something different:
- Your organization standardizes on DynamoDB and does not run Redis or PostgreSQL
- You want to store sessions in MongoDB alongside your document-oriented data
- You need a session that integrates with a proprietary storage system
- You want to add custom middleware (logging, metrics, validation) at the session layer
The SDK defines a clear protocol — SessionABC — that any custom session must implement. Build to this interface and your custom session works with every feature of the SDK: runners, compaction, encryption wrappers, and multi-agent handoffs.
The SessionABC Interface
SessionABC is an abstract base class with four required methods:
Hear it before you finish reading
Talk to a live CallSphere AI voice agent in your browser — 60 seconds, no signup.
flowchart LR
INPUT(["User intent"])
PARSE["Parse plus<br/>classify"]
PLAN["Plan and tool<br/>selection"]
AGENT["Agent loop<br/>LLM plus tools"]
GUARD{"Guardrails<br/>and policy"}
EXEC["Execute and<br/>verify result"]
OBS[("Trace and metrics")]
OUT(["Outcome plus<br/>next action"])
INPUT --> PARSE --> PLAN --> AGENT --> GUARD
GUARD -->|Pass| EXEC --> OUT
GUARD -->|Fail| AGENT
AGENT --> OBS
style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
style OUT fill:#059669,stroke:#047857,color:#fff
from abc import ABC, abstractmethod
from typing import Any
class SessionABC(ABC):
@abstractmethod
async def get_items(self, session_id: str, limit: int | None = None) -> list[Any]:
"""Retrieve conversation items for a session."""
...
@abstractmethod
async def add_items(self, session_id: str, items: list[Any]) -> None:
"""Append new items to a session."""
...
@abstractmethod
async def pop_item(self, session_id: str) -> Any | None:
"""Remove and return the last item from a session."""
...
@abstractmethod
async def clear_session(self, session_id: str) -> None:
"""Delete all items for a session."""
...
Method Responsibilities
| Method | Purpose | Called By |
|---|---|---|
| `get_items()` | Load history before each run | Runner (pre-run) |
| `add_items()` | Persist new turns after each run | Runner (post-run) |
| `pop_item()` | Remove last item (undo/correction) | Manual or compaction |
| `clear_session()` | Wipe entire session | Manual cleanup |
Building a DynamoDB Session
Let us build a complete DynamoDB-backed session. DynamoDB is a natural fit for session data: it scales automatically, supports TTL, and is serverless.
Table Design
DynamoDB table: agent_sessions
- Partition key: session_id (String)
- Sort key: item_order (Number)
- TTL attribute: expires_at (Number — Unix timestamp)
Implementation
import json
import time
from typing import Any
import boto3
from botocore.config import Config
from agents.extensions.sessions import SessionABC
class DynamoDBSession(SessionABC):
"""DynamoDB-backed session for the OpenAI Agents SDK."""
def __init__(
self,
table_name: str = "agent_sessions",
region: str = "us-east-1",
ttl_seconds: int | None = None,
):
config = Config(region_name=region)
self.dynamodb = boto3.resource("dynamodb", config=config)
self.table = self.dynamodb.Table(table_name)
self.ttl_seconds = ttl_seconds
async def get_items(
self, session_id: str, limit: int | None = None
) -> list[Any]:
"""Retrieve items from DynamoDB, ordered by item_order."""
kwargs = {
"KeyConditionExpression": "session_id = :sid",
"ExpressionAttributeValues": {":sid": session_id},
"ScanIndexForward": True, # Ascending order
}
if limit is not None:
kwargs["Limit"] = limit
response = self.table.query(**kwargs)
items = response.get("Items", [])
return [json.loads(item["item_data"]) for item in items]
async def add_items(self, session_id: str, items: list[Any]) -> None:
"""Append items to the session in DynamoDB."""
# Get current max item_order
response = self.table.query(
KeyConditionExpression="session_id = :sid",
ExpressionAttributeValues={":sid": session_id},
ScanIndexForward=False,
Limit=1,
ProjectionExpression="item_order",
)
existing = response.get("Items", [])
next_order = (existing[0]["item_order"] + 1) if existing else 0
# Batch write new items
with self.table.batch_writer() as batch:
for i, item in enumerate(items):
record = {
"session_id": session_id,
"item_order": next_order + i,
"item_data": json.dumps(item),
"created_at": int(time.time()),
}
if self.ttl_seconds:
record["expires_at"] = int(time.time()) + self.ttl_seconds
batch.put_item(Item=record)
async def pop_item(self, session_id: str) -> Any | None:
"""Remove and return the last item."""
response = self.table.query(
KeyConditionExpression="session_id = :sid",
ExpressionAttributeValues={":sid": session_id},
ScanIndexForward=False,
Limit=1,
)
items = response.get("Items", [])
if not items:
return None
last_item = items[0]
self.table.delete_item(
Key={
"session_id": session_id,
"item_order": last_item["item_order"],
}
)
return json.loads(last_item["item_data"])
async def clear_session(self, session_id: str) -> None:
"""Delete all items for a session."""
response = self.table.query(
KeyConditionExpression="session_id = :sid",
ExpressionAttributeValues={":sid": session_id},
ProjectionExpression="session_id, item_order",
)
with self.table.batch_writer() as batch:
for item in response["Items"]:
batch.delete_item(
Key={
"session_id": item["session_id"],
"item_order": item["item_order"],
}
)
Using the DynamoDB Session
from agents import Agent, Runner
async def main():
    """Run one agent turn whose history is persisted in DynamoDB.

    Returns:
        The result object produced by ``Runner.run``.
    """
    session = DynamoDBSession(
        table_name="agent_sessions",
        region="us-east-1",
        ttl_seconds=60 * 60 * 24 * 7,  # 7-day TTL
    )
    agent = Agent(name="DynamoAgent", instructions="You remember conversations.")
    result = await Runner.run(
        agent, "Hello!", session=session, session_id="user-abc-conv-1"
    )
    return result


if __name__ == "__main__":
    # `await` is only legal inside a coroutine, so drive it with an event loop.
    import asyncio

    asyncio.run(main())
Building a MongoDB Session
MongoDB is another popular choice, especially for teams already using it as their primary datastore.
Still reading? Stop comparing — try CallSphere live.
CallSphere ships complete AI voice agents per industry — 14 tools for healthcare, 10 agents for real estate, 4 specialists for salons. See how it actually handles a call before you book a demo.
import json
import time
from typing import Any

from motor.motor_asyncio import AsyncIOMotorClient

from agents.extensions.sessions import SessionABC
class MongoDBSession(SessionABC):
"""MongoDB-backed session using Motor async driver."""
def __init__(self, mongo_url: str, database: str = "agents", collection: str = "sessions"):
self.client = AsyncIOMotorClient(mongo_url)
self.collection = self.client[database][collection]
async def initialize(self):
"""Create indexes for efficient queries."""
await self.collection.create_index(
[("session_id", 1), ("item_order", 1)],
unique=True,
)
await self.collection.create_index("session_id")
async def get_items(
self, session_id: str, limit: int | None = None
) -> list[Any]:
cursor = self.collection.find(
{"session_id": session_id},
sort=[("item_order", 1)],
)
if limit is not None:
cursor = cursor.limit(limit)
items = []
async for doc in cursor:
items.append(doc["item_data"])
return items
async def add_items(self, session_id: str, items: list[Any]) -> None:
# Get current max order
last_doc = await self.collection.find_one(
{"session_id": session_id},
sort=[("item_order", -1)],
projection={"item_order": 1},
)
next_order = (last_doc["item_order"] + 1) if last_doc else 0
documents = [
{
"session_id": session_id,
"item_order": next_order + i,
"item_data": item,
"created_at": time.time(),
}
for i, item in enumerate(items)
]
if documents:
await self.collection.insert_many(documents)
async def pop_item(self, session_id: str) -> Any | None:
last_doc = await self.collection.find_one_and_delete(
{"session_id": session_id},
sort=[("item_order", -1)],
)
if last_doc:
return last_doc["item_data"]
return None
async def clear_session(self, session_id: str) -> None:
await self.collection.delete_many({"session_id": session_id})
Using the MongoDB Session
import asyncio
from agents import Agent, Runner
async def main():
    """Run one agent turn whose history is persisted in MongoDB."""
    session = MongoDBSession("mongodb://localhost:27017")
    try:
        await session.initialize()
        agent = Agent(name="MongoAgent", instructions="You are helpful.")
        result = await Runner.run(
            agent, "Remember: my API key rotates on Fridays.",
            session=session, session_id="ops-team-conv"
        )
        print(result.final_output)
    finally:
        # Release the driver's connections even if the run fails.
        session.client.close()


if __name__ == "__main__":
    asyncio.run(main())
Testing Custom Sessions
Every custom session should pass a standard test suite that validates the SessionABC contract:
import pytest
import asyncio
async def session_contract_tests(session):
    """Test suite that validates any SessionABC implementation.

    Exercises empty reads, appends, ordering, ``limit``, ``pop_item``,
    ``clear_session`` and isolation between sessions. Every session id the
    suite touches is cleared before the checks and again afterwards, even
    when an assertion fails, so one broken run cannot poison the next.

    Args:
        session: Backend under test; must provide the four SessionABC
            coroutines taking ``session_id`` as their first argument.

    Raises:
        AssertionError: If the backend violates the contract.
    """
    sid = "test-session-001"
    isolation_sids = ("session-a", "session-b")
    every_sid = (sid, *isolation_sids)

    # Start clean
    for stale_sid in every_sid:
        await session.clear_session(stale_sid)

    try:
        # Test empty session
        items = await session.get_items(sid)
        assert items == [], "Empty session should return empty list"

        # Test add and retrieve
        await session.add_items(sid, [{"role": "user", "content": "Hello"}])
        items = await session.get_items(sid)
        assert len(items) == 1
        assert items[0]["content"] == "Hello"

        # Test multiple adds
        await session.add_items(sid, [
            {"role": "assistant", "content": "Hi there"},
            {"role": "user", "content": "How are you?"},
        ])
        items = await session.get_items(sid)
        assert len(items) == 3

        # Test ordering
        assert items[0]["content"] == "Hello"
        assert items[1]["content"] == "Hi there"
        assert items[2]["content"] == "How are you?"

        # Appending nothing must leave the history untouched
        await session.add_items(sid, [])
        assert len(await session.get_items(sid)) == 3

        # Test limit
        # NOTE(review): the SDK documents a limited read as "the latest N
        # items"; tighten this to compare against the last two items once the
        # backend under test guarantees that.
        items = await session.get_items(sid, limit=2)
        assert len(items) == 2

        # Test pop_item
        popped = await session.pop_item(sid)
        assert popped["content"] == "How are you?"
        items = await session.get_items(sid)
        assert len(items) == 2

        # Test pop from empty
        await session.clear_session(sid)
        popped = await session.pop_item(sid)
        assert popped is None

        # Test session isolation
        await session.add_items("session-a", [{"data": "a"}])
        await session.add_items("session-b", [{"data": "b"}])
        assert (await session.get_items("session-a"))[0]["data"] == "a"
        assert (await session.get_items("session-b"))[0]["data"] == "b"

        # Clearing one session must not touch the other
        await session.clear_session("session-a")
        assert await session.get_items("session-a") == []
        assert len(await session.get_items("session-b")) == 1
    finally:
        # Cleanup
        for used_sid in every_sid:
            await session.clear_session(used_sid)

    print("All contract tests passed!")
# Run against your implementation
# asyncio.run(session_contract_tests(DynamoDBSession()))
# asyncio.run(session_contract_tests(MongoDBSession("mongodb://localhost")))
Building to the SessionABC protocol means your custom session integrates seamlessly with all SDK features — compaction, encryption, session sharing, and multi-agent handoffs work out of the box.
Sources:
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.