# Streaming
LLM-Rosetta supports converting streaming chunks between providers. A stateful StreamContext tracks session metadata, tool calls, and deferred events across the chunk sequence.
## Stream Events
Streaming produces a sequence of IR events, each of one of the following IRStreamEvent types:
| Event | Description |
|---|---|
| StreamStartEvent | Stream has started |
| ContentBlockStartEvent | A new content block begins |
| TextDeltaEvent | Incremental text content |
| ReasoningDeltaEvent | Incremental reasoning/thinking content |
| ToolCallStartEvent | Tool call begins (name + ID) |
| ToolCallDeltaEvent | Incremental tool call arguments |
| ContentBlockEndEvent | Current content block ends |
| FinishEvent | Model finished generating (stop reason) |
| UsageEvent | Token usage statistics |
| StreamEndEvent | Stream has finished |
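To make the table concrete, the sketch below treats a short text reply as a list of IR event dicts and reassembles it. Only the "text_delta" type and its "text" key come from the examples on this page, and "stop_reason" comes from the finish example further down; the other type strings ("stream_start", "content_block_start", "finish", and so on) and the "index" field are illustrative assumptions, not confirmed IR names.

```python
# Hypothetical IR event dicts for a short text reply. Only "text_delta"/"text"
# and "stop_reason" are taken from this page; the other type strings and the
# "index" field are assumptions for illustration.
SAMPLE_EVENTS = [
    {"type": "stream_start"},
    {"type": "content_block_start", "index": 0},
    {"type": "text_delta", "text": "Hello"},
    {"type": "text_delta", "text": ", world"},
    {"type": "content_block_end", "index": 0},
    {"type": "finish", "stop_reason": "end_turn"},
    {"type": "stream_end"},
]


def collect_text(events):
    """Concatenate all text deltas and return (text, stop_reason)."""
    parts = []
    stop_reason = None
    for event in events:
        if event["type"] == "text_delta":
            parts.append(event["text"])
        elif event["type"] == "finish":
            stop_reason = event.get("stop_reason")
    return "".join(parts), stop_reason


text, reason = collect_text(SAMPLE_EVENTS)
print(text, reason)  # Hello, world end_turn
```

Consumers that only care about text can ignore every event type they do not recognize, which keeps them robust if new event types are added.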
## Converting Stream Chunks
Use stream_response_from_provider() to convert provider-native chunks into IR events:
```python
from llm_rosetta import OpenAIChatConverter
from llm_rosetta.converters.base import StreamContext

converter = OpenAIChatConverter()
ctx = StreamContext()  # one context per stream; reuse it for every chunk

# provider_stream: an OpenAI Chat Completions stream (e.g. created with stream=True)
for chunk in provider_stream:
    ir_events = converter.stream_response_from_provider(
        chunk.model_dump(), context=ctx
    )
    for event in ir_events:
        if event["type"] == "text_delta":
            print(event["text"], end="")
```
Use stream_response_to_provider() to convert IR events back into a target provider's format:
```python
from llm_rosetta import AnthropicConverter
from llm_rosetta.converters.base import StreamContext

target = AnthropicConverter()
target_ctx = StreamContext()  # separate context for the output side

# ir_events: IR events produced by stream_response_from_provider() above
for ir_event in ir_events:
    provider_chunk = target.stream_response_to_provider(ir_event, context=target_ctx)
    # provider_chunk is a dict (or list of dicts) in the target format
```
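Because stream_response_to_provider() can return a single dict, a list of dicts, or an empty result for a given IR event, callers usually normalize the result before writing it out. The two helpers below are hypothetical (they are not part of LLM-Rosetta). to_anthropic_sse() assumes each Anthropic-format chunk is a dict carrying a "type" key, which matches the shape of Anthropic's streaming events, where the SSE "event:" line repeats that type.

```python
import json


def as_chunk_list(result):
    """Normalize a converter result (empty, a dict, or a list of dicts) to a list."""
    if not result:
        return []  # some IR events produce no provider output
    if isinstance(result, dict):
        return [result]
    return list(result)


def to_anthropic_sse(chunk: dict) -> str:
    """Serialize one Anthropic-format chunk as a Server-Sent Events frame."""
    # Assumption: the chunk's "type" field doubles as the SSE event name.
    return f"event: {chunk['type']}\ndata: {json.dumps(chunk)}\n\n"


for chunk in as_chunk_list({"type": "ping"}):
    print(to_anthropic_sse(chunk), end="")
```

Normalizing first lets the writing loop stay the same regardless of whether a given IR event expands into zero, one, or several provider chunks.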
## StreamContext
StreamContext is a dataclass that extends ConversionContext, adding session-level state for stateful stream transformations.
```python
from llm_rosetta.converters.base import StreamContext

# Create directly
ctx = StreamContext()

# Or via factory (equivalent)
from llm_rosetta import BaseConverter

ctx = BaseConverter.create_stream_context()
```
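The dataclass relationship described above can be sketched with two stand-in classes. MiniConversionContext and MiniStreamContext are hypothetical names, and the field defaults (including current_block_index starting at 0) are assumptions; the real classes carry more state. The point is that a subclass dataclass inherits the base fields, so a stream context can be used wherever a plain conversion context is expected.

```python
from dataclasses import dataclass, field, fields
from typing import Any, Dict, List


@dataclass
class MiniConversionContext:
    """Hypothetical stand-in for ConversionContext (warnings, options, metadata)."""
    warnings: List[str] = field(default_factory=list)
    options: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class MiniStreamContext(MiniConversionContext):
    """Hypothetical stand-in for StreamContext: adds session-level stream state."""
    response_id: str = ""
    model: str = ""
    created: int = 0
    current_block_index: int = 0  # assumed default


ctx = MiniStreamContext()
ctx.warnings.append("dropped an unsupported field")  # inherited base field
print(isinstance(ctx, MiniConversionContext))  # True
print([f.name for f in fields(ctx)])  # base fields first, then stream fields
```

Using field(default_factory=...) for the mutable fields gives every context its own lists and dicts, so warnings from one stream never leak into another.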
### Inheritance
```text
ConversionContext                      # warnings, options, metadata
└── StreamContext                      # + session metadata, tool tracking, lifecycle
    └── OpenAIResponsesStreamContext   # + sequence_number, item tracking
```
Since StreamContext is a ConversionContext, it carries the same warnings, options, and metadata fields. You can pass metadata_mode="preserve" for a lossless round-trip.
### Session Metadata
The converter populates these fields from the first provider chunk:
| Field | Type | Description |
|---|---|---|
| response_id | str | Provider response ID (e.g., chatcmpl-xxx) |
| model | str | Model name from the response |
| created | int | Unix timestamp |
| current_block_index | int | Current 0-based content block index |
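A minimal sketch of the "populate from the first chunk" behavior, assuming OpenAI Chat-style chunk dicts whose top-level id, model, and created keys map onto the fields above. SessionMeta and capture_session_metadata() are hypothetical; how the real converters decide which chunk counts as "first" is not specified here.

```python
from dataclasses import dataclass


@dataclass
class SessionMeta:
    """Hypothetical stand-in for the session fields listed in the table."""
    response_id: str = ""
    model: str = ""
    created: int = 0


def capture_session_metadata(meta: SessionMeta, chunk: dict) -> None:
    """Fill session fields once, from the first chunk; later chunks are ignored."""
    if meta.response_id:
        return  # already populated from an earlier chunk
    meta.response_id = chunk.get("id", "")
    meta.model = chunk.get("model", "")
    meta.created = chunk.get("created", 0)


meta = SessionMeta()
capture_session_metadata(meta, {"id": "chatcmpl-1", "model": "demo-model", "created": 1700000000})
capture_session_metadata(meta, {"id": "chatcmpl-2", "model": "other", "created": 1})
print(meta.response_id, meta.model)  # chatcmpl-1 demo-model
```

Capturing these values once means a target converter can stamp the same ID and model onto every output chunk even if later source chunks omit them.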
### Lifecycle
```python
ctx.mark_started()  # Called by StreamStartEvent handler
ctx.mark_ended()    # Called by StreamEndEvent handler
ctx.is_started      # bool: has the stream begun?
ctx.is_ended        # bool: has the stream finished?
```
Lifecycle guards prevent duplicate events — for example, content_block_end is only emitted if a block is actually open.
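The guard logic can be sketched as a small state machine. LifecycleGuard is a hypothetical class, and the event dicts it returns use assumed type strings; it only illustrates the rule that start and end are emitted at most once and that a block-end event is produced only while a block is open.

```python
class LifecycleGuard:
    """Hypothetical helper: emits start/end once and only closes open blocks."""

    def __init__(self):
        self.is_started = False
        self.is_ended = False
        self.block_open = False
        self.current_block_index = -1  # first opened block gets index 0

    def start(self):
        if self.is_started:
            return []  # duplicate start suppressed
        self.is_started = True
        return [{"type": "stream_start"}]

    def open_block(self):
        events = self.close_block()  # close any block left open first
        self.current_block_index += 1
        self.block_open = True
        events.append({"type": "content_block_start", "index": self.current_block_index})
        return events

    def close_block(self):
        if not self.block_open:
            return []  # nothing open, so no content_block_end
        self.block_open = False
        return [{"type": "content_block_end", "index": self.current_block_index}]

    def end(self):
        if self.is_ended:
            return []  # duplicate end suppressed
        events = self.close_block()
        self.is_ended = True
        events.append({"type": "stream_end"})
        return events


g = LifecycleGuard()
events = g.start() + g.start() + g.open_block() + g.close_block() + g.close_block() + g.end() + g.end()
print([e["type"] for e in events])
# ['stream_start', 'content_block_start', 'content_block_end', 'stream_end']
```

Returning lists (possibly empty) from every method lets callers concatenate results without checking which calls were suppressed.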
### Tool Call Tracking
During streaming, tool call arguments arrive incrementally. StreamContext accumulates them:
```python
# Typically called by the converter automatically:
ctx.register_tool_call("call_abc", "get_weather")
ctx.append_tool_call_args("call_abc", '{"city":')
ctx.append_tool_call_args("call_abc", '"NYC"}')

# Query accumulated state:
ctx.get_tool_name("call_abc")       # "get_weather"
ctx.get_tool_call_args("call_abc")  # '{"city":"NYC"}'

# Get all registered tool calls in order:
for call_id, name, args in ctx.get_pending_tool_calls():
    print(f"{name}({args})")
```
For OpenAI Responses, tool call item IDs are also tracked:
```python
ctx.register_tool_call_item("call_abc", "item_xyz")
ctx.get_tool_call_item_id("call_abc")  # "item_xyz"
```
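One way to implement this kind of accumulation is sketched below. ToolCallBuffer is hypothetical and only mirrors the method names used above; it relies on Python dicts preserving insertion order, so pending calls come back in registration order. Note that individual fragments are not valid JSON on their own, so arguments should only be parsed once the call is complete.

```python
import json
from typing import Dict, List, Tuple


class ToolCallBuffer:
    """Hypothetical accumulator for streamed tool-call arguments."""

    def __init__(self):
        # dicts keep insertion order, which preserves registration order
        self._names: Dict[str, str] = {}
        self._args: Dict[str, List[str]] = {}

    def register_tool_call(self, call_id: str, name: str) -> None:
        self._names[call_id] = name
        self._args.setdefault(call_id, [])

    def append_tool_call_args(self, call_id: str, fragment: str) -> None:
        self._args.setdefault(call_id, []).append(fragment)

    def get_tool_name(self, call_id: str) -> str:
        return self._names.get(call_id, "")

    def get_tool_call_args(self, call_id: str) -> str:
        # Join fragments lazily; cheaper than repeated string concatenation
        return "".join(self._args.get(call_id, []))

    def get_pending_tool_calls(self) -> List[Tuple[str, str, str]]:
        return [(cid, name, self.get_tool_call_args(cid)) for cid, name in self._names.items()]


buf = ToolCallBuffer()
buf.register_tool_call("call_abc", "get_weather")
buf.append_tool_call_args("call_abc", '{"city":')
buf.append_tool_call_args("call_abc", '"NYC"}')
for call_id, name, args in buf.get_pending_tool_calls():
    print(name, json.loads(args))  # get_weather {'city': 'NYC'}
```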
### Deferred Event Buffering
Some providers send usage and finish information in separate chunks, or combine text and finish in a single frame. To prevent duplicate terminal events and event inflation, StreamContext provides buffer methods:
```python
# Buffer usage for later merging into a finish event
ctx.buffer_usage({"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15})
usage = ctx.pop_pending_usage()  # returns dict and clears buffer

# Buffer a finish event for later emission
ctx.buffer_finish({"stop_reason": "end_turn"})
finish = ctx.pop_pending_finish()  # returns dict and clears buffer
```
This pattern is used internally by converters to merge usage into finish events, avoiding separate UsageEvent + FinishEvent pairs that would inflate the output stream during cross-provider conversion.
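The merge can be sketched as follows. DeferredBuffer and flush_terminal_event() are hypothetical; they mirror the buffer_* and pop_pending_* names shown above, and attaching usage under a "usage" key is an assumption about the merged event's shape, not the library's documented format.

```python
class DeferredBuffer:
    """Hypothetical buffer that holds usage until a finish event can absorb it."""

    def __init__(self):
        self._usage = None
        self._finish = None

    def buffer_usage(self, usage):
        self._usage = usage

    def pop_pending_usage(self):
        usage, self._usage = self._usage, None  # return and clear
        return usage

    def buffer_finish(self, finish):
        self._finish = finish

    def pop_pending_finish(self):
        finish, self._finish = self._finish, None  # return and clear
        return finish


def flush_terminal_event(buf):
    """Emit one finish event with any buffered usage merged in, or nothing."""
    finish = buf.pop_pending_finish()
    if finish is None:
        return []
    merged = dict(finish)
    usage = buf.pop_pending_usage()
    if usage is not None:
        merged["usage"] = usage  # assumed key name
    return [merged]


buf = DeferredBuffer()
buf.buffer_usage({"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15})
buf.buffer_finish({"stop_reason": "end_turn"})
print(flush_terminal_event(buf))  # one merged event
print(flush_terminal_event(buf))  # []
```

Because the pop methods clear the buffer, a second flush emits nothing, which is what keeps duplicate terminal events out of the converted stream.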
## Cross-Provider Streaming
A complete example converting OpenAI Chat SSE → IR → Anthropic SSE:
```python
from llm_rosetta import OpenAIChatConverter, AnthropicConverter
from llm_rosetta.converters.base import StreamContext


def openai_to_anthropic(openai_stream):
    """Yield Anthropic-format stream chunks for an OpenAI Chat chunk stream."""
    source = OpenAIChatConverter()
    target = AnthropicConverter()
    # One context per direction: from_ctx tracks the input, to_ctx the output
    from_ctx = StreamContext()
    to_ctx = StreamContext()

    for chunk in openai_stream:
        # Provider A → IR
        ir_events = source.stream_response_from_provider(
            chunk.model_dump(), context=from_ctx
        )
        # IR → Provider B
        for event in ir_events:
            result = target.stream_response_to_provider(event, context=to_ctx)
            if result:  # some IR events produce no output
                yield result  # SSE chunk in Anthropic format
```
The base stream_response_to_provider() uses a class-level dispatch table (_TO_P_DISPATCH) to route each IR event type to its handler method. Provider converters customize output through a _post_process_to_provider() hook — for example, OpenAI Chat injects id, object, model, and created envelope fields into every chunk.
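The dispatch-table-plus-hook design can be sketched in a few lines. BaseStreamWriter, EnvelopeWriter, and _DISPATCH here are hypothetical stand-ins for the pattern, not the library's _TO_P_DISPATCH or _post_process_to_provider() themselves, and the chunk shapes are simplified (a real OpenAI Chat chunk nests deltas under "choices"). Storing method names rather than function objects lets subclasses override individual handlers with ordinary method overriding.

```python
from typing import ClassVar, Dict, Optional


class BaseStreamWriter:
    """Hypothetical sketch: class-level dispatch table plus a post-processing hook."""

    # IR event type -> name of the handler method on this class
    _DISPATCH: ClassVar[Dict[str, str]] = {
        "text_delta": "_on_text_delta",
        "finish": "_on_finish",
    }

    def to_provider(self, event: dict) -> Optional[dict]:
        handler_name = self._DISPATCH.get(event["type"])
        if handler_name is None:
            return None  # unsupported event types produce no output
        chunk = getattr(self, handler_name)(event)
        return self._post_process(chunk)

    def _on_text_delta(self, event: dict) -> dict:
        return {"delta": event["text"]}

    def _on_finish(self, event: dict) -> dict:
        return {"finish_reason": event.get("stop_reason")}

    def _post_process(self, chunk: dict) -> dict:
        return chunk  # default hook: no change


class EnvelopeWriter(BaseStreamWriter):
    """Subclass that stamps envelope fields onto every chunk via the hook."""

    def __init__(self, response_id: str, model: str, created: int):
        self.envelope = {
            "id": response_id,
            "object": "chat.completion.chunk",
            "model": model,
            "created": created,
        }

    def _post_process(self, chunk: dict) -> dict:
        return {**self.envelope, **chunk}


w = EnvelopeWriter("chatcmpl-1", "demo-model", 1700000000)
print(w.to_provider({"type": "text_delta", "text": "Hi"}))
print(w.to_provider({"type": "stream_start"}))  # None
```

Keeping envelope logic in a single hook means each handler stays focused on translating one event type, and provider-wide fields are added in exactly one place.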
## Provider-Specific StreamContext
The OpenAI Responses API requires additional per-event state (sequence numbers, output item tracking). OpenAIResponsesStreamContext extends StreamContext with these fields.
When a base StreamContext is passed to OpenAIResponsesConverter.stream_response_to_provider(), it is automatically upgraded via OpenAIResponsesStreamContext.from_base():
```python
from llm_rosetta import OpenAIResponsesConverter
from llm_rosetta.converters.base import StreamContext

converter = OpenAIResponsesConverter()
ctx = StreamContext()  # base context is fine

# event: an IR stream event, e.g. one produced by stream_response_from_provider()
# Internally upgraded to OpenAIResponsesStreamContext on first call
result = converter.stream_response_to_provider(event, context=ctx)
```
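An upgrade like from_base() can be sketched with dataclasses.fields(). BaseCtx and ResponsesCtx are hypothetical stand-ins, and next_sequence_number() is an illustrative helper; the sketch copies every base field into the subclass and gives subclass-only fields their defaults. It does not show how the real converter keeps the upgraded context across calls.

```python
from dataclasses import dataclass, field, fields
from typing import Dict


@dataclass
class BaseCtx:
    """Hypothetical stand-in for StreamContext."""
    response_id: str = ""
    model: str = ""
    tool_names: Dict[str, str] = field(default_factory=dict)


@dataclass
class ResponsesCtx(BaseCtx):
    """Hypothetical stand-in for OpenAIResponsesStreamContext."""
    sequence_number: int = 0
    item_ids: Dict[str, str] = field(default_factory=dict)

    @classmethod
    def from_base(cls, base: BaseCtx) -> "ResponsesCtx":
        if isinstance(base, cls):
            return base  # already upgraded; reuse as-is
        # Copy every base field; subclass-only fields get their defaults.
        return cls(**{f.name: getattr(base, f.name) for f in fields(BaseCtx)})

    def next_sequence_number(self) -> int:
        n = self.sequence_number
        self.sequence_number += 1
        return n


base = BaseCtx(response_id="resp_1")
base.tool_names["call_abc"] = "get_weather"
up = ResponsesCtx.from_base(base)
print(up.response_id, up.next_sequence_number(), up.next_sequence_number())  # resp_1 0 1
```

The copy is shallow: mutable fields such as tool_names are shared between the base and upgraded objects, so tool state registered through either one stays visible to both, while scalar fields like sequence_number live only on the upgraded object.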