Skip to content

Commit fb034af

Browse files
antiguruclaude
andcommitted
Document operator fusion in mdbook internals chapter
Add chapter 5.4 explaining how operator fusion works: fusibility constraints, group detection, scheduling, capability mapping, correctness argument for progress tracking, and configuration. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 6f705b3 commit fb034af

File tree

2 files changed

+124
-0
lines changed

2 files changed

+124
-0
lines changed

mdbook/src/SUMMARY.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,4 @@
3636
- [Internals](./chapter_5/chapter_5.md)
3737
- [Communication](./chapter_5/chapter_5_1.md)
3838
- [Progress Tracking](./chapter_5/chapter_5_2.md)
39+
- [Operator Fusion](./chapter_5/chapter_5_4.md)
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
# Operator fusion
2+
3+
When building dataflows, users often compose many small operators: a `map` followed by a `filter`, a `flat_map`, another `map`, and finally a `probe`.
4+
Each operator is a separate node in the progress tracking graph, with its own `SharedProgress` handle, pointstamp accounting, and scheduling overhead.
5+
For long pipelines, this overhead dominates actual computation.
6+
7+
Operator fusion detects groups of operators that can be scheduled as a single unit, hiding intermediate nodes from the reachability tracker.
8+
This section explains how fusion works and why it preserves correctness.
9+
10+
## Which operators fuse
11+
12+
Fusion applies to operators connected by pipeline (thread-local) channels where the group's internal progress tracking can be collapsed without losing information.
13+
An operator is *fusible* if:
14+
15+
* It does not observe frontiers (`notify == false`).
16+
Frontier-observing operators buffer data until they receive a notification that a timestamp is complete.
17+
Fusing them would require propagating frontiers within the group, which the scheduler does not do.
18+
* All (input, output) pairs in its internal summary are the identity.
19+
Non-identity summaries (like the feedback operator's `Product(0, 1)`) require per-member timestamp transformation that the group's aggregate reporting does not support.
20+
* It has an operator implementation (not already tombstoned).
21+
22+
An edge between two fusible operators is *fusible* if the target uses pipeline pact on the corresponding input port.
23+
Exchange or broadcast pacts route data through inter-worker channels that the group scheduler cannot intercept.
24+
25+
Operators connected by fusible edges are grouped using union-find.
26+
Groups with fewer members than a configurable threshold (`fuse_chain_length`, default 2) are left alone.
27+
There is no restriction on fan-in or fan-out: diamonds, concatenations, and branches all fuse.
28+
29+
## How a fused group presents to the subgraph
30+
31+
A fused group replaces its member operators with a single `GroupScheduler` installed at the representative slot (the lowest-indexed member).
32+
All other members become tombstones.
33+
34+
The group exposes:
35+
36+
* **Group inputs**: member input ports that receive edges from outside the group.
37+
* **Group outputs**: member output ports that send edges outside the group, or that have no outgoing edges (their capabilities still need tracking).
38+
39+
The subgraph's `edge_stash` is rewritten: internal edges are removed, incoming edges are retargeted to the representative's group input ports, and outgoing edges are sourced from the representative's group output ports.
40+
41+
## Scheduling
42+
43+
Members are executed in topological order, computed by Kahn's algorithm over internal edges.
44+
This guarantees that data pushed by a producer through a pipeline channel is available to its consumer when the consumer runs.
45+
46+
The physical pipeline channels between members are established during operator construction and are unaffected by fusion.
47+
Only the progress tracking layer changes.
48+
49+
### Activation forwarding
50+
51+
Pipeline channels activate the original target operator when data arrives.
52+
After fusion, the target may be a tombstone.
53+
Each tombstone records a `forward_to` field pointing to the group representative.
54+
The subgraph's scheduling loop checks this field and redirects the activation.
55+
56+
## Why the fused group reports correct progress
57+
58+
The key insight is that because all members have identity summaries, a capability at any member's output port at timestamp `t` implies the same timestamp `t` at every reachable group output.
59+
The timestamp does not change along any internal path.
60+
61+
### Consumeds and produceds
62+
63+
The group reports consumeds only for group input ports and produceds only for group output ports.
64+
Intermediate consumeds and produceds (data passing between members through internal pipeline channels) would cancel in the reachability tracker: a member producing `(t, +d)` and the next member consuming `(t, -d)` net to zero.
65+
Since the internal edges are removed from the tracker, these intermediate changes are simply not reported.
66+
67+
### Internal capabilities
68+
69+
Each member reports internal capability changes through its `SharedProgress.internals`.
70+
In the unfused case, the reachability tracker sees each member's capabilities at their respective source locations and computes implications through the graph.
71+
72+
The group scheduler aggregates each member's internal changes to the group outputs via a *capability map*.
73+
This map is computed by a single reverse-topological pass over the group's internal DAG:
74+
75+
1. Seed: member output ports that are group outputs map directly to themselves.
76+
2. Reverse pass: for each member from last to first in topological order, for each output port, follow internal edges forward to downstream members.
77+
Use the downstream member's summary to find which of its output ports are reachable from the targeted input port.
78+
Union the reachability sets.
79+
80+
This produces `capability_map[member][output_port] -> Vec<group_output_index>`.
81+
82+
When the group scheduler runs, it reads each member's `SharedProgress.internals` and reports them at every group output reached via the capability map.
83+
Because all summaries are identity, this is equivalent to what the reachability tracker would compute by composing identity summaries along internal paths.
84+
85+
### Initial capability accounting
86+
87+
During `initialize()`, each member reports `+peers` capabilities at `T::minimum()` on its output ports.
88+
The group transfers ALL members' initial capabilities to the group's `SharedProgress`, mapped through the capability map.
89+
Members' initial internals are then cleared to prevent double-counting.
90+
91+
This is necessary because each member independently drops its initial capability during execution, producing `(-peers)` changes that flow through the capability map.
92+
If only one member's `+peers` were reported, the tracker would go negative.
93+
94+
## Composed summary
95+
96+
The group's `internal_summary` describes which group outputs are reachable from which group inputs.
97+
For each group input, the scheduler finds which member output ports are reachable (via the member's own summary), then follows the capability map to group outputs.
98+
If a path exists, the summary entry is the identity; otherwise no entry exists.
99+
100+
This composed summary is used by the reachability tracker to determine implications from the group's sources to downstream operators.
101+
102+
## What does not fuse
103+
104+
Several classes of operators are excluded:
105+
106+
* **Frontier-observing operators** (`notify == true`): `inspect`, `unary_frontier`, and any operator that requests notifications.
107+
These need intra-group frontier propagation, which the group scheduler does not implement.
108+
* **Operators with non-identity summaries**: the `Feedback` operator increments a loop counter coordinate.
109+
Fusing it would require the group to transform timestamps along internal paths.
110+
* **Exchange-pact operators**: data moves between workers through channels outside the group scheduler's control.
111+
* **Operators in iteration scopes**: the nested timestamp structure typically involves non-identity summaries at scope boundaries.
112+
113+
In practice, the operators that fuse are the "glue" operators: `map`, `flat_map`, `filter`, `Enter`, `Leave`, `Concatenate`, and similar single-purpose transformations.
114+
In differential dataflow's BFS, fusion merges groups like `[Enter, Concatenate, Negate, AsCollection, Concatenate, ResultsIn]` into single scheduling units.
115+
116+
## Configuration
117+
118+
Fusion is controlled by `WorkerConfig::fuse_chain_length(n)`:
119+
120+
* `n >= 2` (default): fuse groups of at least `n` members.
121+
* `n == 0` or `n == 1`: disable fusion entirely.
122+
123+
From the command line, pass `--fuse-chain-length N` to any timely program that uses `execute_from_args`.

0 commit comments

Comments
 (0)