Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Dict, Any, Tuple, List
from json import loads, JSONDecodeError
import re

from oss.src.apis.fastapi.otlp.extractors.base_adapter import BaseAdapter
Expand Down Expand Up @@ -101,8 +102,9 @@
("llm.output_messages", "ag.data.outputs.completion"),
# Embeddings List (e.g., embedding.embeddings.0.vector -> ag.data.embeddings.0.vector)
("embedding.embeddings", "ag.data.inputs.embeddings"),
# LLM Tools List (e.g., llm.tools.0.tool.name -> ag.data.inputs.tools.0.tool.name)
("llm.tools", "ag.data.inputs.tools"),
# NOTE: `llm.tools.{i}.tool.json_schema` is handled separately in
# `_extract_tools`, which parses the JSON string into a structured tool
# object. A plain prefix rename would leave it as `{tool: {json_schema}}`.
# Document Lists for Reranker/Retrieval
("reranker.input_documents", "ag.data.inputs.reranker.input_documents"),
("reranker.output_documents", "ag.data.outputs.reranker.output_documents"),
Expand All @@ -116,16 +118,64 @@
class OpenInferenceAdapter(BaseAdapter):
feature_name = None # Results are merged into the main features dictionary

_TOOL_JSON_SCHEMA_PATTERN = re.compile(r"^llm\.tools\.(\d+)\.tool\.json_schema$")

def __init__(self):
self._exact_map = {otel: ag for otel, ag in OPENINFERENCE_ATTRIBUTES_EXACT}
self._prefix_map = {otel: ag for otel, ag in OPENINFERENCE_ATTRIBUTES_PREFIX}

def _extract_tools(self, span_attributes: Dict[str, Any]) -> Dict[str, Any]:
"""Map OpenInference tool definitions to structured `ag.data` objects.

OpenInference encodes each tool as `llm.tools.{i}.tool.json_schema`, a
JSON string holding the full OpenAI tool object
(`{"type": "function", "function": {...}}`). We parse it and place the
object directly at `ag.data.inputs.tools.{i}` so consumers can read
`tool.type` and `tool.function` without unwrapping a
`{tool: {json_schema: "..."}}` envelope.

The raw `llm.tools.*` attributes stay on the span, so no data is lost.
If the schema cannot be parsed, the raw string is kept under
`ag.data.inputs.tools.{i}.tool.json_schema` to preserve it.
"""
transformed: Dict[str, Any] = {}

for key, value in span_attributes.items():
match = self._TOOL_JSON_SCHEMA_PATTERN.match(key)
if not match:
continue

index = match.group(1)
parsed = None
if isinstance(value, str):
try:
parsed = loads(value)
except (JSONDecodeError, TypeError):
parsed = None

if isinstance(parsed, (dict, list)):
transformed[f"ag.data.inputs.tools.{index}"] = parsed
else:
transformed[f"ag.data.inputs.tools.{index}.tool.json_schema"] = value

return transformed

def process(self, bag: CanonicalAttributes, features: SpanFeatures) -> None:
transformed_attributes: Dict[str, Any] = {}
has_data = False
# node_type is determined from openinference.span.kind and stored in transformed_attributes["ag.type.node"]

# Tools need parsing before the generic mapping (see _extract_tools).
tool_attributes = self._extract_tools(bag.span_attributes)
if tool_attributes:
transformed_attributes.update(tool_attributes)
has_data = True

for key, value in bag.span_attributes.items():
# Tool definitions are handled by _extract_tools above.
if key.startswith("llm.tools."):
continue

# 0. Special handling for openinference.span.kind
if key == "openinference.span.kind":
if isinstance(value, str):
Expand Down
259 changes: 259 additions & 0 deletions api/oss/tests/pytest/unit/otlp/test_openinference_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
"""Unit tests for the OpenInference adapter.

Tests the mapping of OpenInference `llm.*` / `*.value` span attributes to
Agenta's `ag.*` canonical attributes, with a focus on how LLM tool
definitions are parsed.

OpenInference encodes each tool as `llm.tools.{i}.tool.json_schema`, a JSON
string holding the full OpenAI tool object. The adapter parses it into a
structured object at `ag.data.inputs.tools.{i}` so consumers receive
`{type, function}` rather than a `{tool: {json_schema: "..."}}` wrapper.

All tests are self-contained: they construct a ``CanonicalAttributes`` bag,
run the adapter, and assert the resulting ``SpanFeatures``.
"""

from datetime import datetime, timezone
from json import dumps

import pytest

from oss.src.apis.fastapi.otlp.extractors.adapters.openinference_adapter import (
OpenInferenceAdapter,
)
from oss.src.apis.fastapi.otlp.extractors.adapter_registry import AdapterRegistry
from oss.src.apis.fastapi.otlp.extractors.canonical_attributes import (
CanonicalAttributes,
SpanFeatures,
)
from oss.src.core.otel.dtos import OTelSpanKind, OTelStatusCode
from oss.src.core.tracing.utils.attributes import unmarshall_attributes


# ── helpers ──────────────────────────────────────────────────────────


def _make_bag(
span_attributes: dict, span_name: str = "test-span"
) -> CanonicalAttributes:
"""Build a minimal CanonicalAttributes bag for testing."""
return CanonicalAttributes(
span_name=span_name,
trace_id="aaaa" * 8,
span_id="bbbb" * 4,
span_kind=OTelSpanKind.SPAN_KIND_INTERNAL,
start_time=datetime(2026, 1, 1, tzinfo=timezone.utc),
end_time=datetime(2026, 1, 1, 0, 0, 1, tzinfo=timezone.utc),
status_code=OTelStatusCode.STATUS_CODE_OK,
span_attributes=span_attributes,
)


def _openai_tool(name: str = "HTTP_Request1") -> dict:
"""A realistic OpenAI-format tool object (note the empty `properties`)."""
return {
"type": "function",
"function": {
"name": name,
"description": "Makes an HTTP request and returns the response data",
"parameters": {
"type": "object",
"properties": {},
"additionalProperties": False,
"$schema": "http://json-schema.org/draft-07/schema#",
},
"strict": False,
},
}


@pytest.fixture
def adapter() -> OpenInferenceAdapter:
return OpenInferenceAdapter()


# ── Tool Definition Parsing ──────────────────────────────────────────


class TestToolParsing:
def test_single_tool_parsed_into_object(self, adapter):
"""`llm.tools.0.tool.json_schema` becomes a structured object."""
tool = _openai_tool()
bag = _make_bag({"llm.tools.0.tool.json_schema": dumps(tool)})
features = SpanFeatures()
adapter.process(bag, features)

assert features.data["inputs.tools.0"] == tool

def test_parsed_tool_exposes_type_and_function(self, adapter):
"""The frontend reads `tool.type` and `tool.function.name` directly."""
tool = _openai_tool("get_weather")
bag = _make_bag({"llm.tools.0.tool.json_schema": dumps(tool)})
features = SpanFeatures()
adapter.process(bag, features)

parsed = features.data["inputs.tools.0"]
assert parsed["type"] == "function"
assert parsed["function"]["name"] == "get_weather"

def test_does_not_emit_wrapped_json_schema_shape(self, adapter):
"""The old `{tool: {json_schema}}` wrapper must not be produced."""
bag = _make_bag({"llm.tools.0.tool.json_schema": dumps(_openai_tool())})
features = SpanFeatures()
adapter.process(bag, features)

assert "inputs.tools.0.tool.json_schema" not in features.data

def test_multiple_tools_parsed(self, adapter):
first = _openai_tool("tool_one")
second = _openai_tool("tool_two")
bag = _make_bag(
{
"llm.tools.0.tool.json_schema": dumps(first),
"llm.tools.1.tool.json_schema": dumps(second),
}
)
features = SpanFeatures()
adapter.process(bag, features)

assert features.data["inputs.tools.0"] == first
assert features.data["inputs.tools.1"] == second

def test_unparseable_schema_kept_as_raw_string(self, adapter):
"""An unparseable schema is preserved rather than dropped."""
bag = _make_bag({"llm.tools.0.tool.json_schema": "{not valid json"})
features = SpanFeatures()
adapter.process(bag, features)

assert features.data["inputs.tools.0.tool.json_schema"] == "{not valid json"
assert "inputs.tools.0" not in features.data

def test_round_trip_produces_clean_tools_list(self, adapter):
"""End-to-end: parsed tools unmarshall into a clean `tools` list.

This mirrors how the span builder assembles flat `ag.data.*` keys and
how `_parse_span_from_request` unmarshalls them before storage.
"""
tool = _openai_tool()
bag = _make_bag({"llm.tools.0.tool.json_schema": dumps(tool)})
features = SpanFeatures()
adapter.process(bag, features)

flat_attributes = {f"ag.data.{k}": v for k, v in features.data.items()}
nested = unmarshall_attributes(flat_attributes)

tools = nested["ag"]["data"]["inputs"]["tools"]
assert tools == [tool]
# Empty objects such as `properties: {}` must survive the round trip.
assert tools[0]["function"]["parameters"]["properties"] == {}

def test_round_trip_with_two_tools(self, adapter):
"""Two tools unmarshall into a clean two-element list, not wrappers."""
first = _openai_tool("AI_Agent_Tool")
second = _openai_tool("HTTP_Request")
bag = _make_bag(
{
"llm.tools.0.tool.json_schema": dumps(first),
"llm.tools.1.tool.json_schema": dumps(second),
}
)
features = SpanFeatures()
adapter.process(bag, features)

flat_attributes = {f"ag.data.{k}": v for k, v in features.data.items()}
nested = unmarshall_attributes(flat_attributes)

tools = nested["ag"]["data"]["inputs"]["tools"]
assert tools == [first, second]
# The legacy `{tool: {json_schema}}` wrapper must not appear.
assert "tool" not in tools[0]
assert "tool" not in tools[1]


# ── Tool Spans (singular tool.* attributes) ──────────────────────────


class TestSingularToolAttributes:
def test_singular_tool_json_schema_goes_to_meta(self, adapter):
"""A TOOL span's own `tool.json_schema` is unrelated to `llm.tools`."""
bag = _make_bag({"tool.json_schema": dumps(_openai_tool())})
features = SpanFeatures()
adapter.process(bag, features)

assert "tool.json_schema" in features.meta
assert features.data == {}


# ── Existing mappings still work ─────────────────────────────────────


class TestUnaffectedMappings:
def test_input_messages_still_mapped(self, adapter):
bag = _make_bag(
{
"openinference.span.kind": "LLM",
"llm.input_messages.0.message.role": "user",
"llm.input_messages.0.message.content": "hi",
}
)
features = SpanFeatures()
adapter.process(bag, features)

assert features.data["inputs.prompt.0.role"] == "user"
assert features.data["inputs.prompt.0.content"] == "hi"

def test_invocation_parameters_mapped_to_meta_request(self, adapter):
"""`llm.invocation_parameters` is kept verbatim (a JSON string)."""
raw = dumps({"model": "gpt-5.4-nano"})
bag = _make_bag({"llm.invocation_parameters": raw})
features = SpanFeatures()
adapter.process(bag, features)

assert features.meta["request"] == raw


# ── Full Realistic Span ──────────────────────────────────────────────


class TestRealisticSpan:
def test_langchain_chat_span_with_tool(self, adapter):
"""A LangChain ChatOpenAI span carrying one tool and a user prompt."""
tool = _openai_tool()
bag = _make_bag(
{
"openinference.span.kind": "LLM",
"llm.model_name": "gpt-5.4-nano",
"llm.invocation_parameters": dumps(
{"model": "gpt-5.4-nano", "stream": False, "tools": [tool]}
),
"llm.tools.0.tool.json_schema": dumps(tool),
"llm.input_messages.0.message.role": "user",
"llm.input_messages.0.message.content": "Answer in arabic:\n\nhi",
}
)
features = SpanFeatures()
adapter.process(bag, features)

assert features.type["node"] == "chat"
assert features.data["inputs.tools.0"] == tool
assert features.data["inputs.prompt.0.role"] == "user"
assert features.meta["response.model"] == "gpt-5.4-nano"


# ── Integration: AdapterRegistry ─────────────────────────────────────


class TestRegistryIntegration:
def test_openinference_in_registry(self):
"""Verify tool parsing works within the full registry pipeline."""
tool = _openai_tool()
registry = AdapterRegistry()
bag = _make_bag(
{
"openinference.span.kind": "LLM",
"llm.tools.0.tool.json_schema": dumps(tool),
}
)
features = registry.extract_features(bag)

assert features.data["inputs.tools.0"] == tool
Loading