77
88use vortex:: buffer:: Buffer ;
99use vortex:: buffer:: BufferMut ;
10- use vortex:: buffer:: buffer_mut;
1110use vortex_array:: Canonical ;
1211use vortex_array:: buffer:: BufferHandle ;
1312use vortex_array:: dtype:: IntegerPType ;
@@ -40,19 +39,6 @@ const fn patch_lanes<V: Sized>() -> usize {
4039 if size_of :: < V > ( ) < 8 { 32 } else { 16 }
4140}
4241
43- #[ derive( Clone ) ]
44- struct Chunk < V > {
45- lanes : Vec < Lane < V > > ,
46- }
47-
48- impl < V : Copy + Default > Default for Chunk < V > {
49- fn default ( ) -> Self {
50- Self {
51- lanes : vec ! [ Lane :: <V >:: default ( ) ; patch_lanes:: <V >( ) ] ,
52- }
53- }
54- }
55-
5642/// A set of patches of values `V` existing in host buffers.
5743#[ allow( dead_code) ]
5844pub struct HostPatches < V > {
@@ -122,23 +108,6 @@ impl<V: Copy> HostPatches<V> {
122108 }
123109}
124110
125- #[ derive( Debug , Default , Clone ) ]
126- struct Lane < V > {
127- indices : Vec < u16 > ,
128- values : Vec < V > ,
129- }
130-
131- impl < V : Copy > Lane < V > {
132- fn push ( & mut self , index : u16 , value : V ) {
133- self . indices . push ( index) ;
134- self . values . push ( value) ;
135- }
136-
137- fn len ( & self ) -> usize {
138- self . indices . len ( )
139- }
140- }
141-
142111/// Transpose a set of patches from the default sorted layout into the data parallel layout.
143112#[ allow( clippy:: cognitive_complexity) ]
144113pub async fn transpose_patches (
@@ -180,8 +149,8 @@ pub async fn transpose_patches(
180149
181150#[ allow( clippy:: cast_possible_truncation) ]
182151fn transpose < I : IntegerPType , V : NativePType > (
183- indices : & [ I ] ,
184- values : & [ V ] ,
152+ indices_in : & [ I ] ,
153+ values_in : & [ V ] ,
185154 offset : usize ,
186155 array_len : usize ,
187156) -> HostPatches < V > {
@@ -193,30 +162,56 @@ fn transpose<I: IntegerPType, V: NativePType>(
193162 ) ;
194163
195164 let n_lanes = patch_lanes :: < V > ( ) ;
196- let mut chunks: Vec < Chunk < V > > = vec ! [ Chunk :: default ( ) ; n_chunks] ;
197165
198- // For each chunk, for each lane, push new values
199- for ( index, & value) in std:: iter:: zip ( indices, values) {
166+ // We know upfront how many indices and values we'll have.
167+ let mut indices_buffer = BufferMut :: with_capacity ( indices_in. len ( ) ) ;
168+ let mut values_buffer = BufferMut :: with_capacity ( values_in. len ( ) ) ;
169+
170+ // Number of patches in each (chunk, lane) slot, shifted by one so the prefix sum below yields start offsets.
171+ let mut lane_offsets: BufferMut < u32 > = BufferMut :: zeroed ( n_chunks * n_lanes + 1 ) ;
172+
173+ // Scan the indices once to get per-chunk/lane counts
174+ for index in indices_in {
200175 let index = index. as_ ( ) - offset;
176+ let chunk = index / 1024 ;
177+ let lane = index % n_lanes;
178+
179+ lane_offsets[ chunk * n_lanes + lane + 1 ] += 1 ;
180+ }
201181
182+ // Prefix-sum sizes -> offsets
183+ for index in 1 ..lane_offsets. len ( ) {
184+ lane_offsets[ index] += lane_offsets[ index - 1 ] ;
185+ }
186+
187+ // Loop over patches, writing them to final positions
188+ let indices_out = indices_buffer. spare_capacity_mut ( ) ;
189+ let values_out = values_buffer. spare_capacity_mut ( ) ;
190+ for ( index, & value) in std:: iter:: zip ( indices_in, values_in) {
191+ let index = index. as_ ( ) - offset;
202192 let chunk = index / 1024 ;
203193 let lane = index % n_lanes;
204194
205- chunks[ chunk] . lanes [ lane] . push ( ( index % 1024 ) as u16 , value) ;
195+ let position = & mut lane_offsets[ chunk * n_lanes + lane] ;
196+ indices_out[ * position as usize ] . write ( ( index % 1024 ) as u16 ) ;
197+ values_out[ * position as usize ] . write ( value) ;
198+ * position += 1 ;
206199 }
207200
208- // Reshuffle the different containers into a single contiguous buffer each for indices/values
209- let mut lane_offset = 0 ;
210- let mut lane_offsets = buffer_mut ! [ 0u32 ] ;
211- let mut indices_buffer = BufferMut :: empty ( ) ;
212- let mut values_buffer = BufferMut :: empty ( ) ;
213- for chunk in chunks {
214- for lane in chunk. lanes {
215- indices_buffer. extend_from_slice ( & lane. indices ) ;
216- values_buffer. extend_from_slice ( & lane. values ) ;
217- lane_offset += lane. len ( ) as u32 ;
218- lane_offsets. push ( lane_offset) ;
219- }
201+ // SAFETY: we know there are exactly indices_in.len() indices/values, and we just
202+ // set them to the appropriate values in the loop above.
203+ unsafe {
204+ indices_buffer. set_len ( indices_in. len ( ) ) ;
205+ values_buffer. set_len ( values_in. len ( ) ) ;
206+ }
207+
208+ // Now, pass over all the indices again and subtract out the position increments, restoring each lane's start offset.
209+ for index in indices_in {
210+ let index = index. as_ ( ) - offset;
211+ let chunk = index / 1024 ;
212+ let lane = index % n_lanes;
213+
214+ lane_offsets[ chunk * n_lanes + lane] -= 1 ;
220215 }
221216
222217 HostPatches {
0 commit comments