跳转至

转换器 API

llm_rosetta.converters.base.converter.BaseConverter

Bases: ABC

转换器基类,定义统一的转换接口(功能域组织) Base class for converters, defines a unified conversion interface (functional domain organization)

新的设计原则: - 按功能域组织:content, tools, messages, configs - 明确的转换层次:content → messages → requests/responses - 组合模式:子类通过类属性指定使用的ops类 - 保持高层接口简洁:只暴露必要的转换方法

New design principles: - Organized by functional domains: content, tools, messages, configs - Clear conversion hierarchy: content → messages → requests/responses - Composition pattern: subclasses specify ops classes via class attributes - Keep high-level interface simple: only expose necessary conversion methods

create_conversion_context classmethod

create_conversion_context(**options: Any) -> ConversionContext

Create a conversion context for non-streaming conversions.

Parameters:

Name Type Description Default
**options Any

Initial options to populate in the context (e.g., output_format="rest").

{}

Returns:

Type Description
ConversionContext

A new ConversionContext instance.

Source code in src/llm_rosetta/converters/base/converter.py
@classmethod
def create_conversion_context(cls, **options: Any) -> ConversionContext:
    """Create a conversion context for non-streaming conversions.

    Args:
        **options: Initial options to populate in the context
            (e.g., ``output_format="rest"``).

    Returns:
        A new ConversionContext instance.
    """
    return ConversionContext(options=dict(options) if options else {})

create_stream_context classmethod

create_stream_context() -> StreamContext

Create a stream context appropriate for this converter.

Subclasses may override to return a provider-specific context subclass with additional state fields.

Returns:

Type Description
StreamContext

A new StreamContext instance.

Source code in src/llm_rosetta/converters/base/converter.py
@classmethod
def create_stream_context(cls) -> StreamContext:
    """Create a stream context appropriate for this converter.

    Subclasses may override to return a provider-specific context
    subclass with additional state fields.

    Returns:
        A new StreamContext instance.
    """
    return StreamContext()

message_from_provider

message_from_provider(provider_message: Any, *, context: ConversionContext | None = None, **kwargs: Any) -> Message | ExtensionItem

将provider消息转换为IR格式(便利方法) Convert provider message to IR format (convenience method)

Parameters:

Name Type Description Default
provider_message Any

Provider格式的消息

required
context ConversionContext | None

Optional conversion context.

None
**kwargs Any

额外参数

{}

Returns:

Type Description
Message | ExtensionItem

IR格式的消息

Source code in src/llm_rosetta/converters/base/converter.py
def message_from_provider(
    self,
    provider_message: Any,
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> Message | ExtensionItem:
    """将provider消息转换为IR格式(便利方法)
    Convert provider message to IR format (convenience method)

    Args:
        provider_message: Provider格式的消息
        context: Optional conversion context.
        **kwargs: 额外参数

    Returns:
        IR格式的消息
    """
    result = self.messages_from_provider(
        [provider_message], context=context, **kwargs
    )
    return result[0] if result else cast(Message, {})

message_to_provider

message_to_provider(message: Message | ExtensionItem, *, context: ConversionContext | None = None, **kwargs: Any) -> tuple[Any, list[str]]

将单个消息转换为provider格式(便利方法) Convert single message to provider format (convenience method)

Parameters:

Name Type Description Default
message Message | ExtensionItem

IR格式的单个消息

required
context ConversionContext | None

Optional conversion context.

None
**kwargs Any

额外参数

{}

Returns:

Type Description
tuple[Any, list[str]]

Tuple[转换后的消息, 警告信息列表]

Source code in src/llm_rosetta/converters/base/converter.py
def message_to_provider(
    self,
    message: Message | ExtensionItem,
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> tuple[Any, list[str]]:
    """将单个消息转换为provider格式(便利方法)
    Convert single message to provider format (convenience method)

    Args:
        message: IR格式的单个消息
        context: Optional conversion context.
        **kwargs: 额外参数

    Returns:
        Tuple[转换后的消息, 警告信息列表]
    """
    result, warnings = self.messages_to_provider(
        [message], context=context, **kwargs
    )
    return result[0] if result else None, warnings

messages_from_provider abstractmethod

messages_from_provider(provider_messages: list[Any], *, context: ConversionContext | None = None, **kwargs: Any) -> list[Message | ExtensionItem]

将provider消息转换为IR消息列表 Convert provider messages to IR message list

Parameters:

Name Type Description Default
provider_messages list[Any]

Provider格式的消息列表

required
context ConversionContext | None

Optional conversion context.

None
**kwargs Any

额外参数

{}

Returns:

Type Description
list[Message | ExtensionItem]

IR格式的消息列表

Source code in src/llm_rosetta/converters/base/converter.py
@abstractmethod
def messages_from_provider(
    self,
    provider_messages: list[Any],
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> list[Message | ExtensionItem]:
    """将provider消息转换为IR消息列表
    Convert provider messages to IR message list

    Args:
        provider_messages: Provider格式的消息列表
        context: Optional conversion context.
        **kwargs: 额外参数

    Returns:
        IR格式的消息列表
    """
    pass

messages_to_provider abstractmethod

messages_to_provider(messages: Sequence[Message | ExtensionItem], *, context: ConversionContext | None = None, **kwargs: Any) -> tuple[list[Any], list[str]]

将消息列表转换为provider消息格式 Convert message list to provider message format

这个方法通常会委托给message_ops_class来处理。 This method typically delegates to message_ops_class for processing.

Parameters:

Name Type Description Default
messages Sequence[Message | ExtensionItem]

IR格式的消息列表(可包含扩展项)

required
context ConversionContext | None

Optional conversion context.

None
**kwargs Any

额外参数

{}

Returns:

Type Description
tuple[list[Any], list[str]]

Tuple[转换后的消息列表, 警告信息列表]

Source code in src/llm_rosetta/converters/base/converter.py
@abstractmethod
def messages_to_provider(
    self,
    messages: Sequence[Message | ExtensionItem],
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> tuple[list[Any], list[str]]:
    """将消息列表转换为provider消息格式
    Convert message list to provider message format

    这个方法通常会委托给message_ops_class来处理。
    This method typically delegates to message_ops_class for processing.

    Args:
        messages: IR格式的消息列表(可包含扩展项)
        context: Optional conversion context.
        **kwargs: 额外参数

    Returns:
        Tuple[转换后的消息列表, 警告信息列表]
    """
    pass

request_from_provider abstractmethod

request_from_provider(provider_request: dict[str, Any], *, context: ConversionContext | None = None, **kwargs: Any) -> IRRequest

将provider请求转换为IRRequest Convert provider request to IRRequest

Subclass helper: call self._convert_tools_from_p(tools) to convert provider tool definitions to IR format.

Parameters:

Name Type Description Default
provider_request dict[str, Any]

Provider格式的请求

required
context ConversionContext | None

Optional conversion context.

None
**kwargs Any

额外参数

{}

Returns:

Type Description
IRRequest

IR格式的请求

Source code in src/llm_rosetta/converters/base/converter.py
@abstractmethod
def request_from_provider(
    self,
    provider_request: dict[str, Any],
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> IRRequest:
    """将provider请求转换为IRRequest
    Convert provider request to IRRequest

    Subclass helper: call ``self._convert_tools_from_p(tools)`` to convert
    provider tool definitions to IR format.

    Args:
        provider_request: Provider格式的请求
        context: Optional conversion context.
        **kwargs: 额外参数

    Returns:
        IR格式的请求
    """
    pass

request_to_provider abstractmethod

request_to_provider(ir_request: IRRequest, *, context: ConversionContext | None = None, **kwargs: Any) -> tuple[dict[str, Any], list[str]]

将IRRequest转换为provider请求参数 Convert IRRequest to provider request parameters

这是最高层的转换方法,会调用各个功能域的ops类来完成转换: - 使用message_ops处理messages字段 - 使用config_ops处理generation、stream等配置字段 - 使用tool_ops处理tools、tool_choice等工具字段

This is the highest-level conversion method that calls ops classes from various functional domains: - Uses message_ops to handle messages field - Uses config_ops to handle generation, stream and other config fields - Uses tool_ops to handle tools, tool_choice and other tool fields

Subclass helper: call self._apply_tool_config(ir_request, result, ctx) to handle the tools / tool_choice / tool_config fields.

Parameters:

Name Type Description Default
ir_request IRRequest

IR格式的完整请求

required
context ConversionContext | None

Optional conversion context for carrying warnings, options, and metadata through the pipeline.

None
**kwargs Any

额外参数

{}

Returns:

Type Description
tuple[dict[str, Any], list[str]]

Tuple[转换后的请求参数, 警告信息列表]

Source code in src/llm_rosetta/converters/base/converter.py
@abstractmethod
def request_to_provider(
    self,
    ir_request: IRRequest,
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> tuple[dict[str, Any], list[str]]:
    """将IRRequest转换为provider请求参数
    Convert IRRequest to provider request parameters

    这是最高层的转换方法,会调用各个功能域的ops类来完成转换:
    - 使用message_ops处理messages字段
    - 使用config_ops处理generation、stream等配置字段
    - 使用tool_ops处理tools、tool_choice等工具字段

    This is the highest-level conversion method that calls ops classes from various functional domains:
    - Uses message_ops to handle messages field
    - Uses config_ops to handle generation, stream and other config fields
    - Uses tool_ops to handle tools, tool_choice and other tool fields

    Subclass helper: call ``self._apply_tool_config(ir_request, result, ctx)``
    to handle the tools / tool_choice / tool_config fields.

    Args:
        ir_request: IR格式的完整请求
        context: Optional conversion context for carrying warnings,
            options, and metadata through the pipeline.
        **kwargs: 额外参数

    Returns:
        Tuple[转换后的请求参数, 警告信息列表]
    """
    pass

response_from_provider abstractmethod

response_from_provider(provider_response: dict[str, Any], *, context: ConversionContext | None = None, **kwargs: Any) -> IRResponse

将provider响应转换为IRResponse Convert provider response to IRResponse

Subclass helper: call self._build_ir_usage(p_usage) to convert provider usage to IR format.

Parameters:

Name Type Description Default
provider_response dict[str, Any]

Provider格式的响应

required
context ConversionContext | None

Optional conversion context.

None
**kwargs Any

额外参数

{}

Returns:

Type Description
IRResponse

IR格式的响应

Source code in src/llm_rosetta/converters/base/converter.py
@abstractmethod
def response_from_provider(
    self,
    provider_response: dict[str, Any],
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> IRResponse:
    """将provider响应转换为IRResponse
    Convert provider response to IRResponse

    Subclass helper: call ``self._build_ir_usage(p_usage)`` to convert
    provider usage to IR format.

    Args:
        provider_response: Provider格式的响应
        context: Optional conversion context.
        **kwargs: 额外参数

    Returns:
        IR格式的响应
    """
    pass

response_to_provider abstractmethod

response_to_provider(ir_response: IRResponse, *, context: ConversionContext | None = None, **kwargs: Any) -> dict[str, Any]

将IRResponse转换为provider响应 Convert IRResponse to provider response

Subclass helper: call self._build_provider_usage(ir_usage) to convert IR usage to provider format.

Parameters:

Name Type Description Default
ir_response IRResponse

IR格式的响应

required
context ConversionContext | None

Optional conversion context.

None
**kwargs Any

额外参数

{}

Returns:

Type Description
dict[str, Any]

Provider格式的响应

Source code in src/llm_rosetta/converters/base/converter.py
@abstractmethod
def response_to_provider(
    self,
    ir_response: IRResponse,
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """将IRResponse转换为provider响应
    Convert IRResponse to provider response

    Subclass helper: call ``self._build_provider_usage(ir_usage)`` to convert
    IR usage to provider format.

    Args:
        ir_response: IR格式的响应
        context: Optional conversion context.
        **kwargs: 额外参数

    Returns:
        Provider格式的响应
    """
    pass

stream_response_from_provider abstractmethod

stream_response_from_provider(chunk: dict[str, Any], context: StreamContext | None = None) -> list[IRStreamEvent]

Convert a provider-native stream chunk to a list of IR stream events.

A single provider chunk may produce zero or more IR events depending on the provider's SSE protocol. For example, a chunk that carries both a text delta and a finish reason would yield two events.

Parameters:

Name Type Description Default
chunk dict[str, Any]

Provider-native stream chunk (dict or SDK object that will be normalized internally by each concrete converter).

required
context StreamContext | None

Optional stream context for stateful conversions. When provided, converters may emit lifecycle events (StreamStart/End, ContentBlockStart/End) and track cross-chunk state.

None

Returns:

Type Description
list[IRStreamEvent]

List of IR stream events extracted from the chunk.

Source code in src/llm_rosetta/converters/base/converter.py
@abstractmethod
def stream_response_from_provider(
    self,
    chunk: dict[str, Any],
    context: StreamContext | None = None,
) -> list[IRStreamEvent]:
    """Convert a provider-native stream chunk to a list of IR stream events.

    A single provider chunk may produce zero or more IR events depending on
    the provider's SSE protocol.  For example, a chunk that carries both a
    text delta and a finish reason would yield two events.

    Args:
        chunk: Provider-native stream chunk (dict or SDK object that will
            be normalized internally by each concrete converter).
        context: Optional stream context for stateful conversions.
            When provided, converters may emit lifecycle events
            (StreamStart/End, ContentBlockStart/End) and track
            cross-chunk state.

    Returns:
        List of IR stream events extracted from the chunk.
    """
    pass

stream_response_to_provider

stream_response_to_provider(event: IRStreamEvent, context: StreamContext | None = None) -> dict[str, Any] | list[dict[str, Any]]

Convert an IR stream event to provider-native stream chunk(s).

Uses _TO_P_DISPATCH to route each event type to its handler, then applies _post_process_to_provider for any provider-specific decoration of the result.

Subclasses that need pre-dispatch logic (e.g., context upgrades) may override this method, perform their pre-processing, and call super().stream_response_to_provider(event, context).

Parameters:

Name Type Description Default
event IRStreamEvent

IR stream event to convert.

required
context StreamContext | None

Optional stream context for stateful conversions.

None

Returns:

Type Description
dict[str, Any] | list[dict[str, Any]]

A single provider-native stream chunk dict, or a list of chunk

dict[str, Any] | list[dict[str, Any]]

dicts when the event maps to multiple provider-level messages.

Source code in src/llm_rosetta/converters/base/converter.py
def stream_response_to_provider(
    self,
    event: IRStreamEvent,
    context: StreamContext | None = None,
) -> dict[str, Any] | list[dict[str, Any]]:
    """Convert an IR stream event to provider-native stream chunk(s).

    Uses ``_TO_P_DISPATCH`` to route each event type to its handler,
    then applies ``_post_process_to_provider`` for any provider-specific
    decoration of the result.

    Subclasses that need pre-dispatch logic (e.g., context upgrades)
    may override this method, perform their pre-processing, and call
    ``super().stream_response_to_provider(event, context)``.

    Args:
        event: IR stream event to convert.
        context: Optional stream context for stateful conversions.

    Returns:
        A single provider-native stream chunk dict, or a list of chunk
        dicts when the event maps to multiple provider-level messages.
    """
    handler_name = self._TO_P_DISPATCH.get(event.get("type", ""))
    if handler_name is None:
        return {}
    result = getattr(self, handler_name)(event, context)
    return self._post_process_to_provider(result, event, context)

llm_rosetta.converters.openai_chat.converter.OpenAIChatConverter

OpenAIChatConverter()

Bases: BaseConverter

OpenAI Chat Completions API converter.

Implements the 6 explicit conversion interfaces defined by BaseConverter, plus 2 stream methods for SSE chunk-level conversion.

Uses composition of Ops classes for modular, testable conversion logic.

Source code in src/llm_rosetta/converters/openai_chat/converter.py
def __init__(self):
    self.content_ops = self.content_ops_class()
    self.tool_ops = self.tool_ops_class()
    self.message_ops = self.message_ops_class(self.content_ops, self.tool_ops)
    self.config_ops = self.config_ops_class()

messages_from_provider

messages_from_provider(provider_messages: list[Any], *, context: ConversionContext | None = None, **kwargs: Any) -> list[Message | ExtensionItem]

Convert OpenAI Chat messages to IR message list.

Delegates to message_ops.

Parameters:

Name Type Description Default
provider_messages list[Any]

OpenAI Chat messages.

required

Returns:

Type Description
list[Message | ExtensionItem]

IR messages.

Source code in src/llm_rosetta/converters/openai_chat/converter.py
def messages_from_provider(
    self,
    provider_messages: list[Any],
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> list[Message | ExtensionItem]:
    """Convert OpenAI Chat messages to IR message list.

    Delegates to message_ops.

    Args:
        provider_messages: OpenAI Chat messages.

    Returns:
        IR messages.
    """
    return self.message_ops.p_messages_to_ir(provider_messages, **kwargs)

messages_to_provider

messages_to_provider(messages: Sequence[Message | ExtensionItem], *, context: ConversionContext | None = None, **kwargs: Any) -> tuple[list[Any], list[str]]

Convert IR message list to OpenAI Chat message format.

Delegates to message_ops.

Parameters:

Name Type Description Default
messages Sequence[Message | ExtensionItem]

IR messages (may contain ExtensionItems).

required

Returns:

Type Description
tuple[list[Any], list[str]]

Tuple of (converted messages, warnings).

Source code in src/llm_rosetta/converters/openai_chat/converter.py
def messages_to_provider(
    self,
    messages: Sequence[Message | ExtensionItem],
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> tuple[list[Any], list[str]]:
    """Convert IR message list to OpenAI Chat message format.

    Delegates to message_ops.

    Args:
        messages: IR messages (may contain ExtensionItems).

    Returns:
        Tuple of (converted messages, warnings).
    """
    return self.message_ops.ir_messages_to_p(messages, **kwargs)

request_from_provider

request_from_provider(provider_request: dict[str, Any], *, context: ConversionContext | None = None, **kwargs: Any) -> IRRequest

Convert OpenAI Chat Completions request to IRRequest.

Parameters:

Name Type Description Default
provider_request dict[str, Any]

OpenAI request dict (or SDK object).

required

Returns:

Type Description
IRRequest

IR request.

Source code in src/llm_rosetta/converters/openai_chat/converter.py
def request_from_provider(
    self,
    provider_request: dict[str, Any],
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> IRRequest:
    """Convert OpenAI Chat Completions request to IRRequest.

    Args:
        provider_request: OpenAI request dict (or SDK object).

    Returns:
        IR request.
    """
    provider_request = self._normalize(provider_request)

    ir_request: dict[str, Any] = {
        "model": provider_request.get("model", ""),
        "messages": [],
    }

    # 1. Messages - separate system messages as system_instruction
    messages = provider_request.get("messages", [])
    ir_messages, system_text = self._extract_system_and_messages(messages)
    ir_request["messages"] = ir_messages
    if system_text:
        ir_request["system_instruction"] = system_text

    # 2. Tools
    tools = provider_request.get("tools")
    if tools:
        ir_request["tools"] = self._convert_tools_from_p(tools)

    # 3. Tool choice
    tool_choice = provider_request.get("tool_choice")
    if tool_choice is not None:
        ir_request["tool_choice"] = self.tool_ops.p_tool_choice_to_ir(tool_choice)

    # 4. Tool config (parallel_tool_calls)
    parallel_tool_calls = provider_request.get("parallel_tool_calls")
    if parallel_tool_calls is not None:
        ir_request["tool_config"] = self.tool_ops.p_tool_config_to_ir(
            {"parallel_tool_calls": parallel_tool_calls}
        )

    # 5. Generation config
    gen_config = self.config_ops.p_generation_config_to_ir(provider_request)
    if gen_config:
        ir_request["generation"] = gen_config

    # 6. Response format
    resp_format = provider_request.get("response_format")
    if resp_format:
        ir_request["response_format"] = self.config_ops.p_response_format_to_ir(
            resp_format
        )

    # 7. Reasoning config
    reasoning = self.config_ops.p_reasoning_config_to_ir(provider_request)
    if reasoning:
        ir_request["reasoning"] = reasoning

    # 8. Stream config
    stream = provider_request.get("stream")
    stream_options = provider_request.get("stream_options")
    if stream is not None or stream_options:
        ir_request["stream"] = self.config_ops.p_stream_config_to_ir(
            {"stream": stream, "stream_options": stream_options}
        )

    # 9. Cache config
    cache_fields = {}
    if "prompt_cache_key" in provider_request:
        cache_fields["prompt_cache_key"] = provider_request["prompt_cache_key"]
    if "prompt_cache_retention" in provider_request:
        cache_fields["prompt_cache_retention"] = provider_request[
            "prompt_cache_retention"
        ]
    if cache_fields:
        ir_request["cache"] = self.config_ops.p_cache_config_to_ir(cache_fields)

    return self._validate_ir_request(ir_request)

request_to_provider

request_to_provider(ir_request: IRRequest, *, context: ConversionContext | None = None, **kwargs: Any) -> tuple[dict[str, Any], list[str]]

Convert IRRequest to OpenAI Chat Completions request parameters.

Orchestrates all Ops classes to build the complete provider request.

Parameters:

Name Type Description Default
ir_request IRRequest

IR request.

required

Returns:

Type Description
tuple[dict[str, Any], list[str]]

Tuple of (provider request dict, warnings list).

Source code in src/llm_rosetta/converters/openai_chat/converter.py
def request_to_provider(
    self,
    ir_request: IRRequest,
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> tuple[dict[str, Any], list[str]]:
    """Convert IRRequest to OpenAI Chat Completions request parameters.

    Orchestrates all Ops classes to build the complete provider request.

    Args:
        ir_request: IR request.

    Returns:
        Tuple of (provider request dict, warnings list).
    """
    ctx = context if context is not None else ConversionContext()
    result: dict[str, Any] = {"model": ir_request["model"]}

    # 1. System instruction → system message
    messages: list[dict[str, Any]] = []
    system_instruction = ir_request.get("system_instruction")
    if system_instruction:
        messages.append({"role": "system", "content": system_instruction})

    # 2. Messages — fix orphaned tool_calls at IR level before conversion.
    # OpenAI Chat API strictly requires every tool_call_id to have a
    # matching role:tool response.  Other providers (Anthropic, Google)
    # are lenient, so cross-format conversions may carry orphaned
    # tool_calls from interrupted sessions.
    ir_messages = fix_orphaned_tool_calls_ir(ir_request.get("messages", []))
    ctx.warnings.extend(strip_orphaned_tool_config(ir_request))
    converted_msgs, msg_warnings = self.message_ops.ir_messages_to_p(ir_messages)
    messages.extend(converted_msgs)
    ctx.warnings.extend(msg_warnings)
    result["messages"] = messages

    # 3-5. Tools + tool_choice + tool_config
    self._apply_tool_config(ir_request, result, ctx)

    # 6. Generation config
    gen_config = ir_request.get("generation")
    if gen_config:
        gen_fields = self.config_ops.ir_generation_config_to_p(gen_config)
        result.update(gen_fields)

    # 7. Response format
    resp_format = ir_request.get("response_format")
    if resp_format:
        rf_fields = self.config_ops.ir_response_format_to_p(resp_format)
        result.update(rf_fields)

    # 8. Stream config
    stream = ir_request.get("stream")
    if stream:
        stream_fields = self.config_ops.ir_stream_config_to_p(stream)
        result.update(stream_fields)

    # 9. Reasoning config
    reasoning = ir_request.get("reasoning")
    if reasoning:
        reasoning_fields = self.config_ops.ir_reasoning_config_to_p(reasoning)
        result.update(reasoning_fields)

    # 10. Cache config
    cache = ir_request.get("cache")
    if cache:
        cache_fields = self.config_ops.ir_cache_config_to_p(cache)
        result.update(cache_fields)

    # 11. Provider extensions (pass-through)
    extensions = ir_request.get("provider_extensions")
    if extensions:
        result.update(extensions)

    return result, ctx.warnings

response_from_provider

response_from_provider(provider_response: dict[str, Any], *, context: ConversionContext | None = None, **kwargs: Any) -> IRResponse

Convert OpenAI Chat Completions response to IRResponse.

Parameters:

Name Type Description Default
provider_response dict[str, Any]

OpenAI response dict (or SDK object).

required

Returns:

Type Description
IRResponse

IR response.

Source code in src/llm_rosetta/converters/openai_chat/converter.py
def response_from_provider(
    self,
    provider_response: dict[str, Any],
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> IRResponse:
    """Convert OpenAI Chat Completions response to IRResponse.

    Args:
        provider_response: OpenAI response dict (or SDK object).

    Returns:
        IR response.
    """
    provider_response = self._normalize(provider_response)

    choices = []
    for p_choice in provider_response.get("choices", []):
        message = self.message_ops._p_message_to_ir(
            p_choice.get("message", p_choice.get("delta", {}))
        )

        finish_reason_val = p_choice.get("finish_reason")

        choice_info: dict[str, Any] = {
            "index": p_choice.get("index", 0),
            "message": message,
            "finish_reason": {
                "reason": OPENAI_CHAT_REASON_FROM_PROVIDER.get(
                    finish_reason_val, "stop"
                )
            },
        }

        if "logprobs" in p_choice:
            choice_info["logprobs"] = p_choice["logprobs"]

        choices.append(choice_info)

    ir_response: dict[str, Any] = {
        "id": provider_response.get("id", ""),
        "object": "response",
        "created": provider_response.get("created", 0),
        "model": provider_response.get("model", ""),
        "choices": choices,
    }

    # Usage
    p_usage = provider_response.get("usage")
    if p_usage:
        ir_response["usage"] = self._build_ir_usage(p_usage)

    if provider_response.get("service_tier") is not None:
        ir_response["service_tier"] = provider_response["service_tier"]

    if provider_response.get("system_fingerprint") is not None:
        ir_response["system_fingerprint"] = provider_response["system_fingerprint"]

    return self._validate_ir_response(ir_response)

response_to_provider

response_to_provider(ir_response: IRResponse, *, context: ConversionContext | None = None, **kwargs: Any) -> dict[str, Any]

Convert IRResponse to OpenAI Chat Completions response.

Parameters:

Name Type Description Default
ir_response IRResponse

IR response.

required

Returns:

Type Description
dict[str, Any]

OpenAI response dict.

Source code in src/llm_rosetta/converters/openai_chat/converter.py
def response_to_provider(
    self,
    ir_response: IRResponse,
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """Convert IRResponse to OpenAI Chat Completions response.

    Args:
        ir_response: IR response.

    Returns:
        OpenAI response dict.
    """
    provider_response: dict[str, Any] = {
        "id": ir_response.get("id", ""),
        "object": "chat.completion",
        "created": ir_response.get("created", 0),
        "model": ir_response.get("model", ""),
        "choices": [],
    }

    for choice in ir_response.get("choices", []):
        openai_choice = self._build_choice_to_provider(choice)  # ty: ignore[invalid-argument-type]
        if openai_choice is not None:
            provider_response["choices"].append(openai_choice)

    # Usage
    ir_usage = ir_response.get("usage")
    if ir_usage:
        provider_response["usage"] = self._build_provider_usage(ir_usage)  # ty: ignore[invalid-argument-type]

    if "service_tier" in ir_response:
        provider_response["service_tier"] = ir_response["service_tier"]

    if "system_fingerprint" in ir_response:
        provider_response["system_fingerprint"] = ir_response["system_fingerprint"]

    return provider_response

stream_response_from_provider

stream_response_from_provider(chunk: dict[str, Any], context: StreamContext | None = None) -> list[IRStreamEvent]

Convert an OpenAI SSE chunk to IR stream events.

A single chunk may produce multiple events (e.g., text delta + finish).

When a context is provided, lifecycle events (StreamStartEvent, StreamEndEvent) are emitted and cross-chunk state is tracked. Without a context the behaviour is identical to the previous implementation (backward compatible).

Parameters:

Name Type Description Default
chunk dict[str, Any]

OpenAI SSE chunk dict (or SDK object).

required
context StreamContext | None

Optional stream context for stateful conversions.

None

Returns:

Type Description
list[IRStreamEvent]

List of IR stream events extracted from the chunk.

Source code in src/llm_rosetta/converters/openai_chat/converter.py
def stream_response_from_provider(
    self,
    chunk: dict[str, Any],
    context: StreamContext | None = None,
) -> list[IRStreamEvent]:
    """Convert an OpenAI SSE chunk to IR stream events.

    A single chunk may produce multiple events (e.g., text delta + finish).

    When a ``context`` is provided, lifecycle events (``StreamStartEvent``,
    ``StreamEndEvent``) are emitted and cross-chunk state is tracked.
    Without a context the behaviour is identical to the previous
    implementation (backward compatible).

    Args:
        chunk: OpenAI SSE chunk dict (or SDK object).
        context: Optional stream context for stateful conversions.

    Returns:
        List of IR stream events extracted from the chunk.
    """
    chunk = self._normalize(chunk)
    events: list[IRStreamEvent] = []

    if context is not None and not context.is_started:
        self._handle_stream_start_from_p(chunk, context, events)

    choices = chunk.get("choices", [])
    for p_choice in choices:
        self._handle_choice_from_p(p_choice, context, events)

    usage = chunk.get("usage")
    if usage:
        self._handle_usage_from_p(usage, events)

    self._handle_stream_end_from_p(choices, events, usage, context)

    return events

llm_rosetta.converters.openai_responses.converter.OpenAIResponsesConverter

OpenAIResponsesConverter()

Bases: BaseConverter

OpenAI Responses API converter.

Implements the 6 explicit conversion interfaces defined by BaseConverter.

Uses composition of Ops classes for modular, testable conversion logic.

Note: Responses API uses input for request items and output for response items, with a flat item list structure.

Source code in src/llm_rosetta/converters/openai_responses/converter.py
def __init__(self):
    self.content_ops = self.content_ops_class()
    self.tool_ops = self.tool_ops_class()
    self.message_ops = self.message_ops_class(self.content_ops, self.tool_ops)
    self.config_ops = self.config_ops_class()

create_stream_context classmethod

create_stream_context() -> OpenAIResponsesStreamContext

Create a stream context with Responses API specific state.

Source code in src/llm_rosetta/converters/openai_responses/converter.py
@classmethod
def create_stream_context(cls) -> OpenAIResponsesStreamContext:
    """Create a stream context with Responses API specific state."""
    return OpenAIResponsesStreamContext()

messages_from_provider

messages_from_provider(provider_messages: list[Any], *, context: ConversionContext | None = None, **kwargs: Any) -> list[Message | ExtensionItem]

Convert OpenAI Responses items to IR message list.

Delegates to message_ops.

Parameters:

Name Type Description Default
provider_messages list[Any]

OpenAI Responses items.

required

Returns:

Type Description
list[Message | ExtensionItem]

IR messages.

Source code in src/llm_rosetta/converters/openai_responses/converter.py
def messages_from_provider(
    self,
    provider_messages: list[Any],
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> list[Message | ExtensionItem]:
    """Convert OpenAI Responses items to IR message list.

    Delegates to message_ops.

    Args:
        provider_messages: OpenAI Responses items.

    Returns:
        IR messages.
    """
    return self.message_ops.p_messages_to_ir(provider_messages, **kwargs)

messages_to_provider

messages_to_provider(messages: Sequence[Message | ExtensionItem], *, context: ConversionContext | None = None, **kwargs: Any) -> tuple[list[Any], list[str]]

Convert IR message list to OpenAI Responses input items.

Delegates to message_ops.

Parameters:

Name Type Description Default
messages Sequence[Message | ExtensionItem]

IR messages (may contain ExtensionItems).

required

Returns:

Type Description
tuple[list[Any], list[str]]

Tuple of (converted items, warnings).

Source code in src/llm_rosetta/converters/openai_responses/converter.py
def messages_to_provider(
    self,
    messages: Sequence[Message | ExtensionItem],
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> tuple[list[Any], list[str]]:
    """Convert IR message list to OpenAI Responses input items.

    Delegates to message_ops.

    Args:
        messages: IR messages (may contain ExtensionItems).

    Returns:
        Tuple of (converted items, warnings).
    """
    return self.message_ops.ir_messages_to_p(messages, **kwargs)

request_from_provider

request_from_provider(provider_request: dict[str, Any], *, context: ConversionContext | None = None, **kwargs: Any) -> IRRequest

Convert OpenAI Responses API request to IRRequest.

Parameters:

Name Type Description Default
provider_request dict[str, Any]

OpenAI Responses request dict (or SDK object).

required

Returns:

Type Description
IRRequest

IR request.

Source code in src/llm_rosetta/converters/openai_responses/converter.py
def request_from_provider(
    self,
    provider_request: dict[str, Any],
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> IRRequest:
    """Convert OpenAI Responses API request to IRRequest.

    Args:
        provider_request: OpenAI Responses request dict (or SDK object).

    Returns:
        IR request.
    """
    provider_request = self._normalize(provider_request)

    ir_request: dict[str, Any] = {
        "model": provider_request.get("model", ""),
        "messages": [],
    }

    # 1. Instructions → system_instruction
    instructions = provider_request.get("instructions")
    if instructions:
        ir_request["system_instruction"] = instructions

    # 2. Input items → messages
    input_items = provider_request.get("input", [])
    if isinstance(input_items, str):
        input_items = [
            {
                "type": "message",
                "role": "user",
                "content": [{"type": "input_text", "text": input_items}],
            }
        ]
    if isinstance(input_items, list):
        ir_messages = self.message_ops.p_messages_to_ir(input_items)
        ir_request["messages"] = ir_messages

    # 3. Tools
    tools = provider_request.get("tools")
    if tools:
        ir_tools = self._convert_tools_from_p(tools)
        if ir_tools:
            ir_request["tools"] = ir_tools

    # 4-5. Tool choice + tool config
    self._convert_tool_config_from_p(provider_request, ir_request)

    # 6. Generation config
    gen_config = self.config_ops.p_generation_config_to_ir(provider_request)
    if gen_config:
        ir_request["generation"] = gen_config

    # 7. Response format (text field in Responses API)
    text_format = provider_request.get("text")
    if text_format:
        ir_request["response_format"] = self.config_ops.p_response_format_to_ir(
            text_format
        )

    # 8. Reasoning config
    reasoning = self.config_ops.p_reasoning_config_to_ir(provider_request)
    if reasoning:
        ir_request["reasoning"] = reasoning

    # 9. Stream config
    stream = provider_request.get("stream")
    stream_options = provider_request.get("stream_options")
    if stream is not None or stream_options:
        ir_request["stream"] = self.config_ops.p_stream_config_to_ir(
            {"stream": stream, "stream_options": stream_options}
        )

    # 10. Cache config
    self._convert_cache_from_p(provider_request, ir_request)

    # 11. Provider extensions (passthrough fields like allowed_tools)
    allowed_tools = provider_request.get("allowed_tools")
    if allowed_tools is not None:
        ir_request.setdefault("provider_extensions", {})["allowed_tools"] = (
            allowed_tools
        )

    # Preserve mode: capture request fields for echo-back in response
    ctx = context if context is not None else ConversionContext()
    if ctx.metadata_mode == "preserve":
        echo = {
            k: v
            for k, v in provider_request.items()
            if k in RESPONSES_PRESERVE_FIELDS
        }
        if echo:
            ctx.store_request_echo(echo)

    return self._validate_ir_request(ir_request)

request_to_provider

request_to_provider(ir_request: IRRequest, *, context: ConversionContext | None = None, **kwargs: Any) -> tuple[dict[str, Any], list[str]]

Convert IRRequest to OpenAI Responses API request parameters.

Orchestrates all Ops classes to build the complete provider request.

Parameters:

Name Type Description Default
ir_request IRRequest

IR request.

required

Returns:

Type Description
tuple[dict[str, Any], list[str]]

Tuple of (provider request dict, warnings list).

Source code in src/llm_rosetta/converters/openai_responses/converter.py
def request_to_provider(
    self,
    ir_request: IRRequest,
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> tuple[dict[str, Any], list[str]]:
    """Convert IRRequest to OpenAI Responses API request parameters.

    Orchestrates all Ops classes to build the complete provider request.

    Args:
        ir_request: IR request.

    Returns:
        Tuple of (provider request dict, warnings list).
    """
    ctx = context if context is not None else ConversionContext()
    result: dict[str, Any] = {"model": ir_request["model"]}

    # 1. System instruction → instructions field
    system_instruction = ir_request.get("system_instruction")
    if system_instruction:
        result["instructions"] = system_instruction

    # 2. Messages → input items — fix orphaned tool_calls at IR level
    # before conversion.  OpenAI Responses API strictly requires every
    # function_call to have a matching function_call_output.
    ir_messages = fix_orphaned_tool_calls_ir(ir_request.get("messages", []))
    ctx.warnings.extend(strip_orphaned_tool_config(ir_request))
    items, msg_warnings = self.message_ops.ir_messages_to_p(ir_messages)
    ctx.warnings.extend(msg_warnings)
    result["input"] = items

    # 3-5. Tools + tool_choice + tool_config
    self._apply_tool_config(ir_request, result, ctx)

    # 6. Generation config
    gen_config = ir_request.get("generation")
    if gen_config:
        gen_fields = self.config_ops.ir_generation_config_to_p(gen_config)
        result.update(gen_fields)

    # 7. Response format
    resp_format = ir_request.get("response_format")
    if resp_format:
        rf_fields = self.config_ops.ir_response_format_to_p(resp_format)
        result.update(rf_fields)

    # 8. Stream config
    stream = ir_request.get("stream")
    if stream:
        stream_fields = self.config_ops.ir_stream_config_to_p(stream)
        result.update(stream_fields)

    # 9. Reasoning config
    reasoning = ir_request.get("reasoning")
    if reasoning:
        reasoning_fields = self.config_ops.ir_reasoning_config_to_p(reasoning)
        result.update(reasoning_fields)

    # 10. Cache config
    cache = ir_request.get("cache")
    if cache:
        cache_fields = self.config_ops.ir_cache_config_to_p(cache)
        result.update(cache_fields)

    # 11. Provider extensions (pass-through)
    extensions = ir_request.get("provider_extensions")
    if extensions:
        result.update(extensions)

    return result, ctx.warnings

response_from_provider

response_from_provider(provider_response: dict[str, Any], *, context: ConversionContext | None = None, **kwargs: Any) -> IRResponse

Convert OpenAI Responses API response to IRResponse.

Parameters:

Name Type Description Default
provider_response dict[str, Any]

OpenAI Responses response dict (or SDK object).

required

Returns:

Type Description
IRResponse

IR response.

Source code in src/llm_rosetta/converters/openai_responses/converter.py
def response_from_provider(
    self,
    provider_response: dict[str, Any],
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> IRResponse:
    """Convert OpenAI Responses API response to IRResponse.

    Args:
        provider_response: OpenAI Responses response dict (or SDK object).

    Returns:
        IR response.
    """
    provider_response = self._normalize(provider_response)

    choices = []
    output_items = provider_response.get("output", [])

    # Determine finish reason from status
    status = provider_response.get("status")
    if status == "incomplete":
        incomplete_details = provider_response.get("incomplete_details", {})
        inc_reason = (
            incomplete_details.get("reason", "")
            if isinstance(incomplete_details, dict)
            else ""
        )
        finish_reason_val = RESPONSES_INCOMPLETE_REASON_TO_IR.get(
            inc_reason, "stop"
        )
    else:
        finish_reason_val = RESPONSES_STATUS_TO_REASON.get(status or "", "stop")

    # Convert output items to IR message content
    ir_items = self.message_ops.p_messages_to_ir(output_items)

    # Collect all content parts into a single assistant message
    message_content: list[dict[str, Any]] = []
    for ir_item in ir_items:
        if isinstance(ir_item, dict) and "role" in ir_item:
            # It's a message - extract content
            content = ir_item.get("content", [])
            message_content.extend(cast(list, content))
        elif isinstance(ir_item, dict) and "type" in ir_item:
            # It's an extension item (system_event etc.) - skip for choices
            pass

    if message_content:
        choices.append(
            {
                "index": 0,
                "message": {"role": "assistant", "content": message_content},
                "finish_reason": {"reason": finish_reason_val},
            }
        )

    ir_response: dict[str, Any] = {
        "id": provider_response.get("id", ""),
        "object": "response",
        "created": int(provider_response.get("created_at", 0)),
        "model": provider_response.get("model", ""),
        "choices": choices,
    }

    # Handle usage
    p_usage = provider_response.get("usage")
    if p_usage:
        ir_response["usage"] = self._build_ir_usage(p_usage)

    if provider_response.get("service_tier") is not None:
        ir_response["service_tier"] = provider_response["service_tier"]

    # Preserve mode: capture extra fields for lossless round-trip
    ctx = context if context is not None else ConversionContext()
    if ctx.metadata_mode == "preserve":
        self._capture_preserve_metadata(provider_response, ctx)

    return self._validate_ir_response(ir_response)

response_to_provider

response_to_provider(ir_response: IRResponse, *, context: ConversionContext | None = None, **kwargs: Any) -> dict[str, Any]

Convert IRResponse to OpenAI Responses API response.

Parameters:

Name Type Description Default
ir_response IRResponse

IR response.

required

Returns:

Type Description
dict[str, Any]

OpenAI Responses response dict.

Source code in src/llm_rosetta/converters/openai_responses/converter.py
def response_to_provider(
    self,
    ir_response: IRResponse,
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """Convert IRResponse to OpenAI Responses API response.

    Args:
        ir_response: IR response.

    Returns:
        OpenAI Responses response dict.
    """
    provider_response: dict[str, Any] = {
        "id": ir_response.get("id", ""),
        "object": "response",
        "created_at": ir_response.get("created", 0),
        "model": ir_response.get("model", ""),
        "output": [],
        "status": "completed",
    }

    msg_item_id = generate_message_id(ir_response.get("id", ""))

    for choice in ir_response.get("choices", []):
        message = choice.get("message")
        if not message:
            continue

        content_parts = message.get("content", [])
        text_parts: list[dict[str, Any]] = []

        for part in content_parts:
            if is_text_part(part):
                text_parts.append(
                    {
                        "type": "output_text",
                        "text": part["text"],
                        "annotations": [],
                        "logprobs": [],
                    }
                )
            elif is_tool_call_part(part):
                provider_response["output"].append(
                    self.tool_ops.ir_tool_call_to_p(part)
                )
            elif is_reasoning_part(part):
                provider_response["output"].append(
                    self.content_ops.ir_reasoning_to_p(part)
                )

        if text_parts:
            provider_response["output"].insert(
                0,
                {
                    "id": msg_item_id,
                    "type": "message",
                    "role": "assistant",
                    "status": "completed",
                    "content": text_parts,
                },
            )

        # Set finish reason
        finish_reason = choice.get("finish_reason", {}).get("reason", "stop")
        status = RESPONSES_REASON_TO_STATUS.get(finish_reason, "completed")
        provider_response["status"] = status
        incomplete_reason = RESPONSES_REASON_TO_INCOMPLETE_REASON.get(finish_reason)
        if incomplete_reason:
            provider_response["incomplete_details"] = {"reason": incomplete_reason}

    # Usage
    ir_usage = ir_response.get("usage")
    if ir_usage:
        provider_response["usage"] = self._build_provider_usage(ir_usage)  # ty: ignore[invalid-argument-type]

    if "service_tier" in ir_response:
        provider_response["service_tier"] = ir_response["service_tier"]

    # Preserve mode: inject captured extra fields
    ctx = context if context is not None else ConversionContext()
    if ctx.metadata_mode == "preserve":
        self._apply_preserve_metadata(provider_response, ctx)

    return provider_response

stream_response_from_provider

stream_response_from_provider(chunk: dict[str, Any], context: StreamContext | None = None) -> list[IRStreamEvent]

Convert an OpenAI Responses SSE event to IR stream events.

OpenAI Responses API uses fine-grained SSE events with a type field (e.g. response.output_text.delta) instead of the choices[].delta structure used by Chat Completions.

A single event typically produces zero or one IR events, but response.completed may produce both a FinishEvent and a UsageEvent.

When a context is provided, lifecycle events (StreamStartEvent, ContentBlockStartEvent, ContentBlockEndEvent, StreamEndEvent) are emitted and cross-event state is tracked. Without a context the behaviour is identical to the previous implementation (backward compatible).

Parameters:

Name Type Description Default
chunk dict[str, Any]

OpenAI Responses SSE event dict (or SDK object).

required
context StreamContext | None

Optional stream context for stateful conversions.

None

Returns:

Type Description
list[IRStreamEvent]

List of IR stream events extracted from the event.

Source code in src/llm_rosetta/converters/openai_responses/converter.py
def stream_response_from_provider(
    self,
    chunk: dict[str, Any],
    context: StreamContext | None = None,
) -> list[IRStreamEvent]:
    """Convert an OpenAI Responses SSE event to IR stream events.

    OpenAI Responses API uses fine-grained SSE events with a ``type`` field
    (e.g. ``response.output_text.delta``) instead of the ``choices[].delta``
    structure used by Chat Completions.

    A single event typically produces zero or one IR events, but
    ``response.completed`` may produce both a ``FinishEvent`` and a
    ``UsageEvent``.

    When a ``context`` is provided, lifecycle events (``StreamStartEvent``,
    ``ContentBlockStartEvent``, ``ContentBlockEndEvent``,
    ``StreamEndEvent``) are emitted and cross-event state is tracked.
    Without a context the behaviour is identical to the previous
    implementation (backward compatible).

    Args:
        chunk: OpenAI Responses SSE event dict (or SDK object).
        context: Optional stream context for stateful conversions.

    Returns:
        List of IR stream events extracted from the event.
    """
    chunk = self._normalize(chunk)
    events: list[IRStreamEvent] = []
    event_type = chunk.get("type", "")

    handler_name = self._FROM_P_DISPATCH.get(event_type)
    if handler_name is not None:
        getattr(self, handler_name)(chunk, context, events)

    # All other event types (response.in_progress,
    # response.output_text.done, etc.) are ignored.

    return events

stream_response_to_provider

stream_response_to_provider(event: IRStreamEvent, context: StreamContext | None = None) -> dict[str, Any] | list[dict[str, Any]]

Convert IR stream event with automatic context upgrade.

If a base StreamContext is passed, it is automatically upgraded to OpenAIResponsesStreamContext (preserving existing state) so that callers do not need to know about the provider-specific subclass.

Source code in src/llm_rosetta/converters/openai_responses/converter.py
def stream_response_to_provider(
    self,
    event: IRStreamEvent,
    context: StreamContext | None = None,
) -> dict[str, Any] | list[dict[str, Any]]:
    """Convert IR stream event with automatic context upgrade.

    If a base ``StreamContext`` is passed, it is automatically upgraded
    to ``OpenAIResponsesStreamContext`` (preserving existing state) so
    that callers do not need to know about the provider-specific subclass.
    """
    # Auto-upgrade base StreamContext to the provider-specific subclass.
    # Cache the upgraded instance in metadata so state persists across calls.
    if context is not None and not isinstance(
        context, OpenAIResponsesStreamContext
    ):
        cached = context.metadata.get("_responses_stream_ctx")
        if cached is None:
            cached = OpenAIResponsesStreamContext.from_base(context)
            context.metadata["_responses_stream_ctx"] = cached
        context = cached

    return super().stream_response_to_provider(event, context)

to_provider

to_provider(ir_input, tools=None, tool_choice=None, **kwargs)

Backward-compatible conversion method.

Handles both IRRequest dicts and plain message lists.

Parameters:

Name Type Description Default
ir_input

Either an IRRequest dict or a list of IR messages.

required
tools

Optional tool definitions.

None
tool_choice

Optional tool choice config.

None

Returns:

Type Description

Tuple of (provider request dict, warnings list).

Source code in src/llm_rosetta/converters/openai_responses/converter.py
def to_provider(self, ir_input, tools=None, tool_choice=None, **kwargs):
    """Backward-compatible conversion method.

    Handles both IRRequest dicts and plain message lists.

    Args:
        ir_input: Either an IRRequest dict or a list of IR messages.
        tools: Optional tool definitions.
        tool_choice: Optional tool choice config.

    Returns:
        Tuple of (provider request dict, warnings list).
    """
    # Check if it's an IRRequest (has "messages" key)
    if isinstance(ir_input, dict) and "messages" in ir_input:
        return self.request_to_provider(ir_input, **kwargs)

    # It's a plain message list - wrap in a minimal request
    items, warnings = self.message_ops.ir_messages_to_p(ir_input)
    result: dict[str, Any] = {"input": items}

    if tools:
        result["tools"] = [self.tool_ops.ir_tool_definition_to_p(t) for t in tools]

    if tool_choice:
        result["tool_choice"] = self.tool_ops.ir_tool_choice_to_p(tool_choice)

    return result, warnings

validate_ir_input

validate_ir_input(ir_input)

Validate IR input for backward compatibility.

Parameters:

Name Type Description Default
ir_input

IR input to validate.

required

Returns:

Type Description

List of validation errors, empty if valid.

Source code in src/llm_rosetta/converters/openai_responses/converter.py
def validate_ir_input(self, ir_input):
    """Validate IR input for backward compatibility.

    Args:
        ir_input: IR input to validate.

    Returns:
        List of validation errors, empty if valid.
    """
    return self.message_ops.validate_messages(ir_input)

llm_rosetta.converters.anthropic.converter.AnthropicConverter

AnthropicConverter()

Bases: BaseConverter

Anthropic Messages API converter.

Implements the 6 explicit conversion interfaces defined by BaseConverter, plus 2 stream methods for SSE event-level conversion.

Uses composition of Ops classes for modular, testable conversion logic.

Source code in src/llm_rosetta/converters/anthropic/converter.py
def __init__(self):
    self.content_ops = self.content_ops_class()
    self.tool_ops = self.tool_ops_class()
    self.message_ops = self.message_ops_class(self.content_ops, self.tool_ops)
    self.config_ops = self.config_ops_class()

messages_from_provider

messages_from_provider(provider_messages: list[Any], *, context: ConversionContext | None = None, **kwargs: Any) -> list[Message | ExtensionItem]

Convert Anthropic messages to IR message list.

Delegates to message_ops.

Parameters:

Name Type Description Default
provider_messages list[Any]

Anthropic messages.

required

Returns:

Type Description
list[Message | ExtensionItem]

IR messages.

Source code in src/llm_rosetta/converters/anthropic/converter.py
def messages_from_provider(
    self,
    provider_messages: list[Any],
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> list[Message | ExtensionItem]:
    """Convert Anthropic messages to IR message list.

    Delegates to message_ops.

    Args:
        provider_messages: Anthropic messages.

    Returns:
        IR messages.
    """
    return self.message_ops.p_messages_to_ir(provider_messages, **kwargs)

messages_to_provider

messages_to_provider(messages: Sequence[Message | ExtensionItem], *, context: ConversionContext | None = None, **kwargs: Any) -> tuple[list[Any], list[str]]

Convert IR message list to Anthropic message format.

Delegates to message_ops.

Parameters:

Name Type Description Default
messages Sequence[Message | ExtensionItem]

IR messages (may contain ExtensionItems).

required

Returns:

Type Description
tuple[list[Any], list[str]]

Tuple of (converted messages, warnings).

Source code in src/llm_rosetta/converters/anthropic/converter.py
def messages_to_provider(
    self,
    messages: Sequence[Message | ExtensionItem],
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> tuple[list[Any], list[str]]:
    """Convert IR message list to Anthropic message format.

    Delegates to message_ops.

    Args:
        messages: IR messages (may contain ExtensionItems).

    Returns:
        Tuple of (converted messages, warnings).
    """
    return self.message_ops.ir_messages_to_p(messages, **kwargs)

request_from_provider

request_from_provider(provider_request: dict[str, Any], *, context: ConversionContext | None = None, **kwargs: Any) -> IRRequest

Convert Anthropic Messages API request to IRRequest.

Parameters:

Name Type Description Default
provider_request dict[str, Any]

Anthropic request dict (or SDK object).

required

Returns:

Type Description
IRRequest

IR request.

Source code in src/llm_rosetta/converters/anthropic/converter.py
def request_from_provider(
    self,
    provider_request: dict[str, Any],
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> IRRequest:
    """Convert Anthropic Messages API request to IRRequest.

    Args:
        provider_request: Anthropic request dict (or SDK object).

    Returns:
        IR request.
    """
    provider_request = self._normalize(provider_request)

    ir_request: dict[str, Any] = {
        "model": provider_request.get("model", ""),
        "messages": [],
    }

    # 1. System instruction
    system_content = provider_request.get("system")
    if system_content:
        if isinstance(system_content, str):
            ir_request["system_instruction"] = system_content
        elif isinstance(system_content, list):
            text_parts = []
            for part in system_content:
                if isinstance(part, dict) and part.get("type") == "text":
                    text_parts.append(part["text"])
            if text_parts:
                ir_request["system_instruction"] = " ".join(text_parts)

    # 2. Messages
    messages = provider_request.get("messages", [])
    ir_messages = self.message_ops.p_messages_to_ir(messages)
    ir_request["messages"] = ir_messages

    # 3. Tools
    tools = provider_request.get("tools")
    if tools:
        ir_request["tools"] = self._convert_tools_from_p(tools)

    # 4. Tool choice
    tool_choice = provider_request.get("tool_choice")
    if tool_choice is not None:
        ir_request["tool_choice"] = self.tool_ops.p_tool_choice_to_ir(tool_choice)

        # Extract tool config from tool_choice
        tc_config = self.tool_ops.p_tool_config_to_ir(tool_choice)
        if tc_config:
            ir_request["tool_config"] = tc_config

    # 5. Generation config
    gen_config = self.config_ops.p_generation_config_to_ir(provider_request)
    if gen_config:
        ir_request["generation"] = gen_config

    # 6. Reasoning config
    reasoning = self.config_ops.p_reasoning_config_to_ir(provider_request)
    if reasoning:
        ir_request["reasoning"] = reasoning

    # 7. Stream config
    stream = provider_request.get("stream")
    if stream is not None:
        ir_request["stream"] = self.config_ops.p_stream_config_to_ir(
            {"stream": stream}
        )

    return self._validate_ir_request(ir_request)

request_to_provider

request_to_provider(ir_request: IRRequest, *, context: ConversionContext | None = None, **kwargs: Any) -> tuple[dict[str, Any], list[str]]

Convert IRRequest to Anthropic Messages API request parameters.

Orchestrates all Ops classes to build the complete provider request.

Parameters:

Name Type Description Default
ir_request IRRequest

IR request.

required

Returns:

Type Description
tuple[dict[str, Any], list[str]]

Tuple of (provider request dict, warnings list).

Source code in src/llm_rosetta/converters/anthropic/converter.py
def request_to_provider(
    self,
    ir_request: IRRequest,
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> tuple[dict[str, Any], list[str]]:
    """Convert IRRequest to Anthropic Messages API request parameters.

    Orchestrates all Ops classes to build the complete provider request.

    Args:
        ir_request: IR request.

    Returns:
        Tuple of (provider request dict, warnings list).
    """
    ctx = context if context is not None else ConversionContext()
    result: dict[str, Any] = {"model": ir_request["model"]}

    # 1. System instruction → top-level system parameter
    system_instruction = ir_request.get("system_instruction")
    if system_instruction:
        result["system"] = system_instruction

    # 2. Messages — fix orphaned tool_calls/results at IR level before
    #    conversion.  Anthropic strictly requires bidirectional pairing.
    ir_messages = fix_orphaned_tool_calls_ir(ir_request.get("messages", []))
    ctx.warnings.extend(strip_orphaned_tool_config(ir_request))

    # Extract system messages from message list
    for item in ir_messages:
        if isinstance(item, dict) and item.get("role") == "system":
            content = item.get("content", [])
            text_parts = []
            for part in content:
                if is_text_part(part):
                    text_parts.append(part["text"])
            if text_parts and "system" not in result:
                result["system"] = " ".join(text_parts)

    converted_msgs, msg_warnings = self.message_ops.ir_messages_to_p(ir_messages)
    ctx.warnings.extend(msg_warnings)
    result["messages"] = converted_msgs

    # 3. Generation config (must come before tools since max_tokens is required)
    gen_config = ir_request.get("generation")
    if gen_config:
        gen_fields = self.config_ops.ir_generation_config_to_p(gen_config)
        result.update(gen_fields)
    else:
        # Anthropic requires max_tokens
        result["max_tokens"] = 4096

    # 4-6. Tools, tool choice, tool config
    self._apply_tool_config(ir_request, result, ctx)

    # 7. Response format (not supported)
    resp_format = ir_request.get("response_format")
    if resp_format:
        rf_fields = self.config_ops.ir_response_format_to_p(resp_format)
        result.update(rf_fields)

    # 8. Stream config
    stream = ir_request.get("stream")
    if stream:
        stream_fields = self.config_ops.ir_stream_config_to_p(stream)
        result.update(stream_fields)

    # 9. Reasoning config
    reasoning = ir_request.get("reasoning")
    if reasoning:
        reasoning_fields = self.config_ops.ir_reasoning_config_to_p(reasoning)
        result.update(reasoning_fields)

    # 10. Cache config (block-level, warning)
    cache = ir_request.get("cache")
    if cache:
        cache_fields = self.config_ops.ir_cache_config_to_p(cache)
        result.update(cache_fields)

    # 11. Provider extensions (pass-through)
    extensions = ir_request.get("provider_extensions")
    if extensions:
        result.update(extensions)

    return result, ctx.warnings

response_from_provider

response_from_provider(provider_response: dict[str, Any], *, context: ConversionContext | None = None, **kwargs: Any) -> IRResponse

Convert Anthropic Messages API response to IRResponse.

Anthropic returns a single message (not choices list). We wrap it as choices[0].

Parameters:

Name Type Description Default
provider_response dict[str, Any]

Anthropic response dict (or SDK object).

required

Returns:

Type Description
IRResponse

IR response.

Source code in src/llm_rosetta/converters/anthropic/converter.py
def response_from_provider(
    self,
    provider_response: dict[str, Any],
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> IRResponse:
    """Convert Anthropic Messages API response to IRResponse.

    Anthropic returns a single message (not choices list).
    We wrap it as ``choices[0]``.

    Args:
        provider_response: Anthropic response dict (or SDK object).

    Returns:
        IR response.
    """
    provider_response = self._normalize(provider_response)

    # Convert the response message to IR
    ir_message = self.message_ops._p_message_to_ir(provider_response)

    # Map stop_reason to IR finish_reason
    stop_reason_val = provider_response.get("stop_reason")

    finish_reason = (
        ANTHROPIC_REASON_FROM_PROVIDER.get(str(stop_reason_val), "stop")
        if stop_reason_val
        else "stop"
    )
    choice_info: dict[str, Any] = {
        "index": 0,
        "message": ir_message,
        "finish_reason": {"reason": finish_reason},
    }

    if provider_response.get("stop_sequence"):
        choice_info["finish_reason"]["stop_sequence"] = provider_response[
            "stop_sequence"
        ]

    ir_response: dict[str, Any] = {
        "id": provider_response.get("id", ""),
        "object": "response",
        "created": int(time.time()),  # Anthropic doesn't provide timestamp
        "model": provider_response.get("model", ""),
        "choices": [choice_info],
    }

    # Usage (always present — downstream clients may crash without it)
    p_usage = provider_response.get("usage") or {}
    ir_response["usage"] = self._build_ir_usage(p_usage)

    # Preserve mode: capture extra fields for lossless round-trip
    ctx = context if context is not None else ConversionContext()
    if ctx.metadata_mode == "preserve":
        self._capture_preserve_metadata(provider_response, ctx)

    return self._validate_ir_response(ir_response)

response_to_provider

response_to_provider(ir_response: IRResponse, *, context: ConversionContext | None = None, **kwargs: Any) -> dict[str, Any]

Convert IRResponse to Anthropic Messages API response.

Parameters:

Name Type Description Default
ir_response IRResponse

IR response.

required

Returns:

Type Description
dict[str, Any]

Anthropic response dict.

Source code in src/llm_rosetta/converters/anthropic/converter.py
def response_to_provider(
    self,
    ir_response: IRResponse,
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """Convert IRResponse to Anthropic Messages API response.

    Args:
        ir_response: IR response.

    Returns:
        Anthropic response dict.
    """
    # Anthropic response is a single message
    provider_response: dict[str, Any] = {
        "id": ir_response.get("id", ""),
        "type": "message",
        "model": ir_response.get("model", ""),
        "content": [],
    }

    # Get the first choice (Anthropic only has one)
    choices = ir_response.get("choices", [])
    if choices:
        choice = choices[0]
        message = choice.get("message")
        if message:
            provider_response["role"] = message.get("role", "assistant")

            content_parts = message.get("content", [])
            anthropic_content: list[dict[str, Any]] = []

            for part in content_parts:
                if is_text_part(part):
                    anthropic_content.append(self.content_ops.ir_text_to_p(part))
                elif is_tool_call_part(part):
                    anthropic_content.append(self.tool_ops.ir_tool_call_to_p(part))
                elif is_reasoning_part(part):
                    anthropic_content.append(
                        self.content_ops.ir_reasoning_to_p(part)
                    )

            provider_response["content"] = anthropic_content

        # Map finish_reason back to stop_reason
        finish_reason = choice.get("finish_reason", {})
        reason = finish_reason.get("reason", "stop")
        provider_response["stop_reason"] = ANTHROPIC_REASON_TO_PROVIDER.get(
            reason, "end_turn"
        )

        if "stop_sequence" in finish_reason:
            provider_response["stop_sequence"] = finish_reason["stop_sequence"]

    # Usage (always present — Anthropic responses require usage field)
    ir_usage = ir_response.get("usage") or {}
    provider_response["usage"] = self._build_provider_usage(ir_usage)  # ty: ignore[invalid-argument-type]

    # Preserve mode: inject captured extra fields
    ctx = context if context is not None else ConversionContext()
    if ctx.metadata_mode == "preserve":
        self._apply_preserve_metadata(provider_response, ctx)

    return provider_response

stream_response_from_provider

stream_response_from_provider(chunk: dict[str, Any], context: StreamContext | None = None) -> list[IRStreamEvent]

Convert an Anthropic SSE event to IR stream events.

Parameters:

Name Type Description Default
chunk dict[str, Any]

Anthropic SSE event dict (or SDK object).

required

Returns:

Type Description
list[IRStreamEvent]

List of IR stream events extracted from the event.

Source code in src/llm_rosetta/converters/anthropic/converter.py
def stream_response_from_provider(
    self,
    chunk: dict[str, Any],
    context: StreamContext | None = None,
) -> list[IRStreamEvent]:
    """Convert an Anthropic SSE event to IR stream events.

    Args:
        chunk: Anthropic SSE event dict (or SDK object).

    Returns:
        List of IR stream events extracted from the event.
    """
    chunk = self._normalize(chunk)
    events: list[IRStreamEvent] = []

    event_type = chunk.get("type", "")
    handler_name = self._FROM_P_DISPATCH.get(event_type)
    if handler_name is not None:
        getattr(self, handler_name)(chunk, context, events)

    return events

llm_rosetta.converters.google_genai.converter.GoogleGenAIConverter

GoogleGenAIConverter()

Bases: BaseConverter

Google GenAI API converter.

Implements the 6 explicit conversion interfaces defined by BaseConverter.

Uses composition of Ops classes for modular, testable conversion logic.

Source code in src/llm_rosetta/converters/google_genai/converter.py
def __init__(self):
    self.content_ops = self.content_ops_class()
    self.tool_ops = self.tool_ops_class()
    self.message_ops = self.message_ops_class(self.content_ops, self.tool_ops)
    self.config_ops = self.config_ops_class()

build_config

build_config(tools: Sequence[ToolDefinition] | None = None, tool_choice: ToolChoice | None = None) -> dict[str, Any] | None

Build Google GenAI config parameters (backward compatibility).

Parameters:

Name Type Description Default
tools Sequence[ToolDefinition] | None

Tool definition list.

None
tool_choice ToolChoice | None

Tool choice configuration.

None

Returns:

Type Description
dict[str, Any] | None

Google GenAI config dict, or None if no tool configuration.

Source code in src/llm_rosetta/converters/google_genai/converter.py
def build_config(
    self,
    tools: Sequence[ToolDefinition] | None = None,
    tool_choice: ToolChoice | None = None,
) -> dict[str, Any] | None:
    """Build Google GenAI config parameters (backward compatibility).

    Args:
        tools: Tool definition list.
        tool_choice: Tool choice configuration.

    Returns:
        Google GenAI config dict, or None if no tool configuration.
    """
    config: dict[str, Any] = {}

    if tools:
        config["tools"] = [self.tool_ops.ir_tool_definition_to_p(t) for t in tools]

    if tool_choice:
        tool_config = self.tool_ops.ir_tool_choice_to_p(tool_choice)
        if tool_config:
            config["tool_config"] = tool_config

    return config if config else None

messages_from_provider

messages_from_provider(provider_messages: list[Any], *, context: ConversionContext | None = None, **kwargs: Any) -> list[Message | ExtensionItem]

Convert Google GenAI Content list to IR message list.

Delegates to message_ops.

Parameters:

Name Type Description Default
provider_messages list[Any]

Google Content list.

required

Returns:

Type Description
list[Message | ExtensionItem]

IR messages.

Source code in src/llm_rosetta/converters/google_genai/converter.py
def messages_from_provider(
    self,
    provider_messages: list[Any],
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> list[Message | ExtensionItem]:
    """Convert Google GenAI Content list to IR message list.

    Delegates to message_ops.

    Args:
        provider_messages: Google Content list.

    Returns:
        IR messages.
    """
    return self.message_ops.p_messages_to_ir(provider_messages, **kwargs)

messages_to_provider

messages_to_provider(messages: Sequence[Message | ExtensionItem], *, context: ConversionContext | None = None, **kwargs: Any) -> tuple[list[Any], list[str]]

Convert IR message list to Google GenAI Content format.

Delegates to message_ops.

Parameters:

Name Type Description Default
messages Sequence[Message | ExtensionItem]

IR messages (may contain ExtensionItems).

required

Returns:

Type Description
tuple[list[Any], list[str]]

Tuple of (converted Content list, warnings).

Source code in src/llm_rosetta/converters/google_genai/converter.py
def messages_to_provider(
    self,
    messages: Sequence[Message | ExtensionItem],
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> tuple[list[Any], list[str]]:
    """Convert IR message list to Google GenAI Content format.

    Delegates to message_ops.

    Args:
        messages: IR messages (may contain ExtensionItems).

    Returns:
        Tuple of (converted Content list, warnings).
    """
    return self.message_ops.ir_messages_to_p(messages, **kwargs)

request_from_provider

request_from_provider(provider_request: dict[str, Any], *, context: ConversionContext | None = None, **kwargs: Any) -> IRRequest

Convert Google GenAI request to IRRequest.

Parameters:

Name Type Description Default
provider_request dict[str, Any]

Google request dict (or SDK object).

required

Returns:

Type Description
IRRequest

IR request.

Source code in src/llm_rosetta/converters/google_genai/converter.py
def request_from_provider(
    self,
    provider_request: dict[str, Any],
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> IRRequest:
    """Convert Google GenAI request to IRRequest.

    Args:
        provider_request: Google request dict (or SDK object).

    Returns:
        IR request.
    """
    provider_request = self._normalize(provider_request)

    ir_request: dict[str, Any] = {
        "model": provider_request.get("model", ""),
        "messages": [],
    }

    # 1. System instruction
    system_instruction = provider_request.get("system_instruction")
    if system_instruction:
        parsed = self._parse_system_instruction(system_instruction)
        if parsed:
            ir_request["system_instruction"] = parsed

    # 2. Messages
    contents = provider_request.get("contents", [])
    ir_messages = self.message_ops.p_messages_to_ir(contents)
    ir_request["messages"] = ir_messages

    # 3. Config fields
    # Support both SDK format (tools/tool_config inside "config" dict)
    # and REST format (tools/tool_config at top level, generation params
    # inside "generationConfig").
    config = provider_request.get("config", {})
    if not isinstance(config, dict):
        config = {}

    # Tools — check SDK config first, then REST top-level
    tools = config.get("tools") or provider_request.get("tools")
    if tools:
        ir_request["tools"] = self._convert_tools_from_p(tools)

    # Tool choice — check SDK/REST snake_case/camelCase
    tool_config = (
        config.get("tool_config")
        or provider_request.get("tool_config")
        or provider_request.get("toolConfig")
    )
    if tool_config:
        ir_request["tool_choice"] = self.tool_ops.p_tool_choice_to_ir(tool_config)

    # Generation config — check SDK config first, then REST generationConfig
    gen_source = config
    rest_gen_config = provider_request.get("generationConfig")
    if rest_gen_config and isinstance(rest_gen_config, dict) and not config:
        gen_source = rest_gen_config
    gen_config = self.config_ops.p_generation_config_to_ir(gen_source)
    if gen_config:
        ir_request["generation"] = gen_config

    # Response format — check both SDK config and REST top-level (snake + camel)
    response_mime_source = None
    if "response_mime_type" in config or "responseMimeType" in config:
        response_mime_source = config
    elif (
        "response_mime_type" in provider_request
        or "responseMimeType" in provider_request
    ):
        response_mime_source = provider_request
    if response_mime_source:
        ir_request["response_format"] = self.config_ops.p_response_format_to_ir(
            response_mime_source
        )

    # Reasoning config (snake + camel)
    if "thinking_config" in config or "thinkingConfig" in config:
        ir_request["reasoning"] = self.config_ops.p_reasoning_config_to_ir(config)

    return self._validate_ir_request(ir_request)

request_to_provider

request_to_provider(ir_request: IRRequest, *, context: ConversionContext | None = None, **kwargs: Any) -> tuple[dict[str, Any], list[str]]

Convert IRRequest to Google GenAI request parameters.

Orchestrates all Ops classes to build the complete provider request.

Parameters:

Name Type Description Default
ir_request IRRequest

IR request.

required
**kwargs Any

Optional keyword arguments.

  • output_format: "sdk" (default) produces a dict with a nested config suitable for the Google GenAI Python SDK. "rest" flattens the config so the result can be sent directly to the Google REST API via httpx / requests.
{}

Returns:

Type Description
tuple[dict[str, Any], list[str]]

Tuple of (provider request dict, warnings list).

Source code in src/llm_rosetta/converters/google_genai/converter.py
def request_to_provider(
    self,
    ir_request: IRRequest,
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> tuple[dict[str, Any], list[str]]:
    """Convert IRRequest to Google GenAI request parameters.

    Orchestrates all Ops classes to build the complete provider request.

    Args:
        ir_request: IR request.
        **kwargs: Optional keyword arguments.

            - ``output_format``: ``"sdk"`` (default) produces a dict with
              a nested ``config`` suitable for the Google GenAI Python SDK.
              ``"rest"`` flattens the config so the result can be sent
              directly to the Google REST API via ``httpx`` / ``requests``.

    Returns:
        Tuple of (provider request dict, warnings list).
    """
    ctx = context if context is not None else ConversionContext()
    output_format: str = kwargs.pop(
        "output_format",
        ctx.options.get("output_format", "sdk"),
    )
    result: dict[str, Any] = {"model": ir_request["model"]}

    # 1. Handle system_instruction
    system_instruction = None

    # From IRRequest.system_instruction field
    ir_system = ir_request.get("system_instruction")
    if ir_system:
        system_instruction = {"role": "user", "parts": [{"text": ir_system}]}

    # 2. Handle messages — fix orphaned tool_calls/results and strip
    #    orphaned tool_choice/tool_config at IR level before conversion.
    ir_messages = fix_orphaned_tool_calls_ir(ir_request.get("messages", []))
    ctx.warnings.extend(strip_orphaned_tool_config(ir_request))

    # Extract system messages from message list
    for item in ir_messages:
        if is_message(item) and item.get("role") == "system":
            msg_parts = []
            for part in item.get("content", []):
                if is_text_part(part):
                    msg_parts.append({"text": part["text"]})
            if system_instruction is None:
                system_instruction = {"role": "user", "parts": msg_parts}
            else:
                cast(list, system_instruction["parts"]).extend(msg_parts)

    # Convert non-system messages
    contents, msg_warnings = self.message_ops.ir_messages_to_p(ir_messages)
    ctx.warnings.extend(msg_warnings)
    result["contents"] = contents

    if system_instruction:
        result["system_instruction"] = system_instruction

    # 3. Build config dict (tools written by _apply_tool_config)
    self._apply_tool_config(ir_request, result, ctx)
    config = result.setdefault("config", {})

    # Generation config
    gen_config = ir_request.get("generation")
    if gen_config:
        gen_fields = self.config_ops.ir_generation_config_to_p(gen_config)
        config.update(gen_fields)

    # Response format
    resp_format = ir_request.get("response_format")
    if resp_format:
        rf_fields = self.config_ops.ir_response_format_to_p(resp_format)
        config.update(rf_fields)

    # Reasoning config
    reasoning = ir_request.get("reasoning")
    if reasoning:
        reasoning_fields = self.config_ops.ir_reasoning_config_to_p(reasoning)
        config.update(reasoning_fields)

    # Stream config
    stream = ir_request.get("stream")
    if stream:
        stream_fields = self.config_ops.ir_stream_config_to_p(stream)
        config.update(stream_fields)

    # Cache config
    cache = ir_request.get("cache")
    if cache:
        cache_fields = self.config_ops.ir_cache_config_to_p(cache)
        config.update(cache_fields)

    # Provider extensions
    extensions = ir_request.get("provider_extensions")
    if extensions:
        config.update(extensions)

    if output_format == "rest":
        return self._to_rest_body(result), ctx.warnings

    return result, ctx.warnings

response_from_provider

response_from_provider(provider_response: dict[str, Any], *, context: ConversionContext | None = None, **kwargs: Any) -> IRResponse

Convert Google GenAI response to IRResponse.

Parameters:

Name Type Description Default
provider_response dict[str, Any]

Google response dict (or SDK object).

required

Returns:

Type Description
IRResponse

IR response.

Source code in src/llm_rosetta/converters/google_genai/converter.py
def response_from_provider(
    self,
    provider_response: dict[str, Any],
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> IRResponse:
    """Convert Google GenAI response to IRResponse.

    Args:
        provider_response: Google response dict (or SDK object).

    Returns:
        IR response.
    """
    provider_response = self._normalize(provider_response)

    choices = []
    candidates = provider_response.get("candidates", [])

    for p_candidate in candidates:
        content = p_candidate.get("content")
        message = self.message_ops._p_message_to_ir(content) if content else None
        # Fallback for empty candidates (e.g. thinking consumed all tokens)
        if message is None:
            message = {"role": "assistant", "content": []}

        finish_reason_val = p_candidate.get("finish_reason") or p_candidate.get(
            "finishReason"
        )
        choice_info: dict[str, Any] = {
            "index": p_candidate.get("index", 0),
            "message": message,
            "finish_reason": {
                "reason": GOOGLE_REASON_FROM_PROVIDER.get(finish_reason_val, "stop")
            },
        }
        choices.append(choice_info)

    ir_response: dict[str, Any] = {
        "id": provider_response.get("response_id")
        or provider_response.get("responseId")
        or "",
        "object": "response",
        "created": int(time.time()),  # Google doesn't provide timestamp
        "model": provider_response.get("model_version")
        or provider_response.get("modelVersion")
        or "",
        "choices": choices,
    }

    # Handle usage
    p_usage = provider_response.get("usage_metadata") or provider_response.get(
        "usageMetadata"
    )
    if p_usage:
        ir_response["usage"] = self._build_ir_usage(p_usage)

    return self._validate_ir_response(ir_response)

response_to_provider

response_to_provider(ir_response: IRResponse, *, context: ConversionContext | None = None, **kwargs: Any) -> dict[str, Any]

Convert IRResponse to Google GenAI response.

Parameters:

Name Type Description Default
ir_response IRResponse

IR response.

required

Returns:

Type Description
dict[str, Any]

Google response dict.

Source code in src/llm_rosetta/converters/google_genai/converter.py
def response_to_provider(
    self,
    ir_response: IRResponse,
    *,
    context: ConversionContext | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """Convert IRResponse to Google GenAI response.

    Args:
        ir_response: IR response.

    Returns:
        Google response dict.
    """
    provider_response: dict[str, Any] = {
        "responseId": ir_response.get("id", ""),
        "modelVersion": ir_response.get("model", ""),
        "candidates": [],
    }

    for choice in ir_response.get("choices", []):
        message = choice.get("message")
        if not message:
            continue

        # Convert message back to Google Content format
        google_role = "model" if message.get("role") == "assistant" else "user"
        parts: list[dict[str, Any]] = []

        for part in message.get("content", []):
            if is_text_part(part):
                parts.append(self.content_ops.ir_text_to_p(part))
            elif is_tool_call_part(part):
                parts.append(self.tool_ops.ir_tool_call_to_p(part))
            elif is_reasoning_part(part):
                parts.append(self.content_ops.ir_reasoning_to_p(part))

        finish_reason = choice.get("finish_reason", {})
        reason = finish_reason.get("reason", "stop")

        candidate: dict[str, Any] = {
            "index": choice.get("index", 0),
            "content": {"role": google_role, "parts": parts},
            "finishReason": GOOGLE_REASON_TO_PROVIDER.get(reason, "STOP"),
        }
        provider_response["candidates"].append(candidate)

    # Usage
    ir_usage = ir_response.get("usage")
    if ir_usage:
        provider_response["usageMetadata"] = self._build_provider_usage(ir_usage)  # ty: ignore[invalid-argument-type]

    return provider_response

stream_response_from_provider

stream_response_from_provider(chunk: dict[str, Any], context: StreamContext | None = None) -> list[IRStreamEvent]

Convert a Google GenAI stream chunk to IR stream events.

Google GenAI stream chunks are complete GenerateContentResponse objects. Each chunk contains incremental content in candidates[].content.parts[].

When a context is provided, lifecycle events (StreamStartEvent, StreamEndEvent) are emitted and cross-chunk state is tracked.

Parameters:

Name Type Description Default
chunk dict[str, Any]

Google GenAI stream chunk dict (or SDK object).

required
context StreamContext | None

Optional stream context for stateful conversions.

None

Returns:

Type Description
list[IRStreamEvent]

List of IR stream events extracted from the chunk.

Source code in src/llm_rosetta/converters/google_genai/converter.py
def stream_response_from_provider(
    self,
    chunk: dict[str, Any],
    context: StreamContext | None = None,
) -> list[IRStreamEvent]:
    """Convert a Google GenAI stream chunk to IR stream events.

    Google GenAI stream chunks are complete ``GenerateContentResponse``
    objects. Each chunk contains incremental content in
    ``candidates[].content.parts[]``.

    When a ``context`` is provided, lifecycle events (``StreamStartEvent``,
    ``StreamEndEvent``) are emitted and cross-chunk state is tracked.

    Args:
        chunk: Google GenAI stream chunk dict (or SDK object).
        context: Optional stream context for stateful conversions.

    Returns:
        List of IR stream events extracted from the chunk.
    """
    chunk = self._normalize(chunk)
    events: list[IRStreamEvent] = []

    if context is not None and not context.is_started:
        self._handle_stream_start_from_p(chunk, context, events)

    has_finish_reason = False
    deferred_finish: FinishEvent | None = None

    for candidate in chunk.get("candidates", []):
        choice_index = candidate.get("index", 0)
        content = candidate.get("content", {})

        finish_reason = candidate.get("finish_reason") or candidate.get(
            "finishReason"
        )

        # Track how many events existed before processing this
        # candidate's parts, so we can identify which text_delta
        # events belong to this compound chunk.
        pre_parts_len = len(events)

        for part in content.get("parts", []):
            self._handle_part_from_p(part, choice_index, context, events)

        # When a compound chunk has both text and finishReason,
        # defer the text into context so _handle_finish_to_p can
        # merge it into the finish candidate's parts, avoiding
        # an extra output event.
        if finish_reason and context is not None:
            new_events = events[pre_parts_len:]
            deferred_texts: list[str] = []
            kept_new: list[IRStreamEvent] = []
            for ev in new_events:
                if ev["type"] == "text_delta":
                    deferred_texts.append(ev["text"])
                else:
                    kept_new.append(ev)
            if deferred_texts:
                context.pending_text = "".join(deferred_texts)
                events[pre_parts_len:] = kept_new

        if finish_reason:
            has_finish_reason = True
            deferred_finish = FinishEvent(
                type="finish",
                finish_reason={
                    "reason": GOOGLE_REASON_FROM_PROVIDER.get(finish_reason, "stop")  # ty: ignore[invalid-argument-type]
                },
                choice_index=choice_index,
            )

    self._handle_usage_from_p(chunk, events)

    if deferred_finish is not None:
        events.append(deferred_finish)

    if context is not None and has_finish_reason:
        context.mark_ended()
        events.append(StreamEndEvent(type="stream_end"))

    return events

to_provider

to_provider(ir_input: IRInput | IRRequest, tools: Sequence[ToolDefinition] | None = None, tool_choice: ToolChoice | None = None, **kwargs: Any) -> tuple[dict[str, Any], list[str]]

Convert IR format to Google GenAI format (backward compatibility).

Supports both IRInput (message list) and IRRequest (full request).

Parameters:

Name Type Description Default
ir_input IRInput | IRRequest

IR input list or request object.

required
tools Sequence[ToolDefinition] | None

Tool definition list.

None
tool_choice ToolChoice | None

Tool choice configuration.

None

Returns:

Type Description
tuple[dict[str, Any], list[str]]

(Google GenAI format dict, warning list)

Source code in src/llm_rosetta/converters/google_genai/converter.py
def to_provider(
    self,
    ir_input: IRInput | IRRequest,
    tools: Sequence[ToolDefinition] | None = None,
    tool_choice: ToolChoice | None = None,
    **kwargs: Any,
) -> tuple[dict[str, Any], list[str]]:
    """Convert IR format to Google GenAI format (backward compatibility).

    Supports both IRInput (message list) and IRRequest (full request).

    Args:
        ir_input: IR input list or request object.
        tools: Tool definition list.
        tool_choice: Tool choice configuration.

    Returns:
        (Google GenAI format dict, warning list)
    """
    if isinstance(ir_input, dict) and "messages" in ir_input:
        # Handle IRRequest
        return self.request_to_provider(cast(IRRequest, ir_input))

    # Handle IRInput (message list)
    ir_input_list: list[Message | ExtensionItem] = list(cast(IRInput, ir_input))
    warnings_list: list[str] = []

    # Extract system messages
    system_instruction, remaining = self.message_ops.extract_system_instruction(
        ir_input_list
    )

    # Convert non-system messages
    contents, msg_warnings = self.message_ops.ir_messages_to_p(remaining)
    warnings_list.extend(msg_warnings)

    # Build result
    result: dict[str, Any] = {"contents": contents}

    if system_instruction:
        result["system_instruction"] = system_instruction

    # Convert tools
    if tools:
        result["tools"] = [self.tool_ops.ir_tool_definition_to_p(t) for t in tools]

    # Convert tool choice
    if tool_choice:
        tool_config = self.tool_ops.ir_tool_choice_to_p(tool_choice)
        if tool_config:
            result["tool_config"] = tool_config

    return result, warnings_list