|
| 1 | +import { Span } from "@sentry/core"; |
| 2 | + |
1 | 3 | export type StreamingGuess = { |
2 | 4 | isStreaming: boolean; |
3 | 5 | }; |
@@ -39,3 +41,49 @@ export function classifyResponseStreaming(res: Response): StreamingGuess { |
39 | 41 | // Default: treat as non-streaming |
40 | 42 | return { isStreaming: false }; |
41 | 43 | } |
| 44 | + |
| 45 | +/** |
| 46 | + * Tee a stream, and end the provided span when the stream ends. |
| 47 | + * Returns the other side of the tee, which can be used to send the |
| 48 | + * response to a client. |
| 49 | + */ |
| 50 | +export async function streamResponse(span: Span, res: Response): Promise<Response> { |
| 51 | + const classification = classifyResponseStreaming(res); |
| 52 | + |
| 53 | + // not streaming, just end the span and return the response |
| 54 | + if (!classification.isStreaming || !res.body) { |
| 55 | + span.end(); |
| 56 | + return res; |
| 57 | + } |
| 58 | + |
| 59 | + // Streaming response detected - monitor consumption to keep span alive |
| 60 | + try { |
| 61 | + // Monitor stream consumption and end span when complete |
| 62 | + const [clientStream, monitorStream] = res.body.tee(); |
| 63 | + await (async () => { |
| 64 | + const reader = monitorStream.getReader(); |
| 65 | + try { |
| 66 | + let done = false; |
| 67 | + while (!done) { |
| 68 | + const result = await reader.read(); |
| 69 | + done = result.done; |
| 70 | + } |
| 71 | + } catch { |
| 72 | + // Stream error or cancellation - will end span in finally |
| 73 | + } finally { |
| 74 | + reader.releaseLock(); |
| 75 | + span.end(); |
| 76 | + } |
| 77 | + })(); |
| 78 | + // Return response with client stream |
| 79 | + return new Response(clientStream, { |
| 80 | + status: res.status, |
| 81 | + statusText: res.statusText, |
| 82 | + headers: res.headers, |
| 83 | + }); |
| 84 | + } catch (e) { |
| 85 | + // tee() failed - handle without streaming |
| 86 | + span.end(); |
| 87 | + return res |
| 88 | + } |
| 89 | +} |
0 commit comments