📝 Walkthrough

This PR introduces a new Responses API handler and gateway session store configuration.

Changes
- Responses API Handler
- Gateway Session Store Configuration

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 5 passed | ❌ 1 failed
❌ Failed checks (1 warning)
✅ Passed checks (5 passed)
Actionable comments posted: 1
🧹 Nitpick comments (5)
src/proxy/handlers/responses/types.rs (1)
14-26: ⚡ Quick win — Add a doc comment for the public `ResponsesError` type.

This enum is part of the public surface in this module and should be documented with `///` to keep the proxy error contract explicit.

Suggested patch:

```diff
 #[derive(Debug, Error)]
+/// Error type returned by the `/v1/responses` proxy handler.
 pub enum ResponsesError {
```

As per coding guidelines, "Use /// for doc comments on public items in Rust".
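The fully documented enum might look like the sketch below. The variant names are taken from the agent prompt for this comment; the real type derives `thiserror::Error`, which is swapped here for a hand-written `Display` impl so the sketch compiles with std alone.

```rust
use std::fmt;

/// Error type returned by the `/v1/responses` proxy handler.
/// Sketch only: variant names come from the review prompt; the real enum
/// derives `thiserror::Error` instead of implementing `Display` by hand.
#[derive(Debug)]
pub enum ResponsesError {
    /// The caller failed authentication or lacks permission.
    AuthorizationError(String),
    /// The caller exceeded a configured rate limit.
    RateLimitError(String),
    /// The upstream gateway returned an error.
    GatewayError(String),
    /// The upstream did not respond in time.
    Timeout,
    /// The request context carried no model identifier.
    MissingModelInContext,
}

impl fmt::Display for ResponsesError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::AuthorizationError(msg) => write!(f, "authorization error: {msg}"),
            Self::RateLimitError(msg) => write!(f, "rate limit error: {msg}"),
            Self::GatewayError(msg) => write!(f, "gateway error: {msg}"),
            Self::Timeout => write!(f, "upstream request timed out"),
            Self::MissingModelInContext => write!(f, "no model found in request context"),
        }
    }
}

fn main() {
    println!("{}", ResponsesError::Timeout);
}
```

The variant-level doc comments are illustrative; the actual error contract should be taken from the code.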
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/proxy/handlers/responses/types.rs` around lines 14 - 26, The public enum ResponsesError lacks a doc comment; add a triple-slash doc comment (///) immediately above the ResponsesError declaration describing its purpose (it represents proxy response errors returned by this module) and briefly mention key variants (AuthorizationError, RateLimitError, GatewayError, Timeout, MissingModelInContext) so downstream users understand the error contract and when each variant is used.

src/lib.rs (1)
87-93: 🏗️ Heavy lift — Make the session store backend configurable instead of hard-coding in-memory.
Line 92 currently forces a process-local store for all deployments. That can fragment conversation/session state across replicas and makes memory growth depend entirely on runtime traffic patterns. Consider selecting the store via config (with in-memory as a dev default) so production can use a shared/bounded backend.
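A minimal sketch of the suggested config-driven selection, assuming a `SessionStore` trait with simple get/put methods; the trait shape, the backend key, and the Redis placeholder are illustrative stand-ins, not the crate's actual API.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Stand-in for the crate's session store trait.
trait SessionStore: Send + Sync {
    fn put(&self, key: &str, value: String);
    fn get(&self, key: &str) -> Option<String>;
}

// Stand-in for gateway::session::InMemorySessionStore.
#[derive(Default)]
struct InMemorySessionStore {
    inner: Mutex<HashMap<String, String>>,
}

impl SessionStore for InMemorySessionStore {
    fn put(&self, key: &str, value: String) {
        self.inner.lock().unwrap().insert(key.to_string(), value);
    }
    fn get(&self, key: &str) -> Option<String> {
        self.inner.lock().unwrap().get(key).cloned()
    }
}

/// Selects a session store backend from configuration, defaulting to the
/// process-local in-memory store for development.
fn session_store_from_config(backend: &str) -> Arc<dyn SessionStore> {
    match backend {
        // A shared, bounded backend (e.g. Redis) would be constructed here.
        "redis" => unimplemented!("shared backend not sketched"),
        _ => Arc::new(InMemorySessionStore::default()),
    }
}

fn main() {
    let store = session_store_from_config("memory");
    store.put("resp_1", "conversation state".to_string());
    assert_eq!(store.get("resp_1").as_deref(), Some("conversation state"));
}
```

The gateway builder would then receive `session_store_from_config(...)` instead of a hard-coded `Arc::new(InMemorySessionStore::default())`.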
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/lib.rs` around lines 87 - 93, The code currently hard-codes an in-memory session store when building the gateway (gateway::Gateway::new(...).with_session_store(Arc::new(gateway::session::InMemorySessionStore::default()))), which prevents shared/bounded backends in production; change this to choose the session store implementation from configuration (use in-memory as the default for dev). Add a factory or builder that reads config (env/struct) and returns Arc<dyn SessionStore> (or a concrete boxed type) and replace the direct InMemorySessionStore instantiation with that factory so gateway::with_session_store(...) receives the configured store; ensure the factory supports at least in-memory and one shared backend (e.g., Redis) and preserves Arc wrapping and trait compatibility.

src/proxy/handlers/responses/mod.rs (3)
14-21: ⚡ Quick win — Import ordering: external crate `tokio` is mixed with local imports.

Per coding guidelines, imports should be ordered: standard library first, then external crates (alphabetically), then local modules. The `tokio` import on line 20 should be grouped with other external crates.

♻️ Suggested reordering:

```diff
 use fastrace::prelude::{Event as TraceEvent, *};
 use log::error;
+use tokio::sync::{oneshot, oneshot::error::TryRecvError};
+
 use span_attributes::{
     StreamOutputCollector, apply_span_properties, chunk_span_properties, event_starts_output,
     request_span_properties, response_span_properties, usage_span_properties,
 };
-use tokio::sync::{oneshot, oneshot::error::TryRecvError};
 pub use types::ResponsesError;
```

As per coding guidelines: "Sort imports alphabetically with rustfmt rules: standard library first, then external crates (alphabetical), then local modules".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/proxy/handlers/responses/mod.rs` around lines 14 - 21, The import block mixes external crate `tokio` with local imports; reorder imports so external crates are grouped and alphabetized (standard lib first if present), e.g. place fastrace, log, span_attributes, tokio (alphabetical) together, then local modules like `pub use types::ResponsesError;`; update the block containing the use statements (look for the lines importing fastrace, log, span_attributes, tokio::sync, and ResponsesError) to follow that ordering and grouping.
218-244: ⚡ Quick win — Duplicated usage collection logic could be extracted.

The `try_recv` handling for `usage_rx` in both the error case (lines 218-244) and end-of-stream case (lines 265-291) is nearly identical. Consider extracting this into a helper function to reduce duplication.

♻️ Example extraction:

```rust
async fn try_collect_usage(
    usage_rx: Option<oneshot::Receiver<Usage>>,
    request_ctx: &mut RequestContext,
    span: &Span,
) {
    let Some(mut usage_rx) = usage_rx else { return };
    match usage_rx.try_recv() {
        Ok(usage) => {
            if let Err(err) = hooks::rate_limit::post_check_streaming(request_ctx, &usage).await {
                error!("Rate limit post_check_streaming error: {}", err);
            }
            hooks::observability::record_streaming_usage(request_ctx, &usage).await;
            span.add_properties(|| usage_span_properties(&usage));
        }
        Err(TryRecvError::Empty) => {
            spawn_stream_usage_observer(request_ctx.clone(), usage_rx);
        }
        Err(TryRecvError::Closed) => {
            error!("Failed to receive streaming usage from gateway: channel closed");
        }
    }
}
```

Also applies to: 265-291
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/proxy/handlers/responses/mod.rs` around lines 218 - 244, Extract the duplicated try_recv handling into a helper async function (e.g., try_collect_usage) that accepts Option<oneshot::Receiver<Usage>> (or the same usage_rx type), &mut RequestContext, and &Span; inside it perform the match on usage_rx.try_recv(), call hooks::rate_limit::post_check_streaming, hooks::observability::record_streaming_usage, span.add_properties(usage_span_properties), spawn_stream_usage_observer(request_ctx.clone(), usage_rx) for the Empty case, and log the Closed case; then replace the duplicated blocks in the response handler with calls to this helper (ensure to take() the usage_rx before calling so the ownership/Option semantics remain correct).
150-168: 💤 Low value — The unfold state tuple has many elements, making it hard to follow.
The 7-element tuple for stream state is unwieldy. While functional, a named struct would improve readability and make the state transitions clearer.
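The named-state idea can be sketched with simplified stand-in types. Here `std::iter::from_fn` plays the role of `futures::stream::unfold`, and the fields loosely mirror the ones the agent prompt lists; the real stream, span, and context types are omitted so the sketch runs on its own.

```rust
// Sketch only: field types are simplified stand-ins for the handler's real
// stream, span, and request-context types.
struct StreamState {
    remaining: Vec<&'static str>, // stands in for ChatResponseStream
    ended: bool,
    is_first_chunk: bool,
    collected: String, // stands in for StreamOutputCollector
}

fn main() {
    let mut state = StreamState {
        remaining: vec!["world", "hello "], // chunks are popped from the back
        ended: false,
        is_first_chunk: true,
        collected: String::new(),
    };

    // std::iter::from_fn plays the role of futures::stream::unfold here:
    // each step mutates named fields instead of tuple positions.
    let chunks: Vec<&str> = std::iter::from_fn(|| {
        if state.ended {
            return None;
        }
        match state.remaining.pop() {
            Some(chunk) => {
                if state.is_first_chunk {
                    state.is_first_chunk = false; // e.g. record time-to-first-chunk
                }
                state.collected.push_str(chunk);
                Some(chunk)
            }
            None => {
                state.ended = true; // e.g. flush usage and close the span
                None
            }
        }
    })
    .collect();

    assert_eq!(chunks, vec!["hello ", "world"]);
    assert_eq!(state.collected, "hello world");
}
```

Naming the fields makes each state transition (`ended`, `is_first_chunk`) self-describing at the mutation site, which is the readability gain the comment is after.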
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/proxy/handlers/responses/mod.rs` around lines 150 - 168, The 7-tuple used as the state for futures::stream::unfold inside handle_stream_request is hard to read; replace it with a small named struct (e.g., StreamState) that contains fields for stream: ChatResponseStream<ResponsesApiFormat>, span: Span, request_ctx: RequestContext, ended: bool, usage_rx: Option<oneshot::Receiver<Usage>>, collector: StreamOutputCollector, and is_first_chunk: bool; initialize StreamState::default() or a constructor when calling unfold, update the closure to destructure and mutate the named fields instead of tuple indices, and derive/implement Clone/Default or provide explicit construction so the rest of the logic using stream_request_ctx, usage_rx, StreamOutputCollector, and span continues to work without changing behavior.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: ffaa4ed9-d771-485c-bdc3-2b6cd40fe580
📒 Files selected for processing (32)
- src/config/entities/mod.rs
- src/gateway/provider_instance.rs
- src/gateway/providers/macros.rs
- src/gateway/providers/modelscope.rs
- src/gateway/providers/moonshot.rs
- src/gateway/providers/openrouter.rs
- src/gateway/streams/bridged.rs
- src/gateway/streams/hub.rs
- src/gateway/streams/native.rs
- src/gateway/streams/reader/aws_event_stream.rs
- src/gateway/streams/reader/sse.rs
- src/gateway/traits/chat_format.rs
- src/gateway/traits/provider.rs
- src/gateway/types/common.rs
- src/lib.rs
- src/proxy/handlers/messages/types.rs
- src/proxy/handlers/mod.rs
- src/proxy/handlers/responses/mod.rs
- src/proxy/handlers/responses/span_attributes/message_attributes.rs
- src/proxy/handlers/responses/span_attributes/mod.rs
- src/proxy/handlers/responses/span_attributes/stream_output.rs
- src/proxy/handlers/responses/span_attributes/telemetry.rs
- src/proxy/handlers/responses/span_attributes/tests.rs
- src/proxy/handlers/responses/types.rs
- src/proxy/hooks/rate_limit/concurrent/mod.rs
- src/proxy/hooks/rate_limit/concurrent/utils.rs
- src/proxy/hooks/rate_limit/ratelimit/local.rs
- src/proxy/hooks/rate_limit/ratelimit/mod.rs
- src/proxy/hooks/rate_limit/ratelimit/utils.rs
- src/proxy/mod.rs
- src/utils/instance.rs
- tests/proxy/responses.test.ts
```rust
ResponsesApiStreamEvent::OutputTextDelta {
    output_index,
    delta,
    ..
} => {
    if !delta.is_empty() {
        append_message_text(
            self.items.entry(*output_index).or_insert_with(|| {
                ResponsesOutputItem::Message {
                    id: String::new(),
                    role: "assistant".into(),
                    content: vec![],
                    status: "in_progress".into(),
                }
            }),
            delta,
        );
    }
}
ResponsesApiStreamEvent::OutputTextDone {
    output_index, text, ..
} => {
    set_message_text(
        self.items.entry(*output_index).or_insert_with(|| {
            ResponsesOutputItem::Message {
                id: String::new(),
                role: "assistant".into(),
                content: vec![],
                status: "completed".into(),
            }
        }),
        text,
    );
}
```
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
set -euo pipefail
echo "== ResponsesApiStreamEvent shape =="
rg -n -C3 'enum\s+ResponsesApiStreamEvent|OutputTextDelta|OutputTextDone|content_index|part_index' src
echo
echo "== Collector handling =="
rg -n -C3 'OutputTextDelta|OutputTextDone|append_message_text|set_message_text|first_mut|content.clear' src/proxy/handlers/responses/span_attributes/stream_output.rs
```

Repository: api7/aisix
Length of output: 16420
Handle multi-part content streams using content_index from events.
Lines 35-39 and 54-55 drop the content_index field via pattern .., while append_message_text (line 136) targets only content.first_mut() and set_message_text (lines 154-157) clears and replaces all parts. Although content_index is currently hardcoded to 0 in all event generation (src/gateway/formats/openai/responses.rs), the event structure supports per-part indices and the handler should respect them. Refactor to route text deltas and completions to the correct content part index rather than always the first.
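A self-contained sketch of the index-aware routing being asked for, with a simplified `Message` standing in for `ResponsesOutputItem::Message`; growing the parts vector on demand is one possible policy and is an assumption, not the crate's current behavior.

```rust
// Stand-in for ResponsesOutputItem::Message, keeping only the content parts.
struct Message {
    content: Vec<String>,
}

/// Appends a text delta to the content part addressed by `content_index`,
/// instead of always mutating `content.first_mut()`.
fn append_message_text(msg: &mut Message, content_index: usize, delta: &str) {
    // Grow the parts vector so a not-yet-seen index still lands correctly.
    if msg.content.len() <= content_index {
        msg.content.resize(content_index + 1, String::new());
    }
    msg.content[content_index].push_str(delta);
}

/// Replaces only the addressed part on OutputTextDone, rather than
/// clearing and rewriting the whole content vector.
fn set_message_text(msg: &mut Message, content_index: usize, text: &str) {
    if msg.content.len() <= content_index {
        msg.content.resize(content_index + 1, String::new());
    }
    msg.content[content_index] = text.to_string();
}

fn main() {
    let mut msg = Message { content: vec![] };
    append_message_text(&mut msg, 0, "Hel");
    append_message_text(&mut msg, 1, "second part");
    append_message_text(&mut msg, 0, "lo");
    set_message_text(&mut msg, 1, "second part, final");
    assert_eq!(msg.content, vec!["Hello", "second part, final"]);
}
```

With this shape, the match arms capture `content_index` from the event and pass it through, so multi-part streams keep their parts separate even once event generation stops hardcoding index 0.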
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/proxy/handlers/responses/span_attributes/stream_output.rs` around lines
35 - 68, The handler currently discards the event's content_index (patterned
with ..) and always updates content.first_mut(); update the match arms for
ResponsesApiStreamEvent::OutputTextDelta and ::OutputTextDone to capture
content_index and use it to target the correct content part: pass content_index
(or use it to index into the message's content Vec) when calling
append_message_text and set_message_text so deltas and completions are applied
to the specified part instead of always the first; adjust
append_message_text/set_message_text callsites/logic to accept or use the
content_index to route updates to the correct
ResponsesOutputItem::Message.content entry.
Summary by CodeRabbit

- New `/v1/responses` endpoint supporting the OpenAI Responses API with streaming and non-streaming request handling
- Support for `previous_response_id` to maintain conversation history across requests