perf(llc): Reduce the number of read message per channel from DB when paginating (part 2)#2681
perf(llc): Reduce the number of read message per channel from DB when paginating (part 2)#2681VelikovPetar wants to merge 18 commits into
Conversation
# Conflicts: # packages/stream_chat_persistence/CHANGELOG.md
…ture/FLU-485_optimize_read_message_from_db_part2 # Conflicts: # packages/stream_chat_persistence/CHANGELOG.md # packages/stream_chat_persistence/lib/src/dao/message_dao.dart
📝 WalkthroughWalkthroughThis PR optimizes the stream_chat_persistence layer by batching related database fetches, moving filtering to SQL level, and adding bulk query APIs. A new ChangesPersistence Layer Query Optimization
🎯 4 (Complex) | ⏱️ ~75 minutes Possibly Related PRs
Suggested Reviewers
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Warning Review ran into problems🔥 ProblemsGit: Failed to clone repository. Please run the Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #2681 +/- ##
==========================================
+ Coverage 65.33% 65.62% +0.29%
==========================================
Files 423 424 +1
Lines 26646 26844 +198
==========================================
+ Hits 17408 17616 +208
+ Misses 9238 9228 -10 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/stream_chat_persistence/lib/src/dao/message_dao.dart`:
- Around line 306-317: The cursor predicates only filter by messages.createdAt,
which can cause duplicates/misses when createdAt ties exist; update each branch
(lessThanCutoff, lessThanOrEqualCutoff, greaterThanCutoff,
greaterThanOrEqualCutoff) to add a secondary predicate on messages.id matching
the ordering key so the pair (createdAt, id) is used (e.g., for lessThan use
createdAt < t OR (createdAt == t AND id < cursorId)); modify the query.where
calls in the blocks referencing messages.createdAt and messages.id to apply the
combined comparisons for all four operators, and apply the same change to the
analogous blocks around the second occurrence (lines referenced in the comment:
the other block at 350-362).
- Around line 40-43: The recursive quoted-message hydration in
_messagesFromJoinRows lacks a visited-set, so protect against cycles by adding a
visited ID set parameter (e.g., Set<String> visited or Set<String>
visitedMessageIds) defaulting to empty, check the current message's id before
hydrating its quote(s) and skip recursion if already visited, and pass the
updated set when calling the same hydration logic recursively; apply the same
visited-set guard to the other recursive hydration block referenced around lines
88-106 so both recursion entry points use the visited set to prevent infinite
recursion.
In `@packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart`:
- Around line 39-42: The recursive quoted-message hydration in
_messagesFromJoinRows (and the similar logic around lines 88-106) lacks cycle
protection; add a visited-id guard: introduce a Set<String> visited (or Set<int>
depending on message id type) parameter with a default empty set, add the
current message id to visited before recursing, and before resolving a quoted
message check if its id is already in visited; if it is, stop recursion (e.g.,
set quotedMessage to null or a shallow reference) to break the cycle. Pass the
visited set through any recursive calls so cycles are detected across the entire
resolution chain and avoid infinite recursion.
In `@packages/stream_chat_persistence/lib/src/db/query_utils.dart`:
- Around line 14-17: The public function chunked<T>(List<T> input, [int size =
900]) can hang or misbehave if size <= 0; add an upfront argument validation in
chunked to guard against non-positive sizes (e.g. if (size <= 0) throw
ArgumentError.value(size, 'size', 'must be > 0')) so the for-loop using i +=
size cannot loop infinitely or produce invalid sublists; keep the check at the
top of chunked before the for-loop and reference the existing parameters input
and size.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 3db24b32-5504-4c67-995b-722956c7187b
📒 Files selected for processing (16)
packages/stream_chat_persistence/CHANGELOG.mdpackages/stream_chat_persistence/lib/src/dao/draft_message_dao.dartpackages/stream_chat_persistence/lib/src/dao/message_dao.dartpackages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dartpackages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dartpackages/stream_chat_persistence/lib/src/dao/poll_dao.dartpackages/stream_chat_persistence/lib/src/dao/poll_vote_dao.dartpackages/stream_chat_persistence/lib/src/dao/reaction_dao.dartpackages/stream_chat_persistence/lib/src/db/query_utils.dartpackages/stream_chat_persistence/test/src/dao/draft_message_dao_test.dartpackages/stream_chat_persistence/test/src/dao/message_dao_test.dartpackages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dartpackages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dartpackages/stream_chat_persistence/test/src/dao/poll_dao_test.dartpackages/stream_chat_persistence/test/src/dao/poll_vote_dao_test.dartpackages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart
| Future<List<Message>> _messagesFromJoinRows( | ||
| List<TypedResult> rows, { | ||
| bool fetchDraft = false, | ||
| }) async { |
There was a problem hiding this comment.
Guard recursive quote hydration against cyclic quote graphs.
_messagesFromJoinRows recursively hydrates quoted messages without a visited-set guard. A cycle like A -> B -> A will recurse until stack overflow.
💡 Suggested fix
Future<List<Message>> _messagesFromJoinRows(
List<TypedResult> rows, {
bool fetchDraft = false,
+ Set<String>? _visitedQuoteIds,
}) async {
if (rows.isEmpty) return const [];
+ final visited = _visitedQuoteIds ?? <String>{};
final messageIds = <String>[];
final quotedIds = <String>[];
@@
for (final row in rows) {
final msg = row.readTable(messages);
messageIds.add(msg.id);
@@
}
+ visited.addAll(messageIds);
@@
- if (quotedIds.isNotEmpty) {
+ final nextQuotedIds = quotedIds.where((id) => !visited.contains(id)).toList();
+ if (nextQuotedIds.isNotEmpty) {
final quoteRows = await (select(messages).join([
@@
- ..where(messages.id.isIn(quotedIds)))
+ ..where(messages.id.isIn(nextQuotedIds)))
.get();
final quotedMessages = await _messagesFromJoinRows(
quoteRows,
fetchDraft: true,
+ _visitedQuoteIds: visited,
);Also applies to: 88-106
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/stream_chat_persistence/lib/src/dao/message_dao.dart` around lines
40 - 43, The recursive quoted-message hydration in _messagesFromJoinRows lacks a
visited-set, so protect against cycles by adding a visited ID set parameter
(e.g., Set<String> visited or Set<String> visitedMessageIds) defaulting to
empty, check the current message's id before hydrating its quote(s) and skip
recursion if already visited, and pass the updated set when calling the same
hydration logic recursively; apply the same visited-set guard to the other
recursive hydration block referenced around lines 88-106 so both recursion entry
points use the visited set to prevent infinite recursion.
| if (lessThanCutoff case final t?) { | ||
| query.where(messages.createdAt.isSmallerThanValue(t)); | ||
| } | ||
| if (lessThanOrEqualCutoff case final t?) { | ||
| query.where(messages.createdAt.isSmallerOrEqualValue(t)); | ||
| } | ||
| if (greaterThanCutoff case final t?) { | ||
| query.where(messages.createdAt.isBiggerThanValue(t)); | ||
| } | ||
| if (greaterThanOrEqualCutoff case final t?) { | ||
| query.where(messages.createdAt.isBiggerOrEqualValue(t)); | ||
| } |
There was a problem hiding this comment.
Use (createdAt, id) cursor predicates to avoid duplicate/missing rows on timestamp ties.
Current cursor filtering only checks createdAt, but ordering also uses id. When multiple messages share the same createdAt, pagination can skip or repeat records across pages.
💡 Suggested fix
- Future<DateTime?> _lookupMessageCreatedAt(String id) {
+ Future<(DateTime, String)?> _lookupMessageCursor(String id) {
return (selectOnly(messages)
- ..addColumns([messages.createdAt])
+ ..addColumns([messages.createdAt, messages.id])
..where(messages.id.equals(id))
..where(
messages.parentId.isNull() | messages.showInChannel.equals(true),
))
- .map((row) => row.read(messages.createdAt))
+ .map((row) => (
+ row.read(messages.createdAt)!,
+ row.read(messages.id)!,
+ ))
.getSingleOrNull();
}- if (lessThanCutoff case final t?) {
- query.where(messages.createdAt.isSmallerThanValue(t));
+ if (lessThanCursor case final (t, id)?) {
+ query.where(
+ messages.createdAt.isSmallerThanValue(t) |
+ (messages.createdAt.equals(t) & messages.id.isSmallerThanValue(id)),
+ );
}Apply analogous predicates for <=, >, >=.
Also applies to: 350-362
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/stream_chat_persistence/lib/src/dao/message_dao.dart` around lines
306 - 317, The cursor predicates only filter by messages.createdAt, which can
cause duplicates/misses when createdAt ties exist; update each branch
(lessThanCutoff, lessThanOrEqualCutoff, greaterThanCutoff,
greaterThanOrEqualCutoff) to add a secondary predicate on messages.id matching
the ordering key so the pair (createdAt, id) is used (e.g., for lessThan use
createdAt < t OR (createdAt == t AND id < cursorId)); modify the query.where
calls in the blocks referencing messages.createdAt and messages.id to apply the
combined comparisons for all four operators, and apply the same change to the
analogous blocks around the second occurrence (lines referenced in the comment:
the other block at 350-362).
| Future<List<Message>> _messagesFromJoinRows( | ||
| List<TypedResult> rows, { | ||
| bool fetchDraft = false, | ||
| }) async { |
There was a problem hiding this comment.
Add cycle protection to recursive quoted-message hydration.
_messagesFromJoinRows recursively resolves quotes with no visited guard. Cyclic quote references can lead to infinite recursion and crash.
💡 Suggested fix
Future<List<Message>> _messagesFromJoinRows(
List<TypedResult> rows, {
bool fetchDraft = false,
+ Set<String>? _visitedQuoteIds,
}) async {
if (rows.isEmpty) return const [];
+ final visited = _visitedQuoteIds ?? <String>{};
@@
for (final row in rows) {
final msg = row.readTable(pinnedMessages);
messageIds.add(msg.id);
@@
}
+ visited.addAll(messageIds);
@@
- if (quotedIds.isNotEmpty) {
+ final nextQuotedIds = quotedIds.where((id) => !visited.contains(id)).toList();
+ if (nextQuotedIds.isNotEmpty) {
@@
- ..where(pinnedMessages.id.isIn(quotedIds)))
+ ..where(pinnedMessages.id.isIn(nextQuotedIds)))
.get();
final quotedMessages = await _messagesFromJoinRows(
quoteRows,
fetchDraft: true,
+ _visitedQuoteIds: visited,
);Also applies to: 88-106
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart` around
lines 39 - 42, The recursive quoted-message hydration in _messagesFromJoinRows
(and the similar logic around lines 88-106) lacks cycle protection; add a
visited-id guard: introduce a Set<String> visited (or Set<int> depending on
message id type) parameter with a default empty set, add the current message id
to visited before recursing, and before resolving a quoted message check if its
id is already in visited; if it is, stop recursion (e.g., set quotedMessage to
null or a shallow reference) to break the cycle. Pass the visited set through
any recursive calls so cycles are detected across the entire resolution chain
and avoid infinite recursion.
| Iterable<List<T>> chunked<T>(List<T> input, [int size = 900]) sync* { | ||
| for (var i = 0; i < input.length; i += size) { | ||
| yield input.sublist(i, math.min(i + size, input.length)); | ||
| } |
There was a problem hiding this comment.
Guard size to prevent infinite loops on invalid input.
chunked is public, and size <= 0 makes the loop at Line 15 non-terminating or invalid. Add an argument check up front.
Suggested fix
Iterable<List<T>> chunked<T>(List<T> input, [int size = 900]) sync* {
+ if (size <= 0) {
+ throw ArgumentError.value(size, 'size', 'must be greater than 0');
+ }
for (var i = 0; i < input.length; i += size) {
yield input.sublist(i, math.min(i + size, input.length));
}
}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/stream_chat_persistence/lib/src/db/query_utils.dart` around lines 14
- 17, The public function chunked<T>(List<T> input, [int size = 900]) can hang
or misbehave if size <= 0; add an upfront argument validation in chunked to
guard against non-positive sizes (e.g. if (size <= 0) throw
ArgumentError.value(size, 'size', 'must be > 0')) so the for-loop using i +=
size cannot loop infinitely or produce invalid sublists; keep the check at the
top of chunked before the for-loop and reference the existing parameters input
and size.
Submit a pull request
Linear: Part two of: FLU-485
Review after: #2679
CLA
Description of the pull request
Replaces the per-message hydration (
_messageFromJoinRow) with a batched hydration (_messagesFromJoinRows).Testing:
Apply the following patch and run the new parity/benchmark tests to verify the performance improvements and no regression (except where some behaviour was intentionally changed):
Benchmark and parity tests
Summary by CodeRabbit
New Features
Bug Fixes
Performance