diff --git a/vortex-file/public-api.lock b/vortex-file/public-api.lock
index fa9c5175a85..fc73045c63e 100644
--- a/vortex-file/public-api.lock
+++ b/vortex-file/public-api.lock
@@ -256,6 +256,8 @@ impl core::fmt::Debug for vortex_file::SegmentSpec
 
 pub fn vortex_file::SegmentSpec::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
 
+impl core::marker::Copy for vortex_file::SegmentSpec
+
 pub struct vortex_file::VortexFile
 
 impl vortex_file::VortexFile
diff --git a/vortex-file/src/footer/segment.rs b/vortex-file/src/footer/segment.rs
index 7872116404b..8dcb7d1802b 100644
--- a/vortex-file/src/footer/segment.rs
+++ b/vortex-file/src/footer/segment.rs
@@ -11,7 +11,7 @@ use vortex_flatbuffers::footer as fb;
 ///
 /// A segment is a contiguous block of bytes in a file that contains a part of the file's data.
 /// The `SegmentSpec` struct specifies the location and properties of a segment.
-#[derive(Clone, Debug)]
+#[derive(Clone, Copy, Debug)]
 pub struct SegmentSpec {
     /// The byte offset of the segment from the start of the file.
     pub offset: u64,
diff --git a/vortex-file/src/open.rs b/vortex-file/src/open.rs
index 8ed8c6645ec..ac8fdd83097 100644
--- a/vortex-file/src/open.rs
+++ b/vortex-file/src/open.rs
@@ -32,6 +32,7 @@ use crate::EOF_SIZE;
 use crate::MAX_POSTSCRIPT_SIZE;
 use crate::VortexFile;
 use crate::footer::Footer;
+use crate::segments::BufferSegmentSource;
 use crate::segments::FileSegmentSource;
 use crate::segments::InitialReadSegmentCache;
 use crate::segments::RequestMetrics;
@@ -161,9 +162,36 @@ impl VortexOpenOptions {
     }
 
     /// Open a Vortex file from an in-memory buffer.
+    ///
+    /// This uses a `BufferSegmentSource` that resolves segments synchronously
+    /// by slicing the buffer directly, bypassing the async I/O pipeline.
     pub fn open_buffer<B: Into<ByteBuffer>>(self, buffer: B) -> VortexResult<VortexFile> {
-        // We know this is in memory, so we can open it synchronously.
-        block_on(self.with_initial_read_size(0).open_read(buffer.into()))
+        let buffer: ByteBuffer = buffer.into();
+
+        if self.segment_cache.is_some() {
+            tracing::warn!("segment cache is ignored for in-memory `open_buffer`");
+        }
+        if self.metrics_registry.is_some() {
+            tracing::warn!("metrics registry is ignored for in-memory `open_buffer`");
+        }
+
+        let mut opts = self.with_initial_read_size(0);
+
+        let footer = match opts.footer.take() {
+            Some(footer) => footer,
+            None => block_on(opts.read_footer(&buffer))?,
+        };
+
+        let segment_source = Arc::new(BufferSegmentSource::new(
+            buffer,
+            footer.segment_map().clone(),
+        ));
+
+        Ok(VortexFile {
+            footer,
+            segment_source,
+            session: opts.session,
+        })
     }
 
     /// An API for opening a [`VortexFile`] using any [`VortexReadAt`] implementation.
diff --git a/vortex-file/src/segments/source.rs b/vortex-file/src/segments/source.rs
index b981e1d5afa..c27d9e2f944 100644
--- a/vortex-file/src/segments/source.rs
+++ b/vortex-file/src/segments/source.rs
@@ -14,6 +14,7 @@ use futures::channel::mpsc;
 use futures::future;
 use vortex_array::buffer::BufferHandle;
 use vortex_buffer::Alignment;
+use vortex_buffer::ByteBuffer;
 use vortex_error::VortexResult;
 use vortex_error::vortex_err;
 use vortex_error::vortex_panic;
@@ -136,7 +137,7 @@ impl SegmentSource for FileSegmentSource {
     fn request(&self, id: SegmentId) -> SegmentFuture {
         // We eagerly register the read request here assuming the behaviour of [`FileRead`], where
         // coalescing becomes effective prior to the future being polled.
-        let spec = match self.segments.get(*id as usize).cloned() {
+        let spec = *match self.segments.get(*id as usize) {
             Some(spec) => spec,
             None => {
                 return future::ready(Err(vortex_err!("Missing segment: {}", id))).boxed();
@@ -251,3 +252,54 @@ impl RequestMetrics {
         }
     }
 }
+
+/// A [`SegmentSource`] that resolves segments synchronously from an
+/// in-memory [`ByteBuffer`].
+///
+/// Each request slices the buffer directly, bypassing the async I/O pipeline.
+pub(crate) struct BufferSegmentSource {
+    buffer: ByteBuffer,
+    segments: Arc<[SegmentSpec]>,
+}
+
+impl BufferSegmentSource {
+    /// Create a new `BufferSegmentSource` from a buffer and its segment map.
+    pub fn new(buffer: ByteBuffer, segments: Arc<[SegmentSpec]>) -> Self {
+        Self { buffer, segments }
+    }
+}
+
+impl SegmentSource for BufferSegmentSource {
+    fn request(&self, id: SegmentId) -> SegmentFuture {
+        let spec = match self.segments.get(*id as usize) {
+            Some(spec) => spec,
+            None => {
+                return future::ready(Err(vortex_err!("Missing segment: {}", id))).boxed();
+            }
+        };
+
+        // The offset and length come from the file footer, so compute the range with checked
+        // arithmetic: a corrupt footer must yield an error instead of overflowing or panicking.
+        let range = usize::try_from(spec.offset)
+            .ok()
+            .zip(usize::try_from(spec.length).ok())
+            .and_then(|(start, len)| Some(start..start.checked_add(len)?))
+            .filter(|range| range.end <= self.buffer.len());
+        let range = match range {
+            Some(range) => range,
+            None => {
+                return future::ready(Err(vortex_err!(
+                    "Segment {} (offset {}, length {}) out of bounds for buffer of length {}",
+                    *id,
+                    spec.offset,
+                    spec.length,
+                    self.buffer.len()
+                )))
+                .boxed();
+            }
+        };
+
+        let slice = self.buffer.slice(range).aligned(spec.alignment);
+        future::ready(Ok(BufferHandle::new_host(slice))).boxed()
+    }
+}
diff --git a/vortex-tui/src/segment_tree.rs b/vortex-tui/src/segment_tree.rs
index ba8d3f16a6e..fd88650f2fd 100644
--- a/vortex-tui/src/segment_tree.rs
+++ b/vortex-tui/src/segment_tree.rs
@@ -128,7 +128,7 @@ fn segments_by_name_impl(
         .or_default();
 
     for segment_id in root.segment_ids() {
-        let segment_spec = segments[*segment_id as usize].clone();
+        let segment_spec = segments[*segment_id as usize];
         current_segments.push(SegmentDisplay {
             name: name.clone().unwrap_or_else(|| "".into()),
             spec: segment_spec,