6 changes: 6 additions & 0 deletions .changeset/pass-join-info-to-queryfn.md
@@ -0,0 +1,6 @@
---
'@tanstack/db': patch
'@tanstack/query-db-collection': patch
---

Pass join information to queryFn in on-demand mode. This enables server-side joins before pagination, fixing inconsistent page sizes when queries combine pagination with filters on joined collections. The new `joins` array in `LoadSubsetOptions` contains collection ID, alias, join type, key expressions, and associated filters.
155 changes: 155 additions & 0 deletions INVESTIGATION-join-pagination-queryFn.md
@@ -0,0 +1,155 @@
# Investigation: Passing Join Information to queryFn in On-Demand Mode

**Status: IMPLEMENTED**

## Problem Statement

When using `queryCollection` in on-demand mode with joins and pagination, the pagination was applied **before** join filters were evaluated, leading to inconsistent page sizes or empty results.

**User's scenario:**

- `tasksCollection` (queryCollection, on-demand) with `limit: 10`
- Joined with `accountsCollection`
- Filter on `account.name = 'example'`

**What was happening:**

1. `tasksCollection.queryFn` received `{ limit: 10, where: <task filters only> }`
2. Backend returned 10 tasks
3. Client-side join with `accountsCollection`
4. Account filter applied client-side
5. Result: Only 6 tasks match (page size inconsistent)
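The effect of the ordering can be reproduced with a small in-memory sketch. The data below is hypothetical (20 tasks alternating between two accounts), and `paginateThenFilter` / `filterThenPaginate` are illustrative names, not functions from the codebase:

```typescript
type Account = { id: number; name: string }
type Task = { id: number; accountId: number }

// Hypothetical data: 20 tasks, alternating between two accounts.
const accounts: Array<Account> = [
  { id: 1, name: 'example' },
  { id: 2, name: 'other' },
]
const tasks: Array<Task> = Array.from({ length: 20 }, (_, i) => ({
  id: i + 1,
  accountId: i % 2 === 0 ? 1 : 2,
}))

const accountNameById = new Map<number, string>(
  accounts.map((a): [number, string] => [a.id, a.name]),
)
const matchesAccount = (task: Task): boolean =>
  accountNameById.get(task.accountId) === 'example'

// Old behavior: the backend paginates first, the client filters afterwards.
function paginateThenFilter(limit: number): Array<Task> {
  return tasks.slice(0, limit).filter(matchesAccount)
}

// New behavior: the join filter runs before pagination.
function filterThenPaginate(limit: number): Array<Task> {
  return tasks.filter(matchesAccount).slice(0, limit)
}

const shortPage = paginateThenFilter(10) // 5 of the first 10 tasks match
const fullPage = filterThenPaginate(10) // a full page of 10 matching tasks
```

With this data, paginating first yields a page of 5 while filtering first yields the requested 10.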

## Solution Implemented

We extended `LoadSubsetOptions` to include join information, allowing the sync layer to construct server-side join queries that filter before pagination.

### New `JoinInfo` Type

```typescript
// packages/db/src/types.ts
export type JoinInfo = {
  /** The ID of the collection being joined */
  collectionId: string
  /** The alias used for the joined collection in the query */
  alias: string
  /** The type of join to perform */
  type: `inner` | `left` | `right` | `full` | `cross`
  /** The join key expression from the main collection (e.g., task.account_id) */
  localKey: BasicExpression
  /** The join key expression from the joined collection (e.g., account.id) */
  foreignKey: BasicExpression
  /** Filters that apply to the joined collection */
  where?: BasicExpression<boolean>
  /** OrderBy expressions that reference the joined collection */
  orderBy?: OrderBy
}
```

### Extended `LoadSubsetOptions`

```typescript
export type LoadSubsetOptions = {
  where?: BasicExpression<boolean>
  orderBy?: OrderBy
  limit?: number
  cursor?: CursorExpressions
  offset?: number
  subscription?: Subscription
  // NEW: Join information for server-side query construction
  joins?: Array<JoinInfo>
}
```
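For the user's scenario, the options handed to the sync layer might look roughly like the value below. This is a sketch only: the `Ref` / `Val` / `Func` types are simplified stand-ins for `BasicExpression`, the `*Sketch` types are trimmed copies of the real ones, and the collection ID `'accounts'` and the exact shape of the reference paths are assumptions.

```typescript
// Simplified stand-ins for the expression IR (assumed shapes, not the real types).
type Ref = { type: 'ref'; path: Array<string> }
type Val = { type: 'val'; value: unknown }
type Func = { type: 'func'; name: string; args: Array<Expr> }
type Expr = Ref | Val | Func

type JoinInfoSketch = {
  collectionId: string
  alias: string
  type: 'inner' | 'left' | 'right' | 'full' | 'cross'
  localKey: Expr
  foreignKey: Expr
  where?: Expr
}

type LoadSubsetOptionsSketch = {
  where?: Expr
  limit?: number
  offset?: number
  joins?: Array<JoinInfoSketch>
}

const ref = (...path: Array<string>): Ref => ({ type: 'ref', path })

// tasks joined with accounts, filtered on account.name = 'example', limit 10
const exampleOptions: LoadSubsetOptionsSketch = {
  limit: 10,
  joins: [
    {
      collectionId: 'accounts', // hypothetical collection ID
      alias: 'account',
      type: 'inner',
      localKey: ref('task', 'account_id'),
      foreignKey: ref('account', 'id'),
      where: {
        type: 'func',
        name: 'eq',
        args: [ref('account', 'name'), { type: 'val', value: 'example' }],
      },
    },
  ],
}
```

The account filter travels with the join entry rather than in the top-level `where`, which continues to carry only the main collection's filters.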

## Implementation Details

### 1. Join Info Extraction (`packages/db/src/query/optimizer.ts`)

Added an `extractJoinInfo()` function that:

- Analyzes query IR to extract join clause information
- Associates filters from `sourceWhereClauses` with their respective joins
- Associates orderBy expressions with their respective joins
- Returns a map from each main source alias to its `Array<JoinInfo>`
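
The extraction steps above can be sketched against a simplified IR. Everything here is hypothetical: `QuerySketch`, `JoinClauseSketch`, and `extractJoinInfoSketch` are illustrative stand-ins, filters are opaque strings, and orderBy association is omitted. The sketch does show the swapped-key case, where the join condition lists the joined collection's key first.

```typescript
type Ref = { type: 'ref'; path: Array<string> }
type JoinType = 'inner' | 'left' | 'right' | 'full' | 'cross'

// Simplified, hypothetical stand-ins for the query IR.
type JoinClauseSketch = {
  collectionId: string
  alias: string
  type: JoinType
  left: Ref
  right: Ref
}
type QuerySketch = { from: { alias: string }; join?: Array<JoinClauseSketch> }

type JoinInfoSketch = {
  collectionId: string
  alias: string
  type: JoinType
  localKey: Ref
  foreignKey: Ref
  where?: string // opaque placeholder for a filter expression
}

function extractJoinInfoSketch(
  query: QuerySketch,
  sourceWhereClauses: Map<string, string>,
): Map<string, Array<JoinInfoSketch>> {
  const result = new Map<string, Array<JoinInfoSketch>>()
  if (!query.join?.length) return result // no joins: empty map

  const infos = query.join.map((clause): JoinInfoSketch => {
    // Whichever side references the joined alias is the foreign key,
    // regardless of the order the keys were written in.
    const leftIsJoined = clause.left.path[0] === clause.alias
    return {
      collectionId: clause.collectionId,
      alias: clause.alias,
      type: clause.type,
      localKey: leftIsJoined ? clause.right : clause.left,
      foreignKey: leftIsJoined ? clause.left : clause.right,
      where: sourceWhereClauses.get(clause.alias),
    }
  })

  result.set(query.from.alias, infos)
  return result
}

// Join condition written as account.id = task.account_id (keys swapped).
const swapped = extractJoinInfoSketch(
  {
    from: { alias: 'task' },
    join: [
      {
        collectionId: 'accounts',
        alias: 'account',
        type: 'inner',
        left: { type: 'ref', path: ['account', 'id'] },
        right: { type: 'ref', path: ['task', 'account_id'] },
      },
    ],
  },
  new Map([['account', "account.name = 'example'"]]),
)
```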

### 2. Compilation Pipeline (`packages/db/src/query/compiler/index.ts`)

- `CompilationResult` now includes `joinInfoBySource`
- `compileQuery()` passes through join info from optimizer

### 3. Live Query Builder (`packages/db/src/query/live/collection-config-builder.ts`)

- Added `joinInfoBySourceCache` to store join info
- Populated during compilation

### 4. Collection Subscriber (`packages/db/src/query/live/collection-subscriber.ts`)

- Added `getJoinInfoForAlias()` method
- Passes join info when creating subscriptions

### 5. Collection Subscription (`packages/db/src/collection/subscription.ts`)

- `CollectionSubscriptionOptions` now accepts `joinInfo`
- `requestLimitedSnapshot()` includes `joins` in `LoadSubsetOptions`

### 6. Serialization (`packages/query-db-collection/src/serialization.ts`)

- `serializeLoadSubsetOptions()` now serializes join info for query key generation
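
Including joins in the query key matters because two requests that differ only in their join filters must not share a cache entry. The sketch below is not the actual `serializeLoadSubsetOptions()` implementation; `stableStringify` is a hypothetical helper that shows one way to get an order-independent serialization.

```typescript
// Serialize with sorted object keys so property order does not change the result.
function stableStringify(value: unknown): string {
  if (value === undefined) return 'null'
  if (Array.isArray(value)) {
    return `[${value.map(stableStringify).join(',')}]`
  }
  if (value !== null && typeof value === 'object') {
    const entries = Object.entries(value as Record<string, unknown>)
      .filter(([, v]) => v !== undefined) // skip absent optional fields
      .sort(([a], [b]) => (a < b ? -1 : a > b ? 1 : 0))
      .map(([k, v]) => `${JSON.stringify(k)}:${stableStringify(v)}`)
    return `{${entries.join(',')}}`
  }
  return JSON.stringify(value)
}

// Same join described with different property order.
const keyA = stableStringify({
  limit: 10,
  joins: [{ collectionId: 'accounts', alias: 'account', type: 'inner' }],
})
const keyB = stableStringify({
  joins: [{ type: 'inner', alias: 'account', collectionId: 'accounts' }],
  limit: 10,
})
// Same join, but with an extra filter on the joined collection.
const keyC = stableStringify({
  limit: 10,
  joins: [
    { collectionId: 'accounts', alias: 'account', type: 'inner', where: 'name = example' },
  ],
})
```

Here `keyA` and `keyB` are identical, while `keyC` differs because the join carries a filter.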

## Usage in queryFn

Now `queryFn` can access join information:

```typescript
queryFn: async (context) => {
  const opts = context.meta.loadSubsetOptions

  if (opts.joins?.length) {
    // Construct a query with server-side joins
    // Example with Drizzle; $dynamic() allows building the query step by step:
    let query = db.select().from(tasks).$dynamic()

    // Collect all filters and apply them in a single where() call,
    // since in Drizzle a later where() call replaces an earlier one
    const conditions = []

    for (const join of opts.joins) {
      // Add join based on join.type, join.localKey, join.foreignKey
      // (hardcoded here for the tasks/accounts example)
      query = query.innerJoin(accounts, eq(tasks.accountId, accounts.id))

      // Collect joined collection's filter
      if (join.where) {
        conditions.push(/* translate join.where to Drizzle */)
      }
    }

    // Collect main collection's filter
    if (opts.where) {
      conditions.push(/* translate opts.where to Drizzle */)
    }

    return query
      .where(and(...conditions))
      .limit(opts.limit)
      .offset(opts.offset ?? 0)
  }

  // Simple query without joins (existing behavior)
  return db.select().from(tasks).where(/* opts.where */).limit(opts.limit)
}
```
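
For backends that build SQL text directly, a join entry can be mapped to a join clause along these lines. This is a sketch under stated assumptions: it assumes `collectionId` matches the table name and that key references are `[alias, column]` paths; `buildJoinClause` and `JoinSketch` are hypothetical names, not part of the library.

```typescript
type Ref = { type: 'ref'; path: Array<string> }
type JoinType = 'inner' | 'left' | 'right' | 'full' | 'cross'
type JoinSketch = {
  collectionId: string // assumed to match the table name
  alias: string
  type: JoinType
  localKey: Ref
  foreignKey: Ref
}

const JOIN_KEYWORDS: Record<JoinType, string> = {
  inner: 'INNER JOIN',
  left: 'LEFT JOIN',
  right: 'RIGHT JOIN',
  full: 'FULL OUTER JOIN',
  cross: 'CROSS JOIN',
}

// Quote identifiers so that names cannot inject SQL.
const quote = (identifier: string): string =>
  `"${identifier.replace(/"/g, '""')}"`
const column = (ref: Ref): string => ref.path.map(quote).join('.')

function buildJoinClause(join: JoinSketch): string {
  const target = `${quote(join.collectionId)} AS ${quote(join.alias)}`
  // A cross join has no ON condition.
  if (join.type === 'cross') return `${JOIN_KEYWORDS.cross} ${target}`
  return (
    `${JOIN_KEYWORDS[join.type]} ${target} ` +
    `ON ${column(join.localKey)} = ${column(join.foreignKey)}`
  )
}

const clause = buildJoinClause({
  collectionId: 'accounts',
  alias: 'account',
  type: 'inner',
  localKey: { type: 'ref', path: ['task', 'account_id'] },
  foreignKey: { type: 'ref', path: ['account', 'id'] },
})
// INNER JOIN "accounts" AS "account" ON "task"."account_id" = "account"."id"
```

Filter values from `join.where` should be passed as bound parameters rather than interpolated into the SQL string.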

## Files Changed

- `packages/db/src/types.ts` - Added `JoinInfo` type, extended `LoadSubsetOptions` and `SubscribeChangesOptions`
- `packages/db/src/query/optimizer.ts` - Added `extractJoinInfo()`, updated `OptimizationResult`
- `packages/db/src/query/compiler/index.ts` - Added `joinInfoBySource` to `CompilationResult`
- `packages/db/src/query/live/collection-config-builder.ts` - Added `joinInfoBySourceCache`
- `packages/db/src/query/live/collection-subscriber.ts` - Added `getJoinInfoForAlias()`
- `packages/db/src/collection/subscription.ts` - Pass join info in `loadSubset` calls
- `packages/query-db-collection/src/serialization.ts` - Serialize joins in query keys
- `packages/db/tests/query/optimizer.test.ts` - Added tests for join info extraction

## Test Coverage

Added tests in `packages/db/tests/query/optimizer.test.ts`:

- Empty map for queries without joins
- Basic inner join extraction
- WHERE clause inclusion for joined collections
- OrderBy inclusion for joined collections
- Multiple joins handling
- Swapped key expression handling
9 changes: 9 additions & 0 deletions packages/db/src/collection/subscription.ts
@@ -12,6 +12,7 @@ import type { BasicExpression, OrderBy } from '../query/ir.js'
import type { IndexInterface } from '../indexes/base-index.js'
import type {
ChangeMessage,
JoinInfo,
LoadSubsetOptions,
Subscription,
SubscriptionEvents,
@@ -45,6 +46,12 @@ type CollectionSubscriptionOptions = {
whereExpression?: BasicExpression<boolean>
/** Callback to call when the subscription is unsubscribed */
onUnsubscribe?: (event: SubscriptionUnsubscribedEvent) => void
/**
* Join information for server-side query construction.
* When present, this is included in loadSubset calls so the sync layer
* can perform joins before pagination.
*/
joinInfo?: Array<JoinInfo>
}

export class CollectionSubscription
@@ -591,6 +598,8 @@ export class CollectionSubscription
cursor: cursorExpressions, // Cursor expressions passed separately
offset: offset ?? currentOffset, // Use provided offset, or auto-tracked offset
subscription: this,
// Include join info for server-side query construction
joins: this.options.joinInfo,
}
const syncResult = this.collection._sync.loadSubset(loadOptions)

16 changes: 15 additions & 1 deletion packages/db/src/query/compiler/index.ts
@@ -25,6 +25,7 @@ import type {
import type { LazyCollectionCallbacks } from './joins.js'
import type { Collection } from '../../collection/index.js'
import type {
JoinInfo,
KeyedStream,
NamespacedAndKeyedStream,
ResultStream,
@@ -67,6 +68,13 @@ export interface CompilationResult {
* the inner aliases where collection subscriptions were created.
*/
aliasRemapping: Record<string, string>

/**
* Map of main source aliases to their join information for server-side query construction.
* This enables the sync layer to perform joins before pagination, ensuring consistent page sizes
* when filtering by joined collection properties.
*/
joinInfoBySource: Map<string, Array<JoinInfo>>
}

/**
@@ -106,7 +114,11 @@ export function compileQuery(
validateQueryStructure(rawQuery)

// Optimize the query before compilation
const { optimizedQuery: query, sourceWhereClauses } = optimizeQuery(rawQuery)
const {
optimizedQuery: query,
sourceWhereClauses,
joinInfoBySource,
} = optimizeQuery(rawQuery)

// Create mapping from optimized query to original for caching
queryMapping.set(query, rawQuery)
@@ -345,6 +357,7 @@ export function compileQuery(
sourceWhereClauses,
aliasToCollectionId,
aliasRemapping,
joinInfoBySource,
}
cache.set(rawQuery, compilationResult)

@@ -375,6 +388,7 @@ export function compileQuery(
sourceWhereClauses,
aliasToCollectionId,
aliasRemapping,
joinInfoBySource,
}
cache.set(rawQuery, compilationResult)

6 changes: 6 additions & 0 deletions packages/db/src/query/live/collection-config-builder.ts
@@ -19,6 +19,7 @@ import type { OrderByOptimizationInfo } from '../compiler/order-by.js'
import type { Collection } from '../../collection/index.js'
import type {
CollectionConfigSingleRowOption,
JoinInfo,
KeyedStream,
ResultStream,
StringCollationConfig,
@@ -136,6 +137,9 @@ export class CollectionConfigBuilder<
| Map<string, BasicExpression<boolean>>
| undefined

// Map of source aliases to their join info for server-side query construction
public joinInfoBySourceCache: Map<string, Array<JoinInfo>> | undefined

// Map of source alias to subscription
readonly subscriptions: Record<string, CollectionSubscription> = {}
// Map of source aliases to functions that load keys for that lazy source
@@ -617,6 +621,7 @@ export class CollectionConfigBuilder<
this.inputsCache = undefined
this.pipelineCache = undefined
this.sourceWhereClausesCache = undefined
this.joinInfoBySourceCache = undefined

// Reset lazy source alias state
this.lazySources.clear()
@@ -664,6 +669,7 @@ export class CollectionConfigBuilder<

this.pipelineCache = compilation.pipeline
this.sourceWhereClausesCache = compilation.sourceWhereClauses
this.joinInfoBySourceCache = compilation.joinInfoBySource
this.compiledAliasToCollectionId = compilation.aliasToCollectionId

// Defensive check: verify all compiled aliases have corresponding inputs
24 changes: 24 additions & 0 deletions packages/db/src/query/live/collection-subscriber.ts
@@ -7,6 +7,7 @@ import type { MultiSetArray, RootStreamBuilder } from '@tanstack/db-ivm'
import type { Collection } from '../../collection/index.js'
import type {
ChangeMessage,
JoinInfo,
SubscriptionStatusChangeEvent,
} from '../../types.js'
import type { Context, GetResult } from '../builder/types.js'
@@ -197,13 +198,17 @@ export class CollectionSubscriber<
this.sendChangesToPipeline(changes)
}

// Get join info if this is the main source in a join query
const joinInfo = this.getJoinInfoForAlias()

// Create subscription with onStatusChange - listener is registered before snapshot
// Note: For non-ordered queries (no limit/offset), we use trackLoadSubsetPromise: false
// which is the default behavior in subscribeChanges
const subscription = this.collection.subscribeChanges(sendChanges, {
...(includeInitialState && { includeInitialState }),
whereExpression,
onStatusChange,
joinInfo,
})

return subscription
@@ -230,11 +235,15 @@ export class CollectionSubscriber<
)
}

// Get join info if this is the main source in a join query
const joinInfo = this.getJoinInfoForAlias()

// Subscribe to changes with onStatusChange - listener is registered before any snapshot
// values bigger than what we've sent don't need to be sent because they can't affect the topK
const subscription = this.collection.subscribeChanges(sendChangesInRange, {
whereExpression,
onStatusChange,
joinInfo,
})
subscriptionHolder.current = subscription

@@ -386,6 +395,21 @@ export class CollectionSubscriber<
return sourceWhereClausesCache.get(this.alias)
}

/**
* Gets join info for this alias if it's the main source in a join query.
* Join info is only returned for the main collection (FROM clause),
* not for joined collections, since only the main collection needs
* to know about joins for server-side query construction.
*/
private getJoinInfoForAlias(): Array<JoinInfo> | undefined {
const joinInfoBySourceCache =
this.collectionConfigBuilder.joinInfoBySourceCache
if (!joinInfoBySourceCache) {
return undefined
}
return joinInfoBySourceCache.get(this.alias)
}

private getOrderByInfo(): OrderByOptimizationInfo | undefined {
const info =
this.collectionConfigBuilder.optimizableOrderByCollections[