Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 116 additions & 49 deletions vortex-cuda/src/kernel/patches/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

use vortex::buffer::Buffer;
use vortex::buffer::BufferMut;
use vortex::buffer::buffer_mut;
use vortex_array::Canonical;
use vortex_array::buffer::BufferHandle;
use vortex_array::dtype::IntegerPType;
Expand Down Expand Up @@ -40,19 +39,6 @@ const fn patch_lanes<V: Sized>() -> usize {
if size_of::<V>() < 8 { 32 } else { 16 }
}

/// The patches of a single chunk, split into one [`Lane`] per patch lane.
#[derive(Clone)]
struct Chunk<V> {
    /// Per-lane patch storage; `Default` creates `patch_lanes::<V>()` entries.
    lanes: Vec<Lane<V>>,
}

impl<V: Copy + Default> Default for Chunk<V> {
    /// Builds a chunk in which every lane starts out empty.
    fn default() -> Self {
        let lane_count = patch_lanes::<V>();
        let lanes = vec![Lane::<V>::default(); lane_count];
        Self { lanes }
    }
}

/// A set of patches of values `V` existing in host buffers.
#[allow(dead_code)]
pub struct HostPatches<V> {
Expand Down Expand Up @@ -96,6 +82,20 @@ impl<V: Copy> HostPatches<V> {
}
}

/// Overwrite entries of `output` with the patch values stored in `self`.
///
/// Walks every `(chunk, lane)` pair and writes each patch value at
/// `chunk * 1024 + index`, where `index` is the stored within-chunk index.
#[cfg(test)]
fn apply(&self, output: &mut BufferMut<V>) {
    for chunk_idx in 0..self.n_chunks {
        // Position in `output` where this chunk begins.
        let chunk_base = chunk_idx * 1024;
        for lane_idx in 0..self.n_lanes {
            let lane_patches = self.patches(chunk_idx, lane_idx);
            std::iter::zip(lane_patches.indices, lane_patches.values).for_each(
                |(&within_chunk, &patch_value)| {
                    output[chunk_base + (within_chunk as usize)] = patch_value;
                },
            );
        }
    }
}

/// Export the patches for use on the device associated with the provided execution context.
pub async fn export_to_device(
mut self,
Expand All @@ -122,23 +122,6 @@ impl<V: Copy> HostPatches<V> {
}
}

/// Patches collected for one lane: two parallel vectors of indices and values.
#[derive(Debug, Default, Clone)]
struct Lane<V> {
    /// Patch indices; entry `i` pairs with `values[i]`.
    indices: Vec<u16>,
    /// Patch values; entry `i` pairs with `indices[i]`.
    values: Vec<V>,
}

impl<V: Copy> Lane<V> {
    /// Appends one `(index, value)` patch, keeping both vectors the same length.
    fn push(&mut self, index: u16, value: V) {
        let Self { indices, values } = self;
        indices.push(index);
        values.push(value);
    }

    /// Number of patches stored in this lane.
    fn len(&self) -> usize {
        let Self { indices, .. } = self;
        indices.len()
    }
}

/// Transpose a set of patches from the default sorted layout into the data parallel layout.
#[allow(clippy::cognitive_complexity)]
pub async fn transpose_patches(
Expand Down Expand Up @@ -180,8 +163,8 @@ pub async fn transpose_patches(

#[allow(clippy::cast_possible_truncation)]
fn transpose<I: IntegerPType, V: NativePType>(
indices: &[I],
values: &[V],
indices_in: &[I],
values_in: &[V],
offset: usize,
array_len: usize,
) -> HostPatches<V> {
Expand All @@ -193,30 +176,56 @@ fn transpose<I: IntegerPType, V: NativePType>(
);

let n_lanes = patch_lanes::<V>();
let mut chunks: Vec<Chunk<V>> = vec![Chunk::default(); n_chunks];

// For each chunk, for each lane, push new values
for (index, &value) in std::iter::zip(indices, values) {
// We know upfront how many indices and values we'll have.
let mut indices_buffer = BufferMut::with_capacity(indices_in.len());
let mut values_buffer = BufferMut::with_capacity(values_in.len());

// number of patches in each chunk.
let mut lane_offsets: BufferMut<u32> = BufferMut::zeroed(n_chunks * n_lanes + 1);

// Scan the index/values once to get chunk/lane counts
for index in indices_in {
let index = index.as_() - offset;
let chunk = index / 1024;
let lane = index % n_lanes;

lane_offsets[chunk * n_lanes + lane + 1] += 1;
}

// Prefix-sum sizes -> offsets
for index in 1..lane_offsets.len() {
lane_offsets[index] += lane_offsets[index - 1];
}

// Loop over patches, writing them to final positions
let indices_out = indices_buffer.spare_capacity_mut();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add broader test coverage for transpose?

  • with a non-zero offset
  • with different value types, in particular 64-bit, as that changes the lane count to 16
  • with array len % 1024 != 0

let values_out = values_buffer.spare_capacity_mut();
for (index, &value) in std::iter::zip(indices_in, values_in) {
let index = index.as_() - offset;
let chunk = index / 1024;
let lane = index % n_lanes;

chunks[chunk].lanes[lane].push((index % 1024) as u16, value);
let position = &mut lane_offsets[chunk * n_lanes + lane];
indices_out[*position as usize].write((index % 1024) as u16);
values_out[*position as usize].write(value);
*position += 1;
}

// Reshuffle the different containers into a single contiguous buffer each for indices/values
let mut lane_offset = 0;
let mut lane_offsets = buffer_mut![0u32];
let mut indices_buffer = BufferMut::empty();
let mut values_buffer = BufferMut::empty();
for chunk in chunks {
for lane in chunk.lanes {
indices_buffer.extend_from_slice(&lane.indices);
values_buffer.extend_from_slice(&lane.values);
lane_offset += lane.len() as u32;
lane_offsets.push(lane_offset);
}
// SAFETY: we know there are exactly indices_in.len() indices/values, and we just
// set them to the appropriate values in the loop above.
unsafe {
indices_buffer.set_len(indices_in.len());
values_buffer.set_len(values_in.len());
}

// Now, pass over all the indices and values again and subtract out the position increments.
for index in indices_in {
let index = index.as_() - offset;
let chunk = index / 1024;
let lane = index % n_lanes;

lane_offsets[chunk * n_lanes + lane] -= 1;
}

HostPatches {
Expand All @@ -232,6 +241,15 @@ fn transpose<I: IntegerPType, V: NativePType>(
mod tests {
use vortex::buffer::BufferMut;
use vortex::buffer::buffer;
use vortex::buffer::buffer_mut;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::assert_arrays_eq;
use vortex_array::dtype::NativePType;
use vortex_array::patches::Patches;
use vortex_error::VortexResult;

use crate::kernel::patches::types::transpose;

Expand Down Expand Up @@ -285,4 +303,53 @@ mod tests {
assert_eq!(transposed.patches(3, 4).values, &[80]);
assert_eq!(transposed.patches(3, 4).indices, &[4]);
}

/// Runs a few hand-picked inputs and a sweep of array lengths through `test_case`.
#[test]
#[allow(clippy::cast_possible_truncation)]
fn test_transpose_complex() -> VortexResult<()> {
    // Hand-picked cases using f32, i8 and i16 values, with and without an offset.
    test_case(1024, 0, &[0], &[0f32])?;
    test_case(512, 512, &[512, 513, 514], &[10i8, 20, 30])?;
    test_case(10_000, 100, &[500, 1_000, 1_001, 1_002], &[1i16, 2, 3, 4])?;

    // Sweep: the offset is half the length and every index in `offset..len` is patched,
    // using the index itself as the patch value. Stops at the first failing case.
    (1..4096usize).step_by(10).try_for_each(|array_len| {
        let start = array_len / 2;
        let patched_positions: Vec<u32> = (start..array_len).map(|pos| pos as u32).collect();

        test_case(array_len, start, &patched_positions, &patched_positions)
    })
}

/// Checks that applying transposed patches yields the same array as `Patches`.
///
/// Starts from `len` default values, patches them once through `Patches` and
/// `PrimitiveArray::patch` and once through `transpose` followed by `apply`,
/// then asserts that both results are equal.
fn test_case<V: NativePType>(
    len: usize,
    offset: usize,
    patch_indices: &[u32],
    patch_values: &[V],
) -> VortexResult<()> {
    // The same all-default base data, as a mutable buffer and as an array.
    let mut base_buffer = buffer_mut![V::default(); len];
    let base_array = PrimitiveArray::from_iter(base_buffer.iter().copied());

    // Reference path: build `Patches` and apply them to the array.
    let indices_array = PrimitiveArray::from_iter(patch_indices.iter().copied()).into_array();
    let values_array = PrimitiveArray::from_iter(patch_values.iter().copied()).into_array();
    let reference_patches = Patches::new(len, offset, indices_array, values_array, None)?;

    let mut exec_ctx = ExecutionCtx::new(LEGACY_SESSION.clone());
    let expected = base_array
        .patch(&reference_patches, &mut exec_ctx)?
        .into_array();

    // Path under test: transpose the raw patches and apply them to the buffer.
    let host_patches = transpose(patch_indices, patch_values, offset, len);
    host_patches.apply(&mut base_buffer);
    let actual = base_buffer.freeze().into_array();

    assert_arrays_eq!(expected, actual);

    Ok(())
}
}
Loading