Skip to content

Commit d2b125d

Browse files
Merge pull request #701 from frankmcsherry/vec_merger_patch
Introduce `VecMerger` to efficiently merge owning vectors
2 parents 47c4ec1 + 7dd6a65 commit d2b125d

1 file changed

Lines changed: 169 additions & 22 deletions

File tree

differential-dataflow/src/trace/implementations/merge_batcher.rs

Lines changed: 169 additions & 22 deletions
Original file line number · Diff line number · Diff line change
@@ -278,11 +278,160 @@ pub mod container {
278278
);
279279
}
280280

281-
/// A `Merger` using internal iteration for `Vec` containers.
282-
pub type VecInternalMerger<D, T, R> = InternalMerger<Vec<(D, T, R)>>;
281+
/// A `Merger` for `Vec` containers, which contain owned data and need special treatment.
///
/// `Vec` chunks own their tuples, so the dedicated [`VecMerger`] drains them by
/// value rather than going through the internal-iteration path used for other
/// container types.
pub type VecInternalMerger<D, T, R> = VecMerger<D, T, R>;
283283
/// A `Merger` using internal iteration for `TimelyStack` containers.
284284
pub type ColInternalMerger<D, T, R> = InternalMerger<crate::containers::TimelyStack<(D, T, R)>>;
285285

286+
/// A `Merger` implementation for `Vec<(D, T, R)>` that drains owned inputs.
pub struct VecMerger<D, T, R> {
    _marker: PhantomData<(D, T, R)>,
}

impl<D, T, R> Default for VecMerger<D, T, R> {
    /// Construct a stateless merger.
    ///
    /// Implemented by hand rather than via `#[derive(Default)]`, because the
    /// derive would impose `Default` bounds on `D`, `T`, and `R` even though
    /// only the `PhantomData` marker is stored.
    fn default() -> Self {
        VecMerger { _marker: PhantomData }
    }
}
294+
295+
impl<D, T, R> VecMerger<D, T, R> {
    /// The target capacity for output buffers, as a power of two.
    ///
    /// This amount is used to size vectors, where vectors not exactly this capacity are dropped.
    /// If this is mis-set, there is the potential for more memory churn than anticipated.
    fn target_capacity() -> usize {
        timely::container::buffer::default_capacity::<(D, T, R)>().next_power_of_two()
    }
    /// Acquire a buffer with the target capacity.
    ///
    /// Pops a recycled buffer from `stash` when one is available. Buffers whose
    /// capacity is not exactly [`Self::target_capacity`] are discarded and a
    /// fresh allocation is made, so only uniformly-sized buffers circulate.
    fn empty(&self, stash: &mut Vec<Vec<(D, T, R)>>) -> Vec<(D, T, R)> {
        let target = Self::target_capacity();
        let mut container = stash.pop().unwrap_or_default();
        container.clear();
        // Reuse if at target; otherwise allocate fresh.
        if container.capacity() != target {
            container = Vec::with_capacity(target);
        }
        container
    }
    /// Refill `queue` from `iter` if empty. Recycles drained queues into `stash`.
    ///
    /// The drained queue's allocation is kept only when `stash` holds fewer than
    /// two spares and the allocation is exactly the target capacity; anything
    /// else is dropped.
    fn refill(queue: &mut std::collections::VecDeque<(D, T, R)>, iter: &mut impl Iterator<Item = Vec<(D, T, R)>>, stash: &mut Vec<Vec<(D, T, R)>>) {
        if queue.is_empty() {
            let target = Self::target_capacity();
            // Cap the stash at two spare buffers.
            if stash.len() < 2 {
                // `Vec::from(VecDeque)` reuses the deque's allocation (O(1), no
                // reallocation per std docs), so the capacity carries over.
                let mut recycled = Vec::from(std::mem::take(queue));
                recycled.clear();
                if recycled.capacity() == target {
                    stash.push(recycled);
                }
            }
            if let Some(chunk) = iter.next() {
                *queue = std::collections::VecDeque::from(chunk);
            }
        }
    }
}
331+
332+
impl<D, T, R> Merger for VecMerger<D, T, R>
where
    D: Ord + Clone + 'static,
    T: Ord + Clone + PartialOrder + 'static,
    R: crate::difference::Semigroup + Clone + 'static,
{
    type Chunk = Vec<(D, T, R)>;
    type Time = T;

    /// Merge two chains of chunks into `output`, consolidating tuples with equal
    /// `(data, time)` by adding their differences and dropping zeros.
    ///
    /// NOTE(review): assumes each input chain is sorted by `(data, time)` across
    /// chunk boundaries — confirm against the `Merger` contract.
    fn merge(
        &mut self,
        list1: Vec<Vec<(D, T, R)>>,
        list2: Vec<Vec<(D, T, R)>>,
        output: &mut Vec<Vec<(D, T, R)>>,
        stash: &mut Vec<Vec<(D, T, R)>>,
    ) {
        use std::cmp::Ordering;
        use std::collections::VecDeque;

        let mut iter1 = list1.into_iter();
        let mut iter2 = list2.into_iter();
        // Front chunks of each chain, as deques so we can pop owned tuples from the front.
        let mut q1 = VecDeque::<(D,T,R)>::from(iter1.next().unwrap_or_default());
        let mut q2 = VecDeque::<(D,T,R)>::from(iter2.next().unwrap_or_default());

        let mut result = self.empty(stash);

        // Merge while both queues are non-empty.
        while let (Some((d1, t1, _)), Some((d2, t2, _))) = (q1.front(), q2.front()) {
            match (d1, t1).cmp(&(d2, t2)) {
                Ordering::Less => {
                    result.push(q1.pop_front().unwrap());
                }
                Ordering::Greater => {
                    result.push(q2.pop_front().unwrap());
                }
                Ordering::Equal => {
                    // Equal keys: combine differences; a zero sum produces no output.
                    let (d, t, mut r1) = q1.pop_front().unwrap();
                    let (_, _, r2) = q2.pop_front().unwrap();
                    r1.plus_equals(&r2);
                    if !r1.is_zero() {
                        result.push((d, t, r1));
                    }
                }
            }

            // Ship full buffers so output chunks stay uniformly sized.
            if result.at_capacity() {
                output.push(std::mem::take(&mut result));
                result = self.empty(stash);
            }

            // Refill emptied queues from their chains.
            if q1.is_empty() { Self::refill(&mut q1, &mut iter1, stash); }
            if q2.is_empty() { Self::refill(&mut q2, &mut iter2, stash); }
        }

        // Push partial result and remaining data from both sides.
        // At loop exit at least one side is fully drained, so at most one queue
        // and one iterator below are non-empty; the extends on the drained side
        // are no-ops.
        if !result.is_empty() { output.push(result); }
        for q in [q1, q2] {
            if !q.is_empty() { output.push(Vec::from(q)); }
        }
        output.extend(iter1);
        output.extend(iter2);
    }

    /// Partition `merged` by `upper`: tuples at times beyond `upper` go to `kept`
    /// (and their times are folded into `frontier`); earlier tuples go to `ship`.
    fn extract(
        &mut self,
        merged: Vec<Vec<(D, T, R)>>,
        upper: AntichainRef<T>,
        frontier: &mut Antichain<T>,
        ship: &mut Vec<Vec<(D, T, R)>>,
        kept: &mut Vec<Vec<(D, T, R)>>,
        stash: &mut Vec<Vec<(D, T, R)>>,
    ) {
        let mut keep = self.empty(stash);
        let mut ready = self.empty(stash);

        for chunk in merged {
            for (data, time, diff) in chunk {
                if upper.less_equal(&time) {
                    // Not yet ready to ship; record the time in the frontier.
                    frontier.insert_with(&time, |time| time.clone());
                    keep.push((data, time, diff));
                } else {
                    ready.push((data, time, diff));
                }
            }
            // NOTE(review): capacity is checked once per input chunk, not per
            // tuple, so `keep`/`ready` can grow past the target within a chunk —
            // confirm this is intended.
            if keep.at_capacity() {
                kept.push(std::mem::take(&mut keep));
                keep = self.empty(stash);
            }
            if ready.at_capacity() {
                ship.push(std::mem::take(&mut ready));
                ready = self.empty(stash);
            }
        }
        if !keep.is_empty() { kept.push(keep); }
        if !ready.is_empty() { ship.push(ready); }
    }

    /// Account for a chunk: only the record count is reported; the remaining
    /// three counts (presumably size/capacity/allocations — confirm against the
    /// `Merger` trait docs) are left at zero for `Vec` chunks.
    fn account(chunk: &Vec<(D, T, R)>) -> (usize, usize, usize, usize) {
        (chunk.len(), 0, 0, 0)
    }
}
434+
286435
/// A merger that uses internal iteration via [`InternalMerge`].
287436
pub struct InternalMerger<MC> {
288437
_marker: PhantomData<MC>,
@@ -307,6 +456,9 @@ pub mod container {
307456
stash.push(chunk);
308457
}
309458
/// Drain remaining items from one side into `result`/`output`.
459+
///
460+
/// Copies the partially-consumed head into `result`, then appends
461+
/// remaining full chunks directly to `output` without copying.
310462
fn drain_side(
311463
&self,
312464
head: &mut MC,
@@ -316,21 +468,20 @@ pub mod container {
316468
output: &mut Vec<MC>,
317469
stash: &mut Vec<MC>,
318470
) {
319-
while *pos < head.len() {
471+
// Copy the partially-consumed head into result.
472+
if *pos < head.len() {
320473
result.merge_from(
321474
std::slice::from_mut(head),
322475
std::slice::from_mut(pos),
323476
);
324-
if *pos >= head.len() {
325-
let old = std::mem::replace(head, list.next().unwrap_or_default());
326-
self.recycle(old, stash);
327-
*pos = 0;
328-
}
329-
if result.at_capacity() {
330-
output.push(std::mem::take(result));
331-
*result = self.empty(stash);
332-
}
333477
}
478+
// Flush result before appending full chunks.
479+
if !result.is_empty() {
480+
output.push(std::mem::take(result));
481+
*result = self.empty(stash);
482+
}
483+
// Remaining full chunks go directly to output.
484+
output.extend(list);
334485
}
335486
}
336487

@@ -370,20 +521,12 @@ pub mod container {
370521
}
371522
}
372523

373-
// Drain remaining from side 0.
524+
// Drain remaining from each side: copy partial head, then append full chunks.
374525
self.drain_side(&mut heads[0], &mut positions[0], &mut list1, &mut result, output, stash);
375-
if !result.is_empty() {
376-
output.push(std::mem::take(&mut result));
377-
result = self.empty(stash);
378-
}
379-
output.extend(list1);
380-
381-
// Drain remaining from side 1.
382526
self.drain_side(&mut heads[1], &mut positions[1], &mut list2, &mut result, output, stash);
383527
if !result.is_empty() {
384-
output.push(std::mem::take(&mut result));
528+
output.push(result);
385529
}
386-
output.extend(list2);
387530
}
388531

389532
fn extract(
@@ -424,6 +567,10 @@ pub mod container {
424567
}
425568

426569
/// Implementation of `InternalMerge` for `Vec<(D, T, R)>`.
570+
///
571+
/// Note: The `VecMerger` type implements `Merger` directly and avoids
572+
/// cloning by draining inputs. This `InternalMerge` impl is retained
573+
/// because `reduce` requires `Builder::Input: InternalMerge`.
427574
pub mod vec_internal {
428575
use std::cmp::Ordering;
429576
use timely::PartialOrder;

0 commit comments

Comments (0)