Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions crates/stringflow-core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,4 +361,84 @@ mod tests {
assert_eq!(events.len(), 1);
assert!(matches!(&events[0], StreamEvent::Delta(s) if s == "hi"));
}

#[test]
fn parse_sse_buffer_empty_data_field() {
// Server sends "data: \n\n" — empty data after prefix.
// After strip_prefix + trim, data is "", which is not valid JSON
// and not "[DONE]", so it should be silently skipped.
let buffer = "data: \n\n";
let (events, remaining) = parse_sse_buffer(buffer, WireFormat::Completions);
assert!(
events.is_empty(),
"empty data field should produce no events"
);
assert!(remaining.is_empty());
}

#[test]
fn parse_sse_buffer_empty_data_among_valid_events() {
// Mix of empty data and valid events — only valid events should appear.
let buffer =
"data: \n\ndata: {\"choices\":[{\"delta\":{\"content\":\"ok\"}}]}\n\ndata: \n\n";
let (events, remaining) = parse_sse_buffer(buffer, WireFormat::Completions);
assert_eq!(events.len(), 1);
assert!(matches!(&events[0], StreamEvent::Delta(s) if s == "ok"));
assert!(remaining.is_empty());
}

#[test]
fn parse_sse_buffer_malformed_json_silently_skipped() {
// Malformed JSON should be silently skipped (parse_stream_chunk returns None).
let buffer = "data: {not valid json\n\n";
let (events, remaining) = parse_sse_buffer(buffer, WireFormat::Completions);
assert!(events.is_empty(), "malformed JSON should produce no events");
assert!(remaining.is_empty());
}

#[test]
fn parse_sse_buffer_malformed_json_does_not_block_subsequent_events() {
// A malformed chunk followed by a valid chunk — the valid one should still parse.
let buffer = "data: {broken\n\ndata: {\"choices\":[{\"delta\":{\"content\":\"after\"}}]}\n\ndata: [DONE]\n\n";
let (events, remaining) = parse_sse_buffer(buffer, WireFormat::Completions);
assert_eq!(events.len(), 2);
assert!(matches!(&events[0], StreamEvent::Delta(s) if s == "after"));
assert!(matches!(&events[1], StreamEvent::Done));
assert!(remaining.is_empty());
}

#[test]
fn retry_delay_increases_with_attempts() {
let d1 = retry_delay(1);
let d2 = retry_delay(2);
let d3 = retry_delay(3);
let d4 = retry_delay(4);

// Delays must be strictly increasing (exponential base dominates).
assert!(d2 > d1, "delay should increase: d2={d2:?} > d1={d1:?}");
assert!(d3 > d2, "delay should increase: d3={d3:?} > d2={d2:?}");
assert!(d4 > d3, "delay should increase: d4={d4:?} > d3={d3:?}");
}

#[test]
fn retry_delay_has_jitter_variation() {
// The jitter term is `base * (attempt % 3) / 4`, so attempts with
// different `attempt % 3` values should produce different jitter offsets
// even at the same exponential tier. Specifically:
// attempt=3 → base=2000, jitter=0 (3%3=0) → 2000ms
// attempt=4 → base=4000, jitter=4000*1/4=1000 → 5000ms
// attempt=6 → base=16000, jitter=0 (6%3=0) → 16000ms
// Verify the formula directly for a few values.
let d1 = retry_delay(1); // base=500, jitter=500*1/4=125 → 625ms
assert_eq!(d1.as_millis(), 625);

let d2 = retry_delay(2); // base=1000, jitter=1000*2/4=500 → 1500ms
assert_eq!(d2.as_millis(), 1500);

let d3 = retry_delay(3); // base=2000, jitter=2000*0/4=0 → 2000ms
assert_eq!(d3.as_millis(), 2000);

let d4 = retry_delay(4); // base=4000, jitter=4000*1/4=1000 → 5000ms
assert_eq!(d4.as_millis(), 5000);
}
}
Loading