//!
//! Consult [TopKPlan] documentation for details.
1313
14+ use std:: collections:: HashMap ;
15+
1416use differential_dataflow:: hashable:: Hashable ;
1517use differential_dataflow:: lattice:: Lattice ;
1618use differential_dataflow:: operators:: arrange:: ArrangeBySelf ;
@@ -19,6 +21,8 @@ use differential_dataflow::operators::Consolidate;
1921use differential_dataflow:: trace:: implementations:: ord:: OrdValSpine ;
2022use differential_dataflow:: AsCollection ;
2123use differential_dataflow:: Collection ;
24+ use timely:: dataflow:: channels:: pact:: Pipeline ;
25+ use timely:: dataflow:: operators:: Operator ;
2226use timely:: dataflow:: Scope ;
2327
2428use mz_compute_client:: plan:: top_k:: {
5660 arity,
5761 limit,
5862 } ) => {
63+ // For monotonic inputs, we are able to thin the input relation in two stages:
64+ // 1. First, we can do an intra-timestamp thinning which has the advantage of
65+ // being computed in a streaming fashion, even for the initial snapshot.
66+ // 2. Then, we can do inter-timestamp thinning by feeding back negations for
67+ // any records that have been invalidated.
68+ let ok_input = if let Some ( limit) = limit {
69+ render_intra_ts_thinning ( ok_input, order_key. clone ( ) , limit)
70+ } else {
71+ ok_input
72+ } ;
73+
5974 // For monotonic inputs, we are able to retract inputs that can no longer be produced
6075 // as outputs. Any inputs beyond `offset + limit` will never again be produced as
6176 // outputs, and can be removed. The simplest form of this is when `offset == 0` and
6580 // of `offset` and `limit`, discarding only the records not produced in the intermediate
6681 // stage.
6782 use differential_dataflow:: operators:: iterate:: Variable ;
68- let delay = std:: time:: Duration :: from_nanos ( 10_000_000_000 ) ;
83+ let delay = std:: time:: Duration :: from_secs ( 10 ) ;
6984 let retractions = Variable :: new (
7085 & mut ok_input. scope ( ) ,
7186 <G :: Timestamp as crate :: render:: RenderTimestamp >:: system_delay (
@@ -317,6 +332,143 @@ where
317332 // TODO(#7331): Here we discard the arranged output.
318333 result. as_collection ( |_k, v| v. clone ( ) )
319334 }
335+
336+ fn render_intra_ts_thinning < G > (
337+ collection : Collection < G , Row , Diff > ,
338+ order_key : Vec < mz_expr:: ColumnOrder > ,
339+ limit : usize ,
340+ ) -> Collection < G , Row , Diff >
341+ where
342+ G : Scope ,
343+ G :: Timestamp : Lattice ,
344+ {
345+ let mut aggregates = HashMap :: new ( ) ;
346+ let mut vector = Vec :: new ( ) ;
347+ collection
348+ . inner
349+ . unary_notify (
350+ Pipeline ,
351+ "TopKIntraTimeThinning" ,
352+ [ ] ,
353+ move |input, output, notificator| {
354+ while let Some ( ( time, data) ) = input. next ( ) {
355+ data. swap ( & mut vector) ;
356+ let agg_time = aggregates
357+ . entry ( time. time ( ) . clone ( ) )
358+ . or_insert_with ( HashMap :: new) ;
359+ for ( row, record_time, diff) in vector. drain ( ..) {
360+ let monoid = monoids:: Top1Monoid {
361+ row,
362+ order_key : order_key. clone ( ) ,
363+ } ;
364+ let topk = agg_time. entry ( record_time) . or_insert_with ( move || {
365+ topk_agg:: TopKBatch :: new ( limit. try_into ( ) . expect ( "must fit" ) )
366+ } ) ;
367+ topk. update ( monoid, diff) ;
368+ }
369+ notificator. notify_at ( time. retain ( ) ) ;
370+ }
371+
372+ // pop completed aggregates, send along whatever
373+ notificator. for_each ( |time, _, _| {
374+ if let Some ( aggs) = aggregates. remove ( time. time ( ) ) {
375+ let mut session = output. session ( & time) ;
376+ for ( record_time, topk) in aggs {
377+ session. give_iterator ( topk. into_iter ( ) . map (
378+ |( monoid, diff) | ( monoid. row , record_time. clone ( ) , diff) ,
379+ ) )
380+ }
381+ }
382+ } ) ;
383+ } ,
384+ )
385+ . as_collection ( )
386+ }
387+ }
388+ }
389+
390+ /// Types for in-place intra-ts aggregation of monotonic streams.
391+ pub mod topk_agg {
392+ use smallvec:: SmallVec ;
393+
    /// An accumulator of `(item, diff)` updates for one timestamp, of which at
    /// most `limit` total multiplicity (by `T`'s `Ord`) survives compaction.
    pub struct TopKBatch<T> {
        // Pending updates; stored inline up to 16 entries before spilling to
        // the heap.
        updates: SmallVec<[(T, i64); 16]>,
        // Number of leading elements known to be compacted (sorted,
        // consolidated, and cut down to `limit`).
        clean: usize,
        // Maximum total multiplicity to retain across all items.
        limit: i64,
    }
399+
400+ impl < T : Ord > TopKBatch < T > {
401+ pub fn new ( limit : i64 ) -> Self {
402+ Self {
403+ updates : SmallVec :: new ( ) ,
404+ clean : 0 ,
405+ limit,
406+ }
407+ }
408+
409+ /// Adds a new update, for `item` with `value`.
410+ ///
411+ /// This could be optimized to perform compaction when the number of "dirty" elements exceeds
412+ /// half the length of the list, which would keep the total footprint within reasonable bounds
413+ /// even under an arbitrary number of updates. This has a cost, and it isn't clear whether it
414+ /// is worth paying without some experimentation.
415+ #[ inline]
416+ pub fn update ( & mut self , item : T , value : i64 ) {
417+ self . updates . push ( ( item, value) ) ;
418+ self . maintain_bounds ( ) ;
419+ }
420+
421+ /// Compact the internal representation.
422+ ///
423+ /// This method sort `self.updates` and consolidates elements with equal item, discarding
424+ /// any whose accumulation is zero. It is optimized to only do this if the number of dirty
425+ /// elements is non-zero.
426+ #[ inline]
427+ pub fn compact ( & mut self ) {
428+ if self . clean < self . updates . len ( ) && self . updates . len ( ) > 1 {
429+ self . updates . sort_by ( |x, y| x. 0 . cmp ( & y. 0 ) ) ;
430+ for i in 0 ..self . updates . len ( ) - 1 {
431+ if self . updates [ i] . 0 == self . updates [ i + 1 ] . 0 {
432+ self . updates [ i + 1 ] . 1 += self . updates [ i] . 1 ;
433+ self . updates [ i] . 1 = 0 ;
434+ }
435+ }
436+ let mut limit = self . limit ;
437+ self . updates . retain ( |x| {
438+ if limit > 0 {
439+ limit -= x. 1 ;
440+ true
441+ } else {
442+ false
443+ }
444+ } ) ;
445+ // Adjust the diff of the last record that was retained so that we have exactly K
446+ // records
447+ if let Some ( item) = self . updates . last_mut ( ) {
448+ item. 1 -= -limit;
449+ }
450+ }
451+ self . clean = self . updates . len ( ) ;
452+ }
453+
454+ /// Maintain the bounds of pending (non-compacted) updates versus clean (compacted) data.
455+ /// This function tries to minimize work by only compacting if enough work has accumulated.
456+ fn maintain_bounds ( & mut self ) {
457+ // if we have more than 32 elements and at least half of them are not clean, compact
458+ if self . updates . len ( ) > 32 && self . updates . len ( ) >> 1 >= self . clean {
459+ self . compact ( )
460+ }
461+ }
462+ }
463+
464+ impl < T : Ord > IntoIterator for TopKBatch < T > {
465+ type Item = ( T , i64 ) ;
466+ type IntoIter = smallvec:: IntoIter < [ ( T , i64 ) ; 16 ] > ;
467+
468+ fn into_iter ( mut self ) -> Self :: IntoIter {
469+ self . compact ( ) ;
470+ self . updates . into_iter ( )
471+ }
320472 }
321473}
322474
0 commit comments