feat: responses api server-side state management#87
Conversation
📝 WalkthroughWalkthroughGateway gains optional server-side session persistence: a SessionStore trait and in-memory implementation, wiring to load prior messages into outgoing Responses API chat requests and persist hub responses back into the store, plus tests and helper logic to handle previous_response_id and conversation-id lookups. ChangesServer-Side Session Persistence
Sequence Diagram(s)sequenceDiagram
participant Client
participant Gateway
participant SessionStore
participant Hub
Client->>Gateway: send Responses API chat request (maybe with previous_response_id)
alt previous_response_id or conversation_id present
Gateway->>SessionStore: get_by_response_id / get_by_conversation_id
SessionStore-->>Gateway: StoredSession(s) (messages, metadata)
Gateway->>Gateway: splice stored messages into hub ChatCompletionRequest
end
Gateway->>Hub: forward ChatCompletionRequest
Hub-->>Gateway: Hub ChatCompletionResponse (id, model, created, messages)
alt non-native hub response
Gateway->>SessionStore: put_session(StoredSession derived from request+response)
SessionStore-->>Gateway: ack
end
Gateway-->>Client: aggregated response
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Important Pre-merge checks failedPlease resolve all errors before merging. Addressing warnings is optional. ❌ Failed checks (1 error)
✅ Passed checks (5 passed)
✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Review rate limit: 4/5 reviews remaining, refill in 12 minutes. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/gateway/gateway.rs (1)
89-103:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy liftPersist streamed Responses API turns before returning the stream.
For
ResponsesApiFormatwithstream: true, this branch returns at Line 94 beforesave_session()at Lines 101-102 can run. Streamed turns therefore never enterSessionStore, so a laterprevious_response_idlookup for a streamed response will fail even though the non-streaming path works.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/gateway/gateway.rs` around lines 89 - 103, For ResponsesApiFormat with stream=true the code returns the BridgedStream before persisting the turn, so streamed turns never get saved; fix by ensuring save_session is invoked for streamed ResponsesApiFormat before returning the ChatResponse::Stream. Concretely, in the stream branch (where call_chat_hub_stream, BridgedStream::new and ChatResponse::Stream are used) detect F::name() == ResponsesApiFormat::name() and call the same persistence logic (reuse save_session or introduce a small helper that accepts the stream/metadata) prior to the return, or else arrange for BridgedStream to invoke save_session on the first response chunk; reference call_chat_hub_stream, BridgedStream, ChatResponse::Stream, save_session and F::name()/ResponsesApiFormat to locate where to add the persistence.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/gateway/session.rs`:
- Around line 38-48: get_by_conversation_id currently sorts only by created_at
so ties leave order nondeterministic; fix by adding a deterministic tie-breaker
or returning the latest session directly. Add a monotonic insertion/order key
(e.g., insertion_index: u64) to StoredSession and increment it when sessions are
inserted (update the session creation/insert path where sessions are added),
then in get_by_conversation_id sort by (created_at, insertion_index) or use
max_by_key((created_at, insertion_index)) to pick latest; alternatively
implement a helper latest_by_conversation that finds the max_by_key((created_at,
insertion_index)) and use that in gateway.rs instead of assuming the last
element. Ensure you update any constructors and places that clone/create
StoredSession to populate the new insertion_index.
---
Outside diff comments:
In `@src/gateway/gateway.rs`:
- Around line 89-103: For ResponsesApiFormat with stream=true the code returns
the BridgedStream before persisting the turn, so streamed turns never get saved;
fix by ensuring save_session is invoked for streamed ResponsesApiFormat before
returning the ChatResponse::Stream. Concretely, in the stream branch (where
call_chat_hub_stream, BridgedStream::new and ChatResponse::Stream are used)
detect F::name() == ResponsesApiFormat::name() and call the same persistence
logic (reuse save_session or introduce a small helper that accepts the
stream/metadata) prior to the return, or else arrange for BridgedStream to
invoke save_session on the first response chunk; reference call_chat_hub_stream,
BridgedStream, ChatResponse::Stream, save_session and
F::name()/ResponsesApiFormat to locate where to add the persistence.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 933d373f-26bf-4634-b09d-ab9daeee4b81
📒 Files selected for processing (3)
src/gateway/gateway.rssrc/gateway/mod.rssrc/gateway/session.rs
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/gateway/gateway.rs (1)
90-104:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy liftResponses streaming bypasses session persistence.
At Line 90 the stream branch returns early, so
save_session(Line 102+) is never reached for streamed Responses requests. That makes server-side history incomplete and can causeprevious_response_idcontinuity to fail after streamed turns.🧭 Possible short-term safety guard (until stream persistence is implemented)
if stream { + if F::name() == ResponsesApiFormat::name() && self.session_store.is_some() { + return Err(GatewayError::Validation( + "responses streaming with server-side session state is not yet supported" + .into(), + )); + } let hub_stream = self.call_chat_hub_stream(instance, &hub_request).await?; let (usage_tx, usage_rx) = oneshot::channel(); let bridged_stream = BridgedStream::<F>::new(hub_stream, ctx, usage_tx);🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/gateway/gateway.rs` around lines 90 - 104, The stream branch currently returns early from call_chat_hub_stream so save_session is never called for streamed Responses; fix by making the bridged stream surface the final hub response and persisting it when the stream completes: change BridgedStream (constructed in call_chat_hub_stream) to accept/emit a completion channel (e.g., oneshot or mpsc) that yields the final hub_response, then in the gateway before returning ChatResponse::Stream spawn a background task that awaits that completion channel and calls self.save_session(&ctx, &hub_request, &hub_response). Keep the existing call_chat_hub path unchanged (it continues to call save_session directly) and ensure the new task only runs when F::name() == ResponsesApiFormat::name().
🧹 Nitpick comments (1)
src/gateway/session.rs (1)
14-38: ⚡ Quick winAdd API docs and make ordering semantics explicit for
SessionStore.
SessionStore::get_by_conversation_idis consumed as an ordered list (latest inferred by taking the last item insrc/gateway/gateway.rs), but that contract is not documented on the public trait. Please add///docs on the public trait/structs and explicitly define expected ordering for returned sessions.📝 Proposed doc contract update
#[allow(dead_code)] #[async_trait] +/// Persists and retrieves chat sessions for server-side conversation state. pub trait SessionStore: Send + Sync + 'static { async fn get_by_response_id(&self, response_id: &str) -> Result<Option<StoredSession>>; + /// Returns all sessions for a conversation in ascending order by recency + /// (oldest first, newest last). async fn get_by_conversation_id(&self, conv_id: &str) -> Result<Vec<StoredSession>>; async fn put_session(&self, session: &StoredSession) -> Result<()>; async fn delete_session(&self, response_id: &str) -> Result<()>; } #[allow(dead_code)] #[derive(Debug, Clone, Default)] +/// Stored conversation snapshot keyed by provider response id. pub struct StoredSession {As per coding guidelines
**/*.rs: Use /// for doc comments on public items in Rust.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/gateway/session.rs` around lines 14 - 38, Add /// doc comments to the public SessionStore trait and its methods (especially get_by_conversation_id), and to the StoredSession and InMemorySessionStore types, explaining the behaviour and ordering contract: get_by_conversation_id must return sessions in chronological order from oldest to newest (so the last element is the latest session as relied on by the gateway), document what StoredSession fields represent (response_id, conversation_id, messages, model, created_at, insertion_index, metadata) and note that insertion_index is monotonic for tie-breaking; update InMemorySessionStore docs to state it preserves that ordering semantics. Keep comments short and use /// doc style.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@src/gateway/gateway.rs`:
- Around line 90-104: The stream branch currently returns early from
call_chat_hub_stream so save_session is never called for streamed Responses; fix
by making the bridged stream surface the final hub response and persisting it
when the stream completes: change BridgedStream (constructed in
call_chat_hub_stream) to accept/emit a completion channel (e.g., oneshot or
mpsc) that yields the final hub_response, then in the gateway before returning
ChatResponse::Stream spawn a background task that awaits that completion channel
and calls self.save_session(&ctx, &hub_request, &hub_response). Keep the
existing call_chat_hub path unchanged (it continues to call save_session
directly) and ensure the new task only runs when F::name() ==
ResponsesApiFormat::name().
---
Nitpick comments:
In `@src/gateway/session.rs`:
- Around line 14-38: Add /// doc comments to the public SessionStore trait and
its methods (especially get_by_conversation_id), and to the StoredSession and
InMemorySessionStore types, explaining the behaviour and ordering contract:
get_by_conversation_id must return sessions in chronological order from oldest
to newest (so the last element is the latest session as relied on by the
gateway), document what StoredSession fields represent (response_id,
conversation_id, messages, model, created_at, insertion_index, metadata) and
note that insertion_index is monotonic for tie-breaking; update
InMemorySessionStore docs to state it preserves that ordering semantics. Keep
comments short and use /// doc style.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 84bbb68c-a594-43bf-a8b7-68495a86d7d0
📒 Files selected for processing (2)
src/gateway/gateway.rssrc/gateway/session.rs
@coderabbitai Create an issue for this: we should implement a stream accumulator to ensure that messages in streaming responses can also be managed by the server. |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain |
Summary by CodeRabbit
New Features
Tests