Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions packages/core/src/tracing/openai/streaming.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { captureException } from '../../exports';
import { SPAN_STATUS_ERROR } from '../../tracing';
import type { Span } from '../../types-hoist/span';
import { updateSpanName } from '../../utils/spanUtils';
import {
GEN_AI_REQUEST_MODEL_ATTRIBUTE,
GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE,
GEN_AI_RESPONSE_STREAMING_ATTRIBUTE,
GEN_AI_RESPONSE_TEXT_ATTRIBUTE,
GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE,
OPENAI_OPERATIONS,
} from '../ai/gen-ai-attributes';
import { RESPONSE_EVENT_TYPES } from './constants';
import type {
Expand All @@ -26,6 +29,8 @@ import {
* State object used to accumulate information from a stream of OpenAI events/chunks.
*/
interface StreamingState {
/** Whether this stream contained Responses API events. */
sawResponsesApiEvent: boolean;
/** Types of events encountered in the stream. */
eventTypes: string[];
/** Collected response text fragments (for output recording). */
Expand Down Expand Up @@ -222,6 +227,7 @@ export async function* instrumentStream<T>(
recordOutputs: boolean,
): AsyncGenerator<T, void, unknown> {
const state: StreamingState = {
sawResponsesApiEvent: false,
eventTypes: [],
responseTexts: [],
finishReasons: [],
Expand All @@ -240,12 +246,19 @@ export async function* instrumentStream<T>(
if (isChatCompletionChunk(event)) {
processChatCompletionChunk(event as ChatCompletionChunk, state, recordOutputs);
} else if (isResponsesApiStreamEvent(event)) {
state.sawResponsesApiEvent = true;
processResponsesApiEvent(event as ResponseStreamingEvent, state, recordOutputs, span);
}
yield event;
}
} finally {
setCommonResponseAttributes(span, state.responseId, state.responseModel, state.responseTimestamp);
if (state.sawResponsesApiEvent && state.responseModel) {
span.setAttributes({
[GEN_AI_REQUEST_MODEL_ATTRIBUTE]: state.responseModel,
});
updateSpanName(span, `${OPENAI_OPERATIONS.CHAT} ${state.responseModel}`);
}
setTokenUsageAttributes(span, state.promptTokens, state.completionTokens, state.totalTokens);

span.setAttributes({
Expand Down
7 changes: 7 additions & 0 deletions packages/core/src/tracing/openai/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Span } from '../../types-hoist/span';
import { updateSpanName } from '../../utils/spanUtils';
import {
GEN_AI_CONVERSATION_ID_ATTRIBUTE,
GEN_AI_REQUEST_DIMENSIONS_ATTRIBUTE,
Expand Down Expand Up @@ -201,6 +202,12 @@ export function addChatCompletionAttributes(
*/
export function addResponsesApiAttributes(span: Span, response: OpenAIResponseObject, recordOutputs?: boolean): void {
setCommonResponseAttributes(span, response.id, response.model, response.created_at);
if (typeof response.model === 'string' && response.model.length > 0) {
span.setAttributes({
[GEN_AI_REQUEST_MODEL_ATTRIBUTE]: response.model,
});
updateSpanName(span, `${OPENAI_OPERATIONS.CHAT} ${response.model}`);
}
if (response.status) {
span.setAttributes({
[GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify([response.status]),
Expand Down
76 changes: 76 additions & 0 deletions packages/core/test/lib/utils/openai-streaming.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { describe, expect, it, vi } from 'vitest';
import { instrumentStream } from '../../../src/tracing/openai/streaming';
import type { ChatCompletionChunk, ResponseStreamingEvent } from '../../../src/tracing/openai/types';

/**
 * Drains an async iterable to completion and returns every yielded value,
 * preserving yield order. Used to consume instrumented streams in tests.
 */
async function collectStream<T>(stream: AsyncIterable<T>): Promise<T[]> {
  const collected: T[] = [];
  const iterator = stream[Symbol.asyncIterator]();
  // Walk the iterator manually; equivalent to `for await`, just spelled out.
  let step = await iterator.next();
  while (!step.done) {
    collected.push(step.value);
    step = await iterator.next();
  }
  return collected;
}

describe('openai-streaming', () => {
  // Minimal span double exposing only the methods instrumentStream touches.
  function makeSpanMock() {
    return {
      setAttributes: vi.fn(),
      setStatus: vi.fn(),
      updateName: vi.fn(),
      end: vi.fn(),
    };
  }

  type SpanArg = Parameters<typeof instrumentStream>[1];

  it('should backfill the request model and span name for streamed Responses API events', async () => {
    const responseEvent = {
      type: 'response.completed',
      response: {
        object: 'response',
        id: 'resp_123',
        model: 'gpt-4.1-mini',
        created_at: 1704067200,
        status: 'completed',
      },
    } as ResponseStreamingEvent;

    async function* stream(): AsyncGenerator<ResponseStreamingEvent> {
      yield responseEvent;
    }

    const spanMock = makeSpanMock();
    const received = await collectStream(instrumentStream(stream(), spanMock as unknown as SpanArg, false));

    expect(received).toHaveLength(1);
    expect(spanMock.setAttributes).toHaveBeenCalledWith({
      'gen_ai.request.model': 'gpt-4.1-mini',
    });
    expect(spanMock.updateName).toHaveBeenCalledWith('chat gpt-4.1-mini');
    expect(spanMock.end).toHaveBeenCalled();
  });

  it('should not backfill the request model or rename the span for chat completion streams', async () => {
    const chunk = {
      object: 'chat.completion.chunk',
      id: 'chatcmpl_123',
      created: 1704067200,
      model: 'gpt-4o-2024-08-06',
      choices: [],
    } as ChatCompletionChunk;

    async function* stream(): AsyncGenerator<ChatCompletionChunk> {
      yield chunk;
    }

    const spanMock = makeSpanMock();
    const received = await collectStream(instrumentStream(stream(), spanMock as unknown as SpanArg, false));

    expect(received).toHaveLength(1);
    expect(spanMock.setAttributes).not.toHaveBeenCalledWith({
      'gen_ai.request.model': 'gpt-4o-2024-08-06',
    });
    expect(spanMock.updateName).not.toHaveBeenCalledWith('chat gpt-4o-2024-08-06');
    expect(spanMock.end).toHaveBeenCalled();
  });
});
56 changes: 55 additions & 1 deletion packages/core/test/lib/utils/openai-utils.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { describe, expect, it } from 'vitest';
import { describe, expect, it, vi } from 'vitest';
import {
addResponsesApiAttributes,
buildMethodPath,
extractRequestParameters,
getOperationName,
getSpanOperation,
isChatCompletionChunk,
Expand All @@ -10,6 +12,7 @@ import {
isResponsesApiStreamEvent,
shouldInstrument,
} from '../../../src/tracing/openai/utils';
import type { OpenAIResponseObject } from '../../../src/tracing/openai/types';

describe('openai-utils', () => {
describe('getOperationName', () => {
Expand Down Expand Up @@ -67,6 +70,31 @@ describe('openai-utils', () => {
});
});

describe('extractRequestParameters', () => {
  it('should include the request model when it is explicitly provided', () => {
    const params = { model: 'gpt-4.1-mini', temperature: 0.2 };

    const attributes = extractRequestParameters(params);

    expect(attributes).toEqual({
      'gen_ai.request.model': 'gpt-4.1-mini',
      'gen_ai.request.temperature': 0.2,
    });
  });

  it('should default the request model to unknown when it is not provided', () => {
    // No `model` key at all — exercises the fallback path.
    const attributes = extractRequestParameters({ temperature: 0.2 });

    expect(attributes).toEqual({
      'gen_ai.request.model': 'unknown',
      'gen_ai.request.temperature': 0.2,
    });
  });
});

describe('isChatCompletionResponse', () => {
it('should return true for valid chat completion responses', () => {
const validResponse = {
Expand Down Expand Up @@ -185,4 +213,30 @@ describe('openai-utils', () => {
expect(isConversationResponse({ object: null })).toBe(false);
});
});

describe('addResponsesApiAttributes', () => {
  it('should backfill the request model and span name from the response model', () => {
    // Only the two methods this assertion inspects need to be mocked.
    const spanMock = {
      setAttributes: vi.fn(),
      updateName: vi.fn(),
    };
    const response = {
      object: 'response',
      id: 'resp_123',
      model: 'gpt-4.1-mini',
      created_at: 1704067200,
      status: 'completed',
    } as unknown as OpenAIResponseObject;

    addResponsesApiAttributes(spanMock as unknown as Parameters<typeof addResponsesApiAttributes>[0], response, false);

    expect(spanMock.setAttributes).toHaveBeenCalledWith({
      'gen_ai.request.model': 'gpt-4.1-mini',
    });
    expect(spanMock.updateName).toHaveBeenCalledWith('chat gpt-4.1-mini');
  });
});
});