Skip to content

Commit 29d82b8

Browse files
committed
storage: make decode_and_mfp yield more often
We've seen evidence in production of this operator often going more than 5s without yielding, especially during rehydration and large unindexed SELECTs. This affects interactivity as well as the time between when a timely-driven Future is woken and when it is scheduled again. Attempt to fix this with two tweaks to our yield heuristics. First, decrease the fuel between yields from 1_000_000 to 100_000. Second, also count each mfp evaluation against our fuel, in case it contains a restrictive filter. The new 100_000 constant comes from eyeballing a replica in prod that was non-interactive. At the time, it was decoding ~300k rows per second, so 100k doesn't seem terribly low in comparison.
1 parent 83e2171 commit 29d82b8

5 files changed

Lines changed: 19 additions & 8 deletions

File tree

src/compute/src/render/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,9 @@ pub fn build_compute_dataflow<A: Allocate>(
234234
dataflow.until.clone(),
235235
mfp.as_mut(),
236236
Some(flow_control),
237-
// Copy the logic in DeltaJoin/Get/Join to start.
238-
|_timer, count| count > 1_000_000,
237+
// Tuned down from 1_000_000 (the original logic copied from
238+
// DeltaJoin/Get/Join) to increase interactivity.
239+
|_timer, count| count > 100_000,
239240
);
240241

241242
// If `mfp` is non-identity, we need to apply what remains.

src/compute/src/sink/persist_sink.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,9 @@ where
102102
Antichain::new(), // we want all updates
103103
None, // no MFP
104104
None, // no flow control
105-
// Copy the logic in DeltaJoin/Get/Join to start.
106-
|_timer, count| count > 1_000_000,
105+
// Tuned down from 1_000_000 (the original logic copied from DeltaJoin/Get/Join) to increase
106+
// interactivity.
107+
|_timer, count| count > 100_000,
107108
);
108109
use differential_dataflow::AsCollection;
109110
let persist_collection = ok_stream

src/storage-operators/src/persist_source.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,13 @@ impl PendingWork {
365365
match (key, val) {
366366
(Ok(SourceData(Ok(row))), Ok(())) => {
367367
if let Some(mfp) = map_filter_project {
368+
// We originally accounted work as the number of outputs, to give downstream
369+
// operators a chance to reduce down anything we've emitted. This mfp call
370+
// might have a restrictive filter, which would have been counted as no
371+
// work. However, in practice, we've seen decode_and_mfp be a source of
372+
// interactivity loss during rehydration, so we now also count each mfp
373+
// evaluation against our fuel.
374+
*work += 1;
368375
let arena = mz_repr::RowArena::new();
369376
let mut datums_local = datum_vec.borrow_with(&row);
370377
for result in mfp.evaluate(

src/storage/src/render/sinks.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,9 @@ pub(crate) fn render_sink<'g, G: Scope<Timestamp = ()>>(
5656
timely::progress::Antichain::new(),
5757
None,
5858
None,
59-
// Copy the logic in DeltaJoin/Get/Join to start.
60-
|_timer, count| count > 1_000_000,
59+
// Tuned down from 1_000_000 (the original logic copied from DeltaJoin/Get/Join) to increase
60+
// interactivity.
61+
|_timer, count| count > 100_000,
6162
);
6263
tokens.push(source_token);
6364

src/storage/src/render/sources.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -359,8 +359,9 @@ where
359359
Antichain::new(),
360360
None,
361361
None,
362-
// Copy the logic in DeltaJoin/Get/Join to start.
363-
|_timer, count| count > 1_000_000,
362+
// Tuned down from 1_000_000 (the original logic copied from
363+
// DeltaJoin/Get/Join) to increase interactivity.
364+
|_timer, count| count > 100_000,
364365
);
365366
let (tx_source_ok, tx_source_err) = (
366367
tx_source_ok_stream.as_collection(),

0 commit comments

Comments
 (0)