Skip to content

Commit 09d0b87

Browse files
Demonstrate lazy scheduling (#754)
* Demonstrate lazy scheduling * Add FrontierInterest with IfCapability variant * Move FrontierInterest to be per-input * Remove default notify_me impl; streamline setting the value
1 parent 9319cab commit 09d0b87

File tree

14 files changed

+97
-49
lines changed

14 files changed

+97
-49
lines changed

timely/examples/event_driven.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
use timely::dataflow::operators::{Input, Probe};
1+
use timely::dataflow::Scope;
2+
use timely::dataflow::operators::{Input, Probe, Enter, Leave};
23
use timely::dataflow::operators::vec::Map;
34

45
fn main() {
@@ -20,10 +21,14 @@ fn main() {
2021
// create a new input, exchange data, and inspect its output
2122
for _dataflow in 0 .. dataflows {
2223
worker.dataflow(|scope| {
23-
let (input, mut stream) = scope.new_input();
24-
for _step in 0 .. length {
25-
stream = stream.map(|x: ()| x);
26-
}
24+
let (input, stream) = scope.new_input();
25+
let stream = scope.region(|inner| {
26+
let mut stream = stream.enter(inner);
27+
for _step in 0 .. length {
28+
stream = stream.map(|x: ()| x);
29+
}
30+
stream.leave()
31+
});
2732
let (probe, _stream) = stream.probe();
2833
inputs.push(input);
2934
probes.push(probe);
@@ -43,7 +48,7 @@ fn main() {
4348
worker.step();
4449
steps += 1;
4550
}
46-
println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps);
51+
if round % 1000 == 0 { println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps); }
4752
}
4853

4954
}).unwrap();

timely/src/dataflow/operators/core/concat.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,10 @@ impl<G: Scope, C: Container> Concatenate<G, C> for G {
6363
// create an operator builder.
6464
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
6565
let mut builder = OperatorBuilder::new("Concatenate".to_string(), self.clone());
66-
builder.set_notify(false);
6766

6867
// create new input handles for each input stream.
6968
let mut handles = sources.into_iter().map(|s| builder.new_input(s, Pipeline)).collect::<Vec<_>>();
69+
for i in 0 .. handles.len() { builder.set_notify_for(i, crate::progress::operate::FrontierInterest::Never); }
7070

7171
// create one output handle for the concatenated results.
7272
let (mut output, result) = builder.new_output();

timely/src/dataflow/operators/core/feedback.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ impl<G: Scope> Feedback<G> for G {
7373
fn feedback<C: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, C>, Stream<G, C>) {
7474

7575
let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone());
76-
builder.set_notify(false);
7776
let (output, stream) = builder.new_output();
7877

7978
(Handle { builder, summary, output }, stream)
@@ -118,6 +117,7 @@ impl<G: Scope, C: Container> ConnectLoop<G, C> for Stream<G, C> {
118117
let mut output = handle.output;
119118

120119
let mut input = builder.new_input_connection(self, Pipeline, [(0, Antichain::from_elem(summary.clone()))]);
120+
builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
121121

122122
builder.build(move |_capability| move |_frontier| {
123123
let mut output = output.activate();

timely/src/dataflow/operators/core/input.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ impl<T:Timestamp> Schedule for Operator<T> {
199199
}
200200
}
201201

202+
use crate::progress::operate::FrontierInterest;
202203
impl<T:Timestamp> Operate<T> for Operator<T> {
203204

204205
fn inputs(&self) -> usize { 0 }
@@ -209,7 +210,7 @@ impl<T:Timestamp> Operate<T> for Operator<T> {
209210
(Vec::new(), Rc::clone(&self.shared_progress), self)
210211
}
211212

212-
fn notify_me(&self) -> bool { false }
213+
fn notify_me(&self) -> &[FrontierInterest] { &[] }
213214
}
214215

215216

timely/src/dataflow/operators/core/ok_err.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ impl<S: Scope, C: Container + DrainContainer> OkErr<S, C> for Stream<S, C> {
5252
L: FnMut(C::Item<'_>) -> Result<D1,D2>+'static
5353
{
5454
let mut builder = OperatorBuilder::new("OkErr".to_owned(), self.scope());
55-
builder.set_notify(false);
5655

5756
let mut input = builder.new_input(self, Pipeline);
57+
builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
5858
let (output1, stream1) = builder.new_output();
5959
let (output2, stream2) = builder.new_output();
6060

timely/src/dataflow/operators/core/partition.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ impl<G: Scope, C: Container + DrainContainer> Partition<G, C> for Stream<G, C> {
4141
F: FnMut(C::Item<'_>) -> (u64, D2) + 'static,
4242
{
4343
let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());
44-
builder.set_notify(false);
4544

4645
let mut input = builder.new_input(self, Pipeline);
46+
builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
4747
let mut outputs = Vec::with_capacity(parts as usize);
4848
let mut streams = Vec::with_capacity(parts as usize);
4949

timely/src/dataflow/operators/core/unordered_input.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ impl<T:Timestamp> Schedule for UnorderedOperator<T> {
129129
}
130130
}
131131

132+
use crate::progress::operate::FrontierInterest;
132133
impl<T:Timestamp> Operate<T> for UnorderedOperator<T> {
133134
fn inputs(&self) -> usize { 0 }
134135
fn outputs(&self) -> usize { 1 }
@@ -140,7 +141,7 @@ impl<T:Timestamp> Operate<T> for UnorderedOperator<T> {
140141
(Vec::new(), Rc::clone(&self.shared_progress), self)
141142
}
142143

143-
fn notify_me(&self) -> bool { false }
144+
fn notify_me(&self) -> &[FrontierInterest] { &[] }
144145
}
145146

146147
/// A handle to an input [Stream], used to introduce data to a timely dataflow computation.

timely/src/dataflow/operators/generic/builder_raw.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::scheduling::{Schedule, Activations};
1212

1313
use crate::progress::{Source, Target};
1414
use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain};
15-
use crate::progress::operate::{Connectivity, PortConnectivity};
15+
use crate::progress::operate::{FrontierInterest, Connectivity, PortConnectivity};
1616
use crate::Container;
1717
use crate::dataflow::{Stream, Scope};
1818
use crate::dataflow::channels::pushers::Tee;
@@ -23,7 +23,7 @@ use crate::dataflow::operators::generic::operator_info::OperatorInfo;
2323
#[derive(Debug)]
2424
pub struct OperatorShape {
2525
name: String, // A meaningful name for the operator.
26-
notify: bool, // Does the operator require progress notifications.
26+
notify: Vec<FrontierInterest>, // Per-input frontier interest.
2727
peers: usize, // The total number of workers in the computation. Needed to initialize pointstamp counts with the correct magnitude.
2828
inputs: usize, // The number of input ports.
2929
outputs: usize, // The number of output ports.
@@ -34,7 +34,7 @@ impl OperatorShape {
3434
fn new(name: String, peers: usize) -> Self {
3535
OperatorShape {
3636
name,
37-
notify: true,
37+
notify: Vec::new(),
3838
peers,
3939
inputs: 0,
4040
outputs: 0,
@@ -88,8 +88,10 @@ impl<G: Scope> OperatorBuilder<G> {
8888
/// Return a reference to the operator's shape
8989
pub fn shape(&self) -> &OperatorShape { &self.shape }
9090

91-
/// Indicates whether the operator requires frontier information.
92-
pub fn set_notify(&mut self, notify: bool) { self.shape.notify = notify; }
91+
/// Sets frontier interest for a specific input.
92+
pub fn set_notify_for(&mut self, input: usize, notify: FrontierInterest) {
93+
self.shape.notify[input] = notify;
94+
}
9395

9496
/// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
9597
pub fn new_input<C: Container, P>(&mut self, stream: Stream<G, C>, pact: P) -> P::Puller
@@ -113,6 +115,7 @@ impl<G: Scope> OperatorBuilder<G> {
113115
stream.connect_to(target, sender, channel_id);
114116

115117
self.shape.inputs += 1;
118+
self.shape.notify.push(FrontierInterest::Always);
116119
let connectivity: PortConnectivity<_> = connection.into_iter().collect();
117120
assert!(connectivity.iter_ports().all(|(o,_)| o < self.shape.outputs));
118121
self.summary.push(connectivity);
@@ -220,5 +223,5 @@ where
220223
(self.summary.clone(), Rc::clone(&self.shared_progress), self)
221224
}
222225

223-
fn notify_me(&self) -> bool { self.shape.notify }
226+
fn notify_me(&self) -> &[FrontierInterest] { &self.shape.notify }
224227
}

timely/src/dataflow/operators/generic/builder_rc.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::dataflow::operators::capability::Capability;
1818
use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle};
1919
use crate::dataflow::operators::generic::operator_info::OperatorInfo;
2020
use crate::dataflow::operators::generic::builder_raw::OperatorShape;
21-
use crate::progress::operate::PortConnectivity;
21+
use crate::progress::operate::{FrontierInterest, PortConnectivity};
2222

2323
use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;
2424

@@ -48,8 +48,10 @@ impl<G: Scope> OperatorBuilder<G> {
4848
}
4949
}
5050

51-
/// Indicates whether the operator requires frontier information.
52-
pub fn set_notify(&mut self, notify: bool) { self.builder.set_notify(notify); }
51+
/// Sets frontier interest for a specific input.
52+
pub fn set_notify_for(&mut self, input: usize, notify: FrontierInterest) {
53+
self.builder.set_notify_for(input, notify);
54+
}
5355

5456
/// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
5557
pub fn new_input<C: Container, P>(&mut self, stream: Stream<G, C>, pact: P) -> InputHandleCore<G::Timestamp, C, P::Puller>

timely/src/dataflow/operators/generic/operator.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -373,9 +373,9 @@ impl<G: Scope, C1: Container> Operator<G, C1> for Stream<G, C1> {
373373
let operator_info = builder.operator_info();
374374

375375
let mut input = builder.new_input(self, pact);
376+
builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
376377
let (output, stream) = builder.new_output();
377378
let mut output = OutputBuilder::from(output);
378-
builder.set_notify(false);
379379

380380
builder.build(move |mut capabilities| {
381381
// `capabilities` should be a single-element vector.
@@ -461,9 +461,10 @@ impl<G: Scope, C1: Container> Operator<G, C1> for Stream<G, C1> {
461461

462462
let mut input1 = builder.new_input(self, pact1);
463463
let mut input2 = builder.new_input(other, pact2);
464+
builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
465+
builder.set_notify_for(1, crate::progress::operate::FrontierInterest::Never);
464466
let (output, stream) = builder.new_output();
465467
let mut output = OutputBuilder::from(output);
466-
builder.set_notify(false);
467468

468469
builder.build(move |mut capabilities| {
469470
// `capabilities` should be a single-element vector.
@@ -547,7 +548,6 @@ where
547548

548549
let (output, stream) = builder.new_output();
549550
let mut output = OutputBuilder::from(output);
550-
builder.set_notify(false);
551551

552552
builder.build(move |mut capabilities| {
553553
// `capabilities` should be a single-element vector.

0 commit comments

Comments
 (0)