# Streaming Responses

Streaming allows you to receive model output in real time as it is generated, rather than waiting for the complete response.
## When to Use Streaming

- **Long-form content**: Articles, stories, explanations
- **Interactive applications**: Chatbots, assistants
- **Better UX**: Users see immediate feedback
- **Large responses**: Reduces perceived latency
## Basic Streaming Example

```python
import requests
import json

API_URL = "http://localhost:8000/api/v1"
API_KEY = "sk_your_api_key"


def stream_generate(service: str, model: str, prompt: str):
    """Stream a text generation response."""
    response = requests.post(
        f"{API_URL}/generate/stream",
        headers={
            "Content-Type": "application/json",
            "X-API-Key": API_KEY,
            "Accept": "text/event-stream"
        },
        json={
            "service_name": service,
            "model_name": model,
            "input": [
                {"type": "text", "name": "input", "content": prompt}
            ]
        },
        stream=True
    )
    for line in response.iter_lines():
        if line:
            line = line.decode("utf-8")
            if line.startswith("data: "):
                yield json.loads(line[6:])


# Usage
for chunk in stream_generate(
    service="google-gemini",
    model="gemini-2.0-flash",
    prompt="Write a short story about a robot learning to paint."
):
    if chunk.get("final"):
        print("\n\n--- Complete ---")
        print(f"Tokens: {chunk['usage']['total_tokens']}")
        print(f"Cost: ${chunk['cost']['total']:.4f}")
    else:
        # Print content as it arrives
        if chunk.get("name") == "output":
            print(chunk.get("content", ""), end="", flush=True)
```
## Streaming Chunk Structure

Each chunk from the SSE stream contains:

```json
{
  "type": "text",
  "name": "output",
  "content": "The robot ",
  "done": false,
  "index": 0,
  "is_final": false
}
```
| Field | Description |
|---|---|
| `type` | Content type: `text`, `image`, `metadata` |
| `name` | Output name: `output`, `thinking`, `metadata` |
| `content` | Incremental content to append |
| `done` | `true` when this specific item is complete |
| `index` | Position in the output array |
| `is_final` | `true` for the very last chunk |
### Final Chunk

The last chunk includes usage and cost:

```json
{
  "final": true,
  "usage": {
    "input_tokens": 24,
    "output_tokens": 512,
    "total_tokens": 536
  },
  "cost": {
    "input": 0.0000018,
    "output": 0.0001536,
    "total": 0.0001554
  },
  "warnings": []
}
```
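A small helper like the sketch below can report the usage, cost, and any warnings carried by that final chunk. The field names follow the example above; treating each warning entry as a printable string is an assumption:

```python
def report_final_chunk(chunk: dict) -> None:
    """Print usage, cost, and warnings from the final streaming chunk.

    Assumes the field layout shown above; warning entries are printed as-is.
    """
    usage = chunk.get("usage", {})
    cost = chunk.get("cost", {})

    print(f"Tokens: {usage.get('input_tokens', 0)} in / "
          f"{usage.get('output_tokens', 0)} out / "
          f"{usage.get('total_tokens', 0)} total")
    print(f"Cost: ${cost.get('total', 0):.6f}")

    # Rate limit (and other) warnings may arrive here rather than as errors
    for warning in chunk.get("warnings", []):
        print(f"Warning: {warning}")
```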
## Complete Streaming Client

```python
class StreamingClient:
    """Client with streaming support."""

    def __init__(self, api_key: str, base_url: str = "http://localhost:8000/api/v1"):
        self.api_key = api_key
        self.base_url = base_url

    def stream(
        self,
        service: str,
        model: str,
        inputs: list,
        parameters: dict = None,
        history: list = None,
        on_chunk: callable = None,
        on_thinking: callable = None,
        on_complete: callable = None
    ):
        """
        Stream a generation with callbacks.

        Args:
            service: Service name (e.g., "google-gemini")
            model: Model name (e.g., "gemini-2.0-flash")
            inputs: List of input items
            parameters: Generation parameters
            history: Conversation history
            on_chunk: Callback for each output chunk
            on_thinking: Callback for thinking/reasoning chunks
            on_complete: Callback when generation completes
        """
        payload = {
            "service_name": service,
            "model_name": model,
            "input": inputs
        }
        if parameters:
            payload["parameters"] = parameters
        if history:
            payload["history"] = history

        response = requests.post(
            f"{self.base_url}/generate/stream",
            headers={
                "Content-Type": "application/json",
                "X-API-Key": self.api_key,
                "Accept": "text/event-stream"
            },
            json=payload,
            stream=True
        )

        output_buffer = {}
        final_result = None

        for line in response.iter_lines():
            if not line:
                continue
            line = line.decode("utf-8")
            if not line.startswith("data: "):
                continue

            chunk = json.loads(line[6:])

            if chunk.get("final"):
                final_result = chunk
                if on_complete:
                    on_complete(chunk)
                break

            # Accumulate content by index
            index = chunk.get("index", 0)
            if index not in output_buffer:
                output_buffer[index] = {
                    "type": chunk["type"],
                    "name": chunk["name"],
                    "content": ""
                }
            output_buffer[index]["content"] += chunk.get("content", "")

            # Call appropriate callback
            if chunk.get("name") == "thinking" and on_thinking:
                on_thinking(chunk.get("content", ""))
            elif chunk.get("name") == "output" and on_chunk:
                on_chunk(chunk.get("content", ""))

        return {
            "output": list(output_buffer.values()),
            "usage": final_result.get("usage") if final_result else None,
            "cost": final_result.get("cost") if final_result else None,
            "warnings": final_result.get("warnings", []) if final_result else []
        }


# Usage with callbacks
client = StreamingClient(api_key="sk_your_api_key")

def handle_chunk(text):
    print(text, end="", flush=True)

def handle_thinking(text):
    print(f"[thinking] {text}", end="", flush=True)

def handle_complete(result):
    print(f"\n\nComplete! Cost: ${result['cost']['total']:.4f}")

result = client.stream(
    service="google-gemini",
    model="gemini-2.0-flash",
    inputs=[{"type": "text", "name": "input", "content": "Explain recursion."}],
    on_chunk=handle_chunk,
    on_complete=handle_complete
)
```
## Streaming with Extended Thinking

Some models support "thinking" or reasoning that's streamed separately:

```python
result = client.stream(
    service="google-gemini",
    model="gemini-3-pro-thinking-preview",
    inputs=[{"type": "text", "name": "input", "content": "Solve this math problem: ..."}],
    parameters={"enable_thinking": True},
    on_thinking=lambda t: print(f"🤔 {t}", end="", flush=True),
    on_chunk=lambda c: print(f"📝 {c}", end="", flush=True)
)
```
## Async Streaming (aiohttp)

For async applications:

```python
import aiohttp
import asyncio


async def async_stream(service: str, model: str, prompt: str):
    """Async streaming with aiohttp."""
    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"{API_URL}/generate/stream",
            headers={
                "Content-Type": "application/json",
                "X-API-Key": API_KEY,
                "Accept": "text/event-stream"
            },
            json={
                "service_name": service,
                "model_name": model,
                "input": [{"type": "text", "name": "input", "content": prompt}]
            }
        ) as response:
            async for line in response.content:
                line = line.decode("utf-8").strip()
                if line.startswith("data: "):
                    chunk = json.loads(line[6:])
                    yield chunk


# Usage
async def main():
    full_text = ""
    async for chunk in async_stream(
        "google-gemini",
        "gemini-2.0-flash",
        "Write a haiku about programming."
    ):
        if chunk.get("final"):
            print(f"\n\nCost: ${chunk['cost']['total']:.4f}")
        elif chunk.get("name") == "output":
            content = chunk.get("content", "")
            full_text += content
            print(content, end="", flush=True)

asyncio.run(main())
```
Handling Stream Interruptions
def robust_stream(service: str, model: str, prompt: str, max_retries: int = 3):
"""Stream with retry logic for interruptions."""
accumulated = ""
retries = 0
while retries < max_retries:
try:
response = requests.post(
f"{API_URL}/generate/stream",
headers={
"Content-Type": "application/json",
"X-API-Key": API_KEY,
"Accept": "text/event-stream"
},
json={
"service_name": service,
"model_name": model,
"input": [{"type": "text", "name": "input", "content": prompt}]
},
stream=True,
timeout=60
)
for line in response.iter_lines():
if line:
line = line.decode("utf-8")
if line.startswith("data: "):
chunk = json.loads(line[6:])
if chunk.get("final"):
return {
"content": accumulated,
"usage": chunk["usage"],
"cost": chunk["cost"]
}
elif chunk.get("name") == "output":
content = chunk.get("content", "")
accumulated += content
yield content
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
retries += 1
if retries >= max_retries:
raise
print(f"\nConnection interrupted, retrying ({retries}/{max_retries})...")
continue
return {"content": accumulated, "partial": True}
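For parity with the other examples, here is one way to consume the chunk-yielding `robust_stream` helper above; the loop mirrors the basic streaming example, and the prompt is only illustrative:

```python
# Usage
for chunk in robust_stream(
    "google-gemini",
    "gemini-2.0-flash",
    "Summarize the history of neural networks."
):
    if chunk.get("final"):
        print(f"\n\nTokens: {chunk['usage']['total_tokens']}")
    elif chunk.get("name") == "output":
        print(chunk.get("content", ""), end="", flush=True)
```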
## Stream to File

```python
def stream_to_file(service: str, model: str, prompt: str, output_path: str):
    """Stream response directly to a file."""
    with open(output_path, "w") as f:
        for chunk in stream_generate(service, model, prompt):
            if chunk.get("final"):
                f.write(f"\n\n---\nTokens: {chunk['usage']['total_tokens']}\n")
                f.write(f"Cost: ${chunk['cost']['total']:.4f}\n")
            elif chunk.get("name") == "output":
                content = chunk.get("content", "")
                f.write(content)
                f.flush()  # Ensure immediate write


stream_to_file(
    "google-gemini",
    "gemini-2.0-flash",
    "Write a detailed guide to machine learning.",
    "ml_guide.md"
)
```
## Notes

- Streaming is not supported for image generation (`gemini_image` models)
- For image models, use the regular `/generate` endpoint (see the sketch below)
- Stream responses may include rate limit warnings in the final chunk
- Always handle the `final` chunk to get accurate usage/cost data
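As a rough sketch of that non-streaming path: the request body below simply mirrors the streaming payload used throughout this page, but the `generate_image` helper name and the assumption that the response parses as plain JSON are illustrative and may need adjusting to the actual `/generate` response shape:

```python
def generate_image(service: str, model: str, prompt: str) -> dict:
    """Call the non-streaming /generate endpoint (required for image models).

    Sketch only: payload mirrors the streaming examples; response shape is
    assumed to be JSON and is not documented here.
    """
    response = requests.post(
        f"{API_URL}/generate",
        headers={
            "Content-Type": "application/json",
            "X-API-Key": API_KEY
        },
        json={
            "service_name": service,
            "model_name": model,
            "input": [{"type": "text", "name": "input", "content": prompt}]
        },
        timeout=120
    )
    response.raise_for_status()
    return response.json()
```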
## Next Steps

- Troubleshooting common issues