Skip to content

Commit 8279683

Browse files
authored
Merge pull request #22358 from danhhz/persist_yield
storage: make decode_and_mfp yield more often
2 parents 8f575e7 + f55db1b commit 8279683

9 files changed

Lines changed: 69 additions & 22 deletions

File tree

src/adapter/src/flags.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ fn persist_config(config: &SystemVars) -> PersistParameters {
180180
storage_sink_minimum_batch_updates: Some(
181181
config.storage_persist_sink_minimum_batch_updates(),
182182
),
183+
storage_source_decode_fuel: Some(config.storage_source_decode_fuel()),
183184
next_listen_batch_retryer: Some(RetryParameters {
184185
initial_backoff: config.persist_next_listen_batch_retryer_initial_backoff(),
185186
multiplier: config.persist_next_listen_batch_retryer_multiplier(),

src/compute/src/render/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,8 +236,6 @@ pub fn build_compute_dataflow<A: Allocate>(
236236
dataflow.until.clone(),
237237
mfp.as_mut(),
238238
Some(flow_control),
239-
// Copy the logic in DeltaJoin/Get/Join to start.
240-
|_timer, count| count > 1_000_000,
241239
);
242240

243241
// If `mfp` is non-identity, we need to apply what remains.

src/compute/src/sink/persist_sink.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,6 @@ where
102102
Antichain::new(), // we want all updates
103103
None, // no MFP
104104
None, // no flow control
105-
// Copy the logic in DeltaJoin/Get/Join to start.
106-
|_timer, count| count > 1_000_000,
107105
);
108106
use differential_dataflow::AsCollection;
109107
let persist_collection = ok_stream

src/persist-client/src/cfg.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ message ProtoPersistParameters {
1919
mz_proto.ProtoDuration consensus_connect_timeout = 3;
2020
optional uint64 sink_minimum_batch_updates = 4;
2121
optional uint64 storage_sink_minimum_batch_updates = 8;
22+
optional uint64 storage_source_decode_fuel = 21;
2223
optional ProtoRetryParameters next_listen_batch_retryer = 5;
2324
optional uint64 stats_audit_percent = 9;
2425
optional bool stats_collection_enabled = 6;

src/persist-client/src/cfg.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,9 @@ impl PersistConfig {
191191
storage_sink_minimum_batch_updates: AtomicUsize::new(
192192
Self::DEFAULT_SINK_MINIMUM_BATCH_UPDATES,
193193
),
194+
storage_source_decode_fuel: AtomicUsize::new(
195+
Self::DEFAULT_STORAGE_SOURCE_DECODE_FUEL,
196+
),
194197
next_listen_batch_retryer: RwLock::new(Self::DEFAULT_NEXT_LISTEN_BATCH_RETRYER),
195198
stats_audit_percent: AtomicUsize::new(Self::DEFAULT_STATS_AUDIT_PERCENT),
196199
stats_collection_enabled: AtomicBool::new(Self::DEFAULT_STATS_COLLECTION_ENABLED),
@@ -250,6 +253,14 @@ impl PersistConfig {
250253
.load(DynamicConfig::LOAD_ORDERING)
251254
}
252255

256+
/// The maximum amount of work to do in the persist_source mfp_and_decode
257+
/// operator before yielding.
258+
pub fn storage_source_decode_fuel(&self) -> usize {
259+
self.dynamic
260+
.storage_source_decode_fuel
261+
.load(DynamicConfig::LOAD_ORDERING)
262+
}
263+
253264
/// Returns a new instance of [PersistConfig] for tests.
254265
pub fn new_for_tests() -> Self {
255266
use mz_build_info::DUMMY_BUILD_INFO;
@@ -319,6 +330,9 @@ impl PersistConfig {
319330
/// Default value for [`PersistConfig::sink_minimum_batch_updates`].
320331
pub const DEFAULT_SINK_MINIMUM_BATCH_UPDATES: usize = 0;
321332

333+
/// Default value for [`PersistConfig::storage_source_decode_fuel`].
334+
pub const DEFAULT_STORAGE_SOURCE_DECODE_FUEL: usize = 1_000_000;
335+
322336
/// Default value for [`DynamicConfig::next_listen_batch_retry_params`].
323337
pub const DEFAULT_NEXT_LISTEN_BATCH_RETRYER: RetryParameters = RetryParameters {
324338
initial_backoff: Duration::from_millis(4),
@@ -425,6 +439,7 @@ pub struct DynamicConfig {
425439
reader_lease_duration: RwLock<Duration>,
426440
sink_minimum_batch_updates: AtomicUsize,
427441
storage_sink_minimum_batch_updates: AtomicUsize,
442+
storage_source_decode_fuel: AtomicUsize,
428443
stats_audit_percent: AtomicUsize,
429444
stats_collection_enabled: AtomicBool,
430445
stats_filter_enabled: AtomicBool,
@@ -780,6 +795,8 @@ pub struct PersistParameters {
780795
pub sink_minimum_batch_updates: Option<usize>,
781796
/// Configures [`PersistConfig::storage_sink_minimum_batch_updates`].
782797
pub storage_sink_minimum_batch_updates: Option<usize>,
798+
/// Configures [`PersistConfig::storage_source_decode_fuel`].
799+
pub storage_source_decode_fuel: Option<usize>,
783800
/// Configures [`DynamicConfig::stats_audit_percent`].
784801
pub stats_audit_percent: Option<usize>,
785802
/// Configures [`DynamicConfig::stats_collection_enabled`].
@@ -816,6 +833,7 @@ impl PersistParameters {
816833
reader_lease_duration: self_reader_lease_duration,
817834
sink_minimum_batch_updates: self_sink_minimum_batch_updates,
818835
storage_sink_minimum_batch_updates: self_storage_sink_minimum_batch_updates,
836+
storage_source_decode_fuel: self_storage_source_decode_fuel,
819837
next_listen_batch_retryer: self_next_listen_batch_retryer,
820838
stats_audit_percent: self_stats_audit_percent,
821839
stats_collection_enabled: self_stats_collection_enabled,
@@ -838,6 +856,7 @@ impl PersistParameters {
838856
reader_lease_duration: other_reader_lease_duration,
839857
sink_minimum_batch_updates: other_sink_minimum_batch_updates,
840858
storage_sink_minimum_batch_updates: other_storage_sink_minimum_batch_updates,
859+
storage_source_decode_fuel: other_storage_source_decode_fuel,
841860
next_listen_batch_retryer: other_next_listen_batch_retryer,
842861
stats_audit_percent: other_stats_audit_percent,
843862
stats_collection_enabled: other_stats_collection_enabled,
@@ -879,6 +898,9 @@ impl PersistParameters {
879898
if let Some(v) = other_storage_sink_minimum_batch_updates {
880899
*self_storage_sink_minimum_batch_updates = Some(v);
881900
}
901+
if let Some(v) = other_storage_source_decode_fuel {
902+
*self_storage_source_decode_fuel = Some(v);
903+
}
882904
if let Some(v) = other_next_listen_batch_retryer {
883905
*self_next_listen_batch_retryer = Some(v);
884906
}
@@ -926,6 +948,7 @@ impl PersistParameters {
926948
reader_lease_duration,
927949
sink_minimum_batch_updates,
928950
storage_sink_minimum_batch_updates,
951+
storage_source_decode_fuel,
929952
next_listen_batch_retryer,
930953
stats_audit_percent,
931954
stats_collection_enabled,
@@ -947,6 +970,7 @@ impl PersistParameters {
947970
&& reader_lease_duration.is_none()
948971
&& sink_minimum_batch_updates.is_none()
949972
&& storage_sink_minimum_batch_updates.is_none()
973+
&& storage_source_decode_fuel.is_none()
950974
&& next_listen_batch_retryer.is_none()
951975
&& stats_audit_percent.is_none()
952976
&& stats_collection_enabled.is_none()
@@ -977,6 +1001,7 @@ impl PersistParameters {
9771001
reader_lease_duration,
9781002
sink_minimum_batch_updates,
9791003
storage_sink_minimum_batch_updates,
1004+
storage_source_decode_fuel,
9801005
next_listen_batch_retryer,
9811006
stats_audit_percent,
9821007
stats_collection_enabled,
@@ -1053,6 +1078,11 @@ impl PersistParameters {
10531078
DynamicConfig::STORE_ORDERING,
10541079
);
10551080
}
1081+
if let Some(storage_source_decode_fuel) = storage_source_decode_fuel {
1082+
cfg.dynamic
1083+
.storage_source_decode_fuel
1084+
.store(*storage_source_decode_fuel, DynamicConfig::STORE_ORDERING);
1085+
}
10561086
if let Some(retry_params) = next_listen_batch_retryer {
10571087
let mut retry = cfg
10581088
.dynamic
@@ -1129,6 +1159,7 @@ impl RustType<ProtoPersistParameters> for PersistParameters {
11291159
storage_sink_minimum_batch_updates: self
11301160
.storage_sink_minimum_batch_updates
11311161
.into_proto(),
1162+
storage_source_decode_fuel: self.storage_source_decode_fuel.into_proto(),
11321163
next_listen_batch_retryer: self.next_listen_batch_retryer.into_proto(),
11331164
stats_audit_percent: self.stats_audit_percent.into_proto(),
11341165
stats_collection_enabled: self.stats_collection_enabled.into_proto(),
@@ -1159,6 +1190,7 @@ impl RustType<ProtoPersistParameters> for PersistParameters {
11591190
storage_sink_minimum_batch_updates: proto
11601191
.storage_sink_minimum_batch_updates
11611192
.into_rust()?,
1193+
storage_source_decode_fuel: proto.storage_source_decode_fuel.into_rust()?,
11621194
next_listen_batch_retryer: proto.next_listen_batch_retryer.into_rust()?,
11631195
stats_audit_percent: proto.stats_audit_percent.into_rust()?,
11641196
stats_collection_enabled: proto.stats_collection_enabled.into_rust()?,

src/sql/src/session/vars.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1086,6 +1086,15 @@ const STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES: ServerVar<usize> = ServerVar {
10861086
internal: true
10871087
};
10881088

1089+
/// Controls [`mz_persist_client::cfg::PersistConfig::storage_source_decode_fuel`].
1090+
const STORAGE_SOURCE_DECODE_FUEL: ServerVar<usize> = ServerVar {
1091+
name: UncasedStr::new("storage_source_decode_fuel"),
1092+
value: &PersistConfig::DEFAULT_STORAGE_SOURCE_DECODE_FUEL,
1093+
description: "The maximum amount of work to do in the persist_source mfp_and_decode \
1094+
operator before yielding.",
1095+
internal: true,
1096+
};
1097+
10891098
const STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS: ServerVar<bool> = ServerVar {
10901099
name: UncasedStr::new("storage_record_source_sink_namespaced_errors"),
10911100
value: &true,
@@ -2459,6 +2468,7 @@ impl SystemVars {
24592468
.with_var(&STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO)
24602469
.with_var(&PERSIST_SINK_MINIMUM_BATCH_UPDATES)
24612470
.with_var(&STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES)
2471+
.with_var(&STORAGE_SOURCE_DECODE_FUEL)
24622472
.with_var(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS)
24632473
.with_var(&PERSIST_NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF)
24642474
.with_var(&PERSIST_NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER)
@@ -3054,6 +3064,10 @@ impl SystemVars {
30543064
*self.expect_value(&STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES)
30553065
}
30563066

3067+
pub fn storage_source_decode_fuel(&self) -> usize {
3068+
*self.expect_value(&STORAGE_SOURCE_DECODE_FUEL)
3069+
}
3070+
30573071
/// Returns the `storage_record_source_sink_namespaced_errors` configuration parameter.
30583072
pub fn storage_record_source_sink_namespaced_errors(&self) -> bool {
30593073
*self.expect_value(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS)
@@ -4879,6 +4893,7 @@ fn is_persist_config_var(name: &str) -> bool {
48794893
|| name == PERSIST_SINK_MINIMUM_BATCH_UPDATES.name()
48804894
|| name == STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES.name()
48814895
|| name == STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES.name()
4896+
|| name == STORAGE_SOURCE_DECODE_FUEL.name()
48824897
|| name == PERSIST_NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF.name()
48834898
|| name == PERSIST_NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER.name()
48844899
|| name == PERSIST_NEXT_LISTEN_BATCH_RETRYER_CLAMP.name()

src/storage-operators/src/persist_source.rs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use mz_ore::cast::CastFrom;
2222
use mz_ore::collections::CollectionExt;
2323
use mz_ore::vec::VecExt;
2424
use mz_persist_client::cache::PersistClientCache;
25+
use mz_persist_client::cfg::PersistConfig;
2526
use mz_persist_client::fetch::FetchedPart;
2627
use mz_persist_client::fetch::SerdeLeasedBatchPart;
2728
use mz_persist_client::operators::shard_source::shard_source;
@@ -86,7 +87,7 @@ use crate::metrics::BackpressureMetrics;
8687
/// using [`timely::dataflow::operators::generic::operator::empty`].
8788
///
8889
/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
89-
pub fn persist_source<G, YFn>(
90+
pub fn persist_source<G>(
9091
scope: &mut G,
9192
source_id: GlobalId,
9293
persist_clients: Arc<PersistClientCache>,
@@ -95,15 +96,13 @@ pub fn persist_source<G, YFn>(
9596
until: Antichain<Timestamp>,
9697
map_filter_project: Option<&mut MfpPlan>,
9798
flow_control: Option<FlowControl<G>>,
98-
yield_fn: YFn,
9999
) -> (
100100
Stream<G, (Row, Timestamp, Diff)>,
101101
Stream<G, (DataflowError, Timestamp, Diff)>,
102102
Rc<dyn Any>,
103103
)
104104
where
105105
G: Scope<Timestamp = mz_repr::Timestamp>,
106-
YFn: Fn(Instant, usize) -> bool + 'static,
107106
{
108107
let (stream, token) = scope.scoped(
109108
&format!("skip_granular_backpressure({})", source_id),
@@ -122,7 +121,6 @@ where
122121
summary: Refines::to_inner(fc.summary),
123122
metrics: fc.metrics,
124123
}),
125-
yield_fn,
126124
);
127125
(stream.leave(), token)
128126
},
@@ -143,7 +141,7 @@ type RefinedScope<'g, G> = Child<'g, G, (<G as ScopeParent>::Timestamp, u64)>;
143141
///
144142
/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
145143
#[allow(clippy::needless_borrow)]
146-
pub fn persist_source_core<'g, G, YFn>(
144+
pub fn persist_source_core<'g, G>(
147145
scope: &RefinedScope<'g, G>,
148146
source_id: GlobalId,
149147
persist_clients: Arc<PersistClientCache>,
@@ -152,15 +150,14 @@ pub fn persist_source_core<'g, G, YFn>(
152150
until: Antichain<Timestamp>,
153151
map_filter_project: Option<&mut MfpPlan>,
154152
flow_control: Option<FlowControl<RefinedScope<'g, G>>>,
155-
yield_fn: YFn,
156153
) -> (
157154
Stream<RefinedScope<'g, G>, (Result<Row, DataflowError>, (mz_repr::Timestamp, u64), Diff)>,
158155
Rc<dyn Any>,
159156
)
160157
where
161158
G: Scope<Timestamp = mz_repr::Timestamp>,
162-
YFn: Fn(Instant, usize) -> bool + 'static,
163159
{
160+
let cfg = persist_clients.cfg().clone();
164161
let name = source_id.to_string();
165162
let desc = metadata.relation_desc.clone();
166163
let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone());
@@ -214,7 +211,7 @@ where
214211
}
215212
},
216213
);
217-
let rows = decode_and_mfp(&fetched, &name, until, map_filter_project, yield_fn);
214+
let rows = decode_and_mfp(cfg, &fetched, &name, until, map_filter_project);
218215
(rows, token)
219216
}
220217

@@ -242,16 +239,15 @@ fn filter_may_match(
242239
result.may_contain(Datum::True) || result.may_fail()
243240
}
244241

245-
pub fn decode_and_mfp<G, YFn>(
242+
pub fn decode_and_mfp<G>(
243+
cfg: PersistConfig,
246244
fetched: &Stream<G, FetchedPart<SourceData, (), Timestamp, Diff>>,
247245
name: &str,
248246
until: Antichain<Timestamp>,
249247
mut map_filter_project: Option<&mut MfpPlan>,
250-
yield_fn: YFn,
251248
) -> Stream<G, (Result<Row, DataflowError>, G::Timestamp, Diff)>
252249
where
253250
G: Scope<Timestamp = (mz_repr::Timestamp, u64)>,
254-
YFn: Fn(Instant, usize) -> bool + 'static,
255251
{
256252
let scope = fetched.scope();
257253
let mut builder = OperatorBuilder::new(
@@ -291,6 +287,11 @@ where
291287
}
292288
});
293289

290+
// Get the yield fuel once per schedule to amortize the cost of
291+
// loading the atomic.
292+
let yield_fuel = cfg.storage_source_decode_fuel();
293+
let yield_fn = |_, work| work >= yield_fuel;
294+
294295
let mut work = 0;
295296
let start_time = Instant::now();
296297
let mut output = updates_output.activate();
@@ -300,7 +301,7 @@ where
300301
&mut work,
301302
&name,
302303
start_time,
303-
&yield_fn,
304+
yield_fn,
304305
&until,
305306
map_filter_project.as_ref(),
306307
&mut datum_vec,
@@ -365,6 +366,13 @@ impl PendingWork {
365366
match (key, val) {
366367
(Ok(SourceData(Ok(row))), Ok(())) => {
367368
if let Some(mfp) = map_filter_project {
369+
// We originally accounted work as the number of outputs, to give downstream
370+
// operators a chance to reduce down anything we've emitted. This mfp call
371+
// might have a restrictive filter, which would have been counted as no
372+
// work. However, in practice, we've been decode_and_mfp be a source of
373+
// interactivity loss during rehydration, so we now also count each mfp
374+
// evaluation against our fuel.
375+
*work += 1;
368376
let arena = mz_repr::RowArena::new();
369377
let mut datums_local = datum_vec.borrow_with(&row);
370378
for result in mfp.evaluate(

src/storage/src/render/sinks.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,6 @@ pub(crate) fn render_sink<'g, G: Scope<Timestamp = ()>>(
5656
timely::progress::Antichain::new(),
5757
None,
5858
None,
59-
// Copy the logic in DeltaJoin/Get/Join to start.
60-
|_timer, count| count > 1_000_000,
6159
);
6260
tokens.push(source_token);
6361

src/storage/src/render/sources.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -360,8 +360,6 @@ where
360360
Antichain::new(),
361361
None,
362362
None,
363-
// Copy the logic in DeltaJoin/Get/Join to start.
364-
|_timer, count| count > 1_000_000,
365363
);
366364
let (tx_source_ok, tx_source_err) = (
367365
tx_source_ok_stream.as_collection(),
@@ -483,8 +481,6 @@ where
483481
Antichain::new(),
484482
None,
485483
flow_control,
486-
// Copy the logic in DeltaJoin/Get/Join to start.
487-
|_timer, count| count > 1_000_000,
488484
);
489485
(
490486
stream.as_collection(),

0 commit comments

Comments
 (0)