Streaming Responses

Streaming lets you receive model output in real time as it's generated, rather than waiting for the complete response.

When to Use Streaming

  • Long-form content: Articles, stories, explanations
  • Interactive applications: Chatbots, assistants
  • Better UX: Users see immediate feedback
  • Large responses: Reduces perceived latency

Basic Streaming Example

import requests
import json

API_URL = "http://localhost:8000/api/v1"
API_KEY = "sk_your_api_key"

def stream_generate(service: str, model: str, prompt: str):
    """Stream a text generation response."""
    response = requests.post(
        f"{API_URL}/generate/stream",
        headers={
            "Content-Type": "application/json",
            "X-API-Key": API_KEY,
            "Accept": "text/event-stream"
        },
        json={
            "service_name": service,
            "model_name": model,
            "input": [
                {"type": "text", "name": "input", "content": prompt}
            ]
        },
        stream=True
    )
    response.raise_for_status()  # fail fast on HTTP errors before parsing SSE

    for line in response.iter_lines():
        if line:
            line = line.decode("utf-8")
            if line.startswith("data: "):
                yield json.loads(line[6:])


# Usage
for chunk in stream_generate(
    service="google-gemini",
    model="gemini-2.0-flash",
    prompt="Write a short story about a robot learning to paint."
):
    if chunk.get("final"):
        print(f"\n\n--- Complete ---")
        print(f"Tokens: {chunk['usage']['total_tokens']}")
        print(f"Cost: ${chunk['cost']['total']:.4f}")
    else:
        # Print content as it arrives
        if chunk.get("name") == "output":
            print(chunk.get("content", ""), end="", flush=True)

Streaming Chunk Structure

Each chunk from the SSE stream contains:

{
  "type": "text",
  "name": "output",
  "content": "The robot ",
  "done": false,
  "index": 0,
  "is_final": false
}
Field      Description
type       Content type: text, image, or metadata
name       Output name: output, thinking, or metadata
content    Incremental content to append
done       True when this specific output item is complete
index      Position in the output array
final      True for the very last chunk of the stream
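
Content for one output item arrives split across many chunks, so clients typically buffer by index and concatenate. A minimal reassembly sketch (the assemble helper is illustrative, not part of the API):

def assemble(chunks):
    """Rebuild complete output items from a sequence of parsed chunks."""
    items = {}
    for chunk in chunks:
        if chunk.get("final"):
            break  # the terminal chunk carries usage/cost, not content
        items.setdefault(chunk.get("index", 0), []).append(chunk.get("content", ""))
    # Join the fragments for each output position, in stream order
    return ["".join(parts) for _, parts in sorted(items.items())]

full_outputs = assemble(stream_generate("google-gemini", "gemini-2.0-flash", "Hi"))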

Final Chunk

The last chunk includes usage and cost:

{
  "final": true,
  "usage": {
    "input_tokens": 24,
    "output_tokens": 512,
    "total_tokens": 536
  },
  "cost": {
    "input": 0.0000018,
    "output": 0.0001536,
    "total": 0.0001554
  },
  "warnings": []
}
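
The totals should simply be sums of the input and output figures (assuming the gateway reports them that way), which allows a quick sanity check:

usage = {"input_tokens": 24, "output_tokens": 512, "total_tokens": 536}
cost = {"input": 0.0000018, "output": 0.0001536, "total": 0.0001554}
assert usage["input_tokens"] + usage["output_tokens"] == usage["total_tokens"]
# Floating-point sums need a tolerance rather than exact equality
assert abs(cost["input"] + cost["output"] - cost["total"]) < 1e-9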

Complete Streaming Client

from typing import Callable, Optional

class StreamingClient:
    """Client with streaming support."""

    def __init__(self, api_key: str, base_url: str = "http://localhost:8000/api/v1"):
        self.api_key = api_key
        self.base_url = base_url

    def stream(
        self,
        service: str,
        model: str,
        inputs: list,
        parameters: Optional[dict] = None,
        history: Optional[list] = None,
        on_chunk: Optional[Callable[[str], None]] = None,
        on_thinking: Optional[Callable[[str], None]] = None,
        on_complete: Optional[Callable[[dict], None]] = None
    ):
        """
        Stream a generation with callbacks.

        Args:
            service: Service name (e.g., "google-gemini")
            model: Model name (e.g., "gemini-2.0-flash")
            inputs: List of input items
            parameters: Generation parameters
            history: Conversation history
            on_chunk: Callback for each output chunk
            on_thinking: Callback for thinking/reasoning chunks
            on_complete: Callback when generation completes
        """
        payload = {
            "service_name": service,
            "model_name": model,
            "input": inputs
        }
        if parameters:
            payload["parameters"] = parameters
        if history:
            payload["history"] = history

        response = requests.post(
            f"{self.base_url}/generate/stream",
            headers={
                "Content-Type": "application/json",
                "X-API-Key": self.api_key,
                "Accept": "text/event-stream"
            },
            json=payload,
            stream=True
        )
        response.raise_for_status()

        output_buffer = {}
        final_result = None

        for line in response.iter_lines():
            if not line:
                continue

            line = line.decode("utf-8")
            if not line.startswith("data: "):
                continue

            chunk = json.loads(line[6:])

            if chunk.get("final"):
                final_result = chunk
                if on_complete:
                    on_complete(chunk)
                break

            # Accumulate content by index
            index = chunk.get("index", 0)
            if index not in output_buffer:
                output_buffer[index] = {
                    "type": chunk["type"],
                    "name": chunk["name"],
                    "content": ""
                }
            output_buffer[index]["content"] += chunk.get("content", "")

            # Call appropriate callback
            if chunk.get("name") == "thinking" and on_thinking:
                on_thinking(chunk.get("content", ""))
            elif chunk.get("name") == "output" and on_chunk:
                on_chunk(chunk.get("content", ""))

        return {
            "output": list(output_buffer.values()),
            "usage": final_result.get("usage") if final_result else None,
            "cost": final_result.get("cost") if final_result else None,
            "warnings": final_result.get("warnings", [])
        }


# Usage with callbacks
client = StreamingClient(api_key="sk_your_api_key")

def handle_chunk(text):
    print(text, end="", flush=True)

def handle_thinking(text):
    print(f"[thinking] {text}", end="", flush=True)

def handle_complete(result):
    print(f"\n\nComplete! Cost: ${result['cost']['total']:.4f}")

result = client.stream(
    service="google-gemini",
    model="gemini-2.0-flash",
    inputs=[{"type": "text", "name": "input", "content": "Explain recursion."}],
    on_chunk=handle_chunk,
    on_thinking=handle_thinking,
    on_complete=handle_complete
)

Streaming with Extended Thinking

Some models support "thinking" or reasoning that's streamed separately:

result = client.stream(
    service="google-gemini",
    model="gemini-3-pro-thinking-preview",
    inputs=[{"type": "text", "name": "input", "content": "Solve this math problem: ..."}],
    parameters={"enable_thinking": True},
    on_thinking=lambda t: print(f"🤔 {t}", end="", flush=True),
    on_chunk=lambda c: print(f"📝 {c}", end="", flush=True)
)

Async Streaming (aiohttp)

For async applications:

import aiohttp
import asyncio

async def async_stream(service: str, model: str, prompt: str):
    """Async streaming with aiohttp."""
    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"{API_URL}/generate/stream",
            headers={
                "Content-Type": "application/json",
                "X-API-Key": API_KEY,
                "Accept": "text/event-stream"
            },
            json={
                "service_name": service,
                "model_name": model,
                "input": [{"type": "text", "name": "input", "content": prompt}]
            }
        ) as response:
            response.raise_for_status()
            async for line in response.content:
                line = line.decode("utf-8").strip()
                if line.startswith("data: "):
                    chunk = json.loads(line[6:])
                    yield chunk


# Usage
async def main():
    full_text = ""
    async for chunk in async_stream(
        "google-gemini",
        "gemini-2.0-flash",
        "Write a haiku about programming."
    ):
        if chunk.get("final"):
            print(f"\n\nCost: ${chunk['cost']['total']:.4f}")
        elif chunk.get("name") == "output":
            content = chunk.get("content", "")
            full_text += content
            print(content, end="", flush=True)

asyncio.run(main())
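
Because async_stream is an async generator, several requests can be consumed concurrently on one event loop with asyncio.gather. A sketch (the prompts are placeholders):

async def run_many():
    async def collect(prompt: str) -> str:
        text = ""
        async for chunk in async_stream("google-gemini", "gemini-2.0-flash", prompt):
            if not chunk.get("final") and chunk.get("name") == "output":
                text += chunk.get("content", "")
        return text

    # Both streams are drained in parallel
    results = await asyncio.gather(
        collect("Summarize HTTP/2 in one paragraph."),
        collect("Summarize HTTP/3 in one paragraph."),
    )
    for text in results:
        print(text, "\n---")

asyncio.run(run_many())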

Handling Stream Interruptions

If the connection drops mid-stream, the simplest recovery is to retry the whole request. The generator below yields parsed chunks and retries on timeouts and connection errors; note that a retry restarts generation from the beginning (the endpoint shown here has no resume mechanism), so callers may see repeated text after an interruption.

def robust_stream(service: str, model: str, prompt: str, max_retries: int = 3):
    """Stream with retry logic for interruptions, yielding parsed chunks."""
    retries = 0

    while retries < max_retries:
        try:
            response = requests.post(
                f"{API_URL}/generate/stream",
                headers={
                    "Content-Type": "application/json",
                    "X-API-Key": API_KEY,
                    "Accept": "text/event-stream"
                },
                json={
                    "service_name": service,
                    "model_name": model,
                    "input": [{"type": "text", "name": "input", "content": prompt}]
                },
                stream=True,
                timeout=60
            )
            response.raise_for_status()

            for line in response.iter_lines():
                if line:
                    line = line.decode("utf-8")
                    if line.startswith("data: "):
                        chunk = json.loads(line[6:])
                        yield chunk
                        if chunk.get("final"):
                            return  # generation finished cleanly

            # Stream ended without a final chunk: treat it as an interruption
            raise requests.exceptions.ConnectionError("stream ended before final chunk")

        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
            retries += 1
            if retries >= max_retries:
                raise
            print(f"\nConnection interrupted, retrying ({retries}/{max_retries})...")

Stream to File

def stream_to_file(service: str, model: str, prompt: str, output_path: str):
    """Stream response directly to a file."""
    with open(output_path, "w", encoding="utf-8") as f:
        for chunk in stream_generate(service, model, prompt):
            if chunk.get("final"):
                f.write(f"\n\n---\nTokens: {chunk['usage']['total_tokens']}\n")
                f.write(f"Cost: ${chunk['cost']['total']:.4f}\n")
            elif chunk.get("name") == "output":
                content = chunk.get("content", "")
                f.write(content)
                f.flush()  # Ensure immediate write

stream_to_file(
    "google-gemini",
    "gemini-2.0-flash",
    "Write a detailed guide to machine learning.",
    "ml_guide.md"
)

Notes

  • Streaming is not supported for image generation (gemini_image models)
  • For image models, use the regular /generate endpoint
  • Stream responses may include rate limit warnings in the final chunk (see the helper below)
  • Always handle the final chunk to get accurate usage/cost data
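
A minimal helper for surfacing those warnings from the final chunk (assuming each warning is a printable string):

def report_warnings(final_chunk: dict) -> None:
    """Print any warnings carried by the final stream chunk."""
    for warning in final_chunk.get("warnings", []):
        print(f"warning: {warning}")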

Next Steps