Learn Agentic AI

Python Generators and Iterators for Streaming AI Responses

Master Python generators and async iterators for building efficient streaming AI response pipelines with memory-efficient processing, backpressure handling, and real-time output.

Why Streaming Matters for AI Applications

When an LLM generates a 2,000-token response, waiting for the full completion before showing anything creates a poor user experience. Streaming sends tokens as they are generated, reducing perceived latency from seconds to milliseconds. Python generators are the natural abstraction for this pattern — they produce values lazily, one at a time, without holding the entire response in memory.

Beyond user experience, generators enable memory-efficient processing of large datasets for embeddings, batch inference, and document chunking — all critical operations in AI pipelines.

Generator Basics for Token Streaming

A generator function uses yield instead of return. Each yield pauses the function and produces a value. The function resumes from where it paused on the next iteration.

The sequence diagram below (Mermaid source) shows the end-to-end flow of a streamed chat completion, from the client request through per-token relay to the final trace write:

sequenceDiagram
    autonumber
    participant Client
    participant Edge as Edge Worker
    participant LLM as LLM Provider
    participant DB as Logs and Trace
    Client->>Edge: POST /chat (stream=true)
    Edge->>LLM: messages.create(stream=true)
    loop Each token
        LLM-->>Edge: SSE chunk delta
        Edge-->>Client: SSE chunk delta
        Edge->>DB: append token to span
    end
    LLM-->>Edge: stop_reason=end_turn
    Edge-->>Client: event: done
    Edge->>DB: finalize trace
from typing import Generator

def stream_tokens(text: str, chunk_size: int = 4) -> Generator[str, None, None]:
    """Simulate token streaming by yielding chunks of text."""
    for i in range(0, len(text), chunk_size):
        yield text[i:i + chunk_size]

# Lazy evaluation - only one chunk in memory at a time
for token in stream_tokens("The agent analyzed the document carefully."):
    print(token, end="", flush=True)

Async Generators for Real LLM Streaming

Real LLM APIs stream over HTTP using server-sent events. Async generators handle this naturally.

import httpx
import json
from typing import AsyncGenerator

async def stream_chat_completion(
    messages: list[dict],
    model: str = "gpt-4o",
    api_key: str = "",
) -> AsyncGenerator[str, None]:
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {api_key}"},
            json={"model": model, "messages": messages, "stream": True},
            timeout=60.0,
        ) as response:
            response.raise_for_status()
            async for line in response.aiter_lines():
                if line.startswith("data: ") and line != "data: [DONE]":
                    chunk = json.loads(line[6:])
                    delta = chunk["choices"][0].get("delta", {})
                    if content := delta.get("content"):
                        yield content

# Usage
async def main():
    messages = [{"role": "user", "content": "Explain agentic AI"}]
    full_response = []
    async for token in stream_chat_completion(messages, api_key="sk-..."):
        print(token, end="", flush=True)
        full_response.append(token)

    complete = "".join(full_response)

Pipeline Composition with Generators

Generators compose into processing pipelines where each stage transforms the stream without buffering everything.

from typing import AsyncGenerator

async def chunk_by_sentence(
    token_stream: AsyncGenerator[str, None]
) -> AsyncGenerator[str, None]:
    buffer = ""
    async for token in token_stream:
        buffer += token
        while ". " in buffer:
            sentence, buffer = buffer.split(". ", 1)
            yield sentence.strip() + "."
    # Flush whatever remains once the stream ends: the final sentence
    # usually lacks a trailing ". " separator and would otherwise be lost
    if buffer.strip():
        yield buffer.strip()

async def add_citations(
    sentence_stream: AsyncGenerator[str, None],
    knowledge_base: dict,
) -> AsyncGenerator[str, None]:
    async for sentence in sentence_stream:
        # Check if sentence needs a citation
        for keyword, citation in knowledge_base.items():
            if keyword.lower() in sentence.lower():
                sentence += f" [{citation}]"
                break
        yield sentence

# Compose the pipeline
async def enriched_stream(messages, api_key, kb):
    raw_tokens = stream_chat_completion(messages, api_key=api_key)
    sentences = chunk_by_sentence(raw_tokens)
    enriched = add_citations(sentences, kb)
    async for sentence in enriched:
        yield sentence
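The composition can be exercised without a live API by feeding the sentence-chunking stage a fake token stream. A self-contained sketch (token strings are invented; the final flush handles the last sentence, which has no trailing separator):

```python
import asyncio
from typing import AsyncGenerator

async def fake_tokens() -> AsyncGenerator[str, None]:
    # Simulated LLM output, split at arbitrary points like a real stream
    for tok in ["Agents pla", "n. They a", "ct. Then they", " reflect."]:
        yield tok

async def chunk_by_sentence(stream):
    buffer = ""
    async for token in stream:
        buffer += token
        while ". " in buffer:
            sentence, buffer = buffer.split(". ", 1)
            yield sentence.strip() + "."
    if buffer.strip():  # flush the final sentence
        yield buffer.strip()

async def main() -> list[str]:
    return [s async for s in chunk_by_sentence(fake_tokens())]

sentences = asyncio.run(main())
print(sentences)  # ['Agents plan.', 'They act.', 'Then they reflect.']
```

Because each stage is itself an async generator, swapping the fake stream for the real one requires no changes downstream.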

Memory-Efficient Batch Processing

When generating embeddings for thousands of documents, generators prevent loading everything into memory at once.

from typing import Generator
from pathlib import Path

def read_documents(directory: Path) -> Generator[str, None, None]:
    for file_path in directory.glob("*.txt"):
        yield file_path.read_text()

def batch_items(items: Generator, batch_size: int = 32) -> Generator[list, None, None]:
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch

async def embed_directory(directory: Path):
    # embedding_api and vector_db are placeholders for your embedding
    # client and vector store
    documents = read_documents(directory)
    for batch in batch_items(documents, batch_size=32):
        embeddings = await embedding_api.embed(batch)
        await vector_db.upsert(embeddings)
        # Only 32 documents in memory at any time
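The batching helper can be sanity-checked on its own with a small in-memory iterator (illustrative sizes; same logic as above, with the trailing partial batch emitted last):

```python
from typing import Generator, Iterable

def batch_items(items: Iterable, batch_size: int = 32) -> Generator[list, None, None]:
    # Accumulate items, emit full batches, then the remainder
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch

batches = list(batch_items(iter(range(10)), batch_size=4))
print(batches)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```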

Generator-Based Streaming in FastAPI

FastAPI supports StreamingResponse with generators for real-time AI output.


import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class ChatRequest(BaseModel):
    messages: list[dict]

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    async def generate():
        async for token in stream_chat_completion(
            request.messages, api_key=settings.api_key
        ):
            yield f"data: {json.dumps({'token': token})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

FAQ

What is the difference between a generator and an iterator in Python?

Every generator is an iterator, but not every iterator is a generator. An iterator is any object with __iter__ and __next__ methods. A generator is a function that uses yield and automatically creates an iterator. Generators are more concise and handle state management automatically.
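The distinction is concrete in code: the class below hand-implements the iterator protocol, while the generator function gets identical behavior with the state managed for free (names are illustrative):

```python
class CountUp:
    """A hand-written iterator: explicit __iter__/__next__ and manual state."""
    def __init__(self, n: int):
        self.i = 0
        self.n = n

    def __iter__(self):
        return self

    def __next__(self):
        if self.i >= self.n:
            raise StopIteration
        self.i += 1
        return self.i

def count_up(n: int):
    """The generator equivalent: yield tracks position automatically."""
    for i in range(1, n + 1):
        yield i

assert list(CountUp(3)) == list(count_up(3)) == [1, 2, 3]
```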

How do I handle errors in a streaming pipeline without losing partial results?

Wrap the consumer loop in a try/except and process whatever has been yielded so far. You can also use generator.throw() to inject exceptions into a running generator from the outside, allowing it to handle errors and potentially recover or yield a fallback value.
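A minimal sketch of the consumer-side pattern, using a made-up generator that fails mid-stream:

```python
def flaky_tokens():
    yield "Hello"
    yield " world"
    raise RuntimeError("upstream connection dropped")

collected = []
try:
    for token in flaky_tokens():
        collected.append(token)
except RuntimeError:
    pass  # keep whatever streamed before the failure

partial = "".join(collected)
print(partial)  # "Hello world"
```

Everything yielded before the exception survives in `collected`, so the caller can persist or display the partial response instead of discarding it.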

Can generators cause memory leaks if not fully consumed?

Yes. If you break out of a generator loop early, the generator object stays in memory with its full stack frame until garbage collected. Call generator.close() explicitly or use it inside a with statement via contextlib.closing to ensure cleanup.
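A minimal sketch of explicit cleanup: the generator's finally block runs when close() is called, and contextlib.closing guarantees that call even when the consumer breaks out early (the flag list is just a way to observe the cleanup):

```python
from contextlib import closing

cleanup_ran = []

def token_stream():
    try:
        for tok in ["alpha", "beta", "gamma"]:
            yield tok
    finally:
        cleanup_ran.append(True)  # runs on close() as well as on exhaustion

with closing(token_stream()) as stream:
    for tok in stream:
        if tok == "beta":
            break  # early exit: closing() still calls stream.close()
```

On exit from the with block, close() raises GeneratorExit inside the paused generator, which triggers the finally clause and releases the frame immediately rather than waiting for garbage collection.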


#Python #Generators #Streaming #AsyncProgramming #AgenticAI #LearnAI #AIEngineering
