Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions vortex-file/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ impl core::fmt::Debug for vortex_file::SegmentSpec

pub fn vortex_file::SegmentSpec::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result

impl core::marker::Copy for vortex_file::SegmentSpec

pub struct vortex_file::VortexFile

impl vortex_file::VortexFile
Expand Down
2 changes: 1 addition & 1 deletion vortex-file/src/footer/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use vortex_flatbuffers::footer as fb;
///
/// A segment is a contiguous block of bytes in a file that contains a part of the file's data.
/// The `SegmentSpec` struct specifies the location and properties of a segment.
#[derive(Clone, Debug)]
#[derive(Clone, Copy, Debug)]
pub struct SegmentSpec {
/// The byte offset of the segment from the start of the file.
pub offset: u64,
Expand Down
32 changes: 30 additions & 2 deletions vortex-file/src/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::EOF_SIZE;
use crate::MAX_POSTSCRIPT_SIZE;
use crate::VortexFile;
use crate::footer::Footer;
use crate::segments::BufferSegmentSource;
use crate::segments::FileSegmentSource;
use crate::segments::InitialReadSegmentCache;
use crate::segments::RequestMetrics;
Expand Down Expand Up @@ -161,9 +162,36 @@ impl VortexOpenOptions {
}

/// Open a Vortex file from an in-memory buffer.
///
/// This uses a `BufferSegmentSource` that resolves segments synchronously
/// by slicing the buffer directly, bypassing the async I/O pipeline.
pub fn open_buffer<B: Into<ByteBuffer>>(self, buffer: B) -> VortexResult<VortexFile> {
// We know this is in memory, so we can open it synchronously.
block_on(self.with_initial_read_size(0).open_read(buffer.into()))
let buffer: ByteBuffer = buffer.into();

if self.segment_cache.is_some() {
tracing::warn!("segment cache is ignored for in-memory `open_buffer`");
}
if self.metrics_registry.is_some() {
tracing::warn!("metrics registry is ignored for in-memory `open_buffer`");
}

let mut opts = self.with_initial_read_size(0);

let footer = match opts.footer.take() {
Some(footer) => footer,
None => block_on(opts.read_footer(&buffer))?,
};

let segment_source = Arc::new(BufferSegmentSource::new(
buffer,
footer.segment_map().clone(),
));

Ok(VortexFile {
footer,
segment_source,
session: opts.session,
})
}

/// An API for opening a [`VortexFile`] using any [`VortexReadAt`] implementation.
Expand Down
46 changes: 45 additions & 1 deletion vortex-file/src/segments/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use futures::channel::mpsc;
use futures::future;
use vortex_array::buffer::BufferHandle;
use vortex_buffer::Alignment;
use vortex_buffer::ByteBuffer;
use vortex_error::VortexResult;
use vortex_error::vortex_err;
use vortex_error::vortex_panic;
Expand Down Expand Up @@ -136,7 +137,7 @@ impl SegmentSource for FileSegmentSource {
fn request(&self, id: SegmentId) -> SegmentFuture {
// We eagerly register the read request here assuming the behaviour of [`FileRead`], where
// coalescing becomes effective prior to the future being polled.
let spec = match self.segments.get(*id as usize).cloned() {
let spec = *match self.segments.get(*id as usize) {
Some(spec) => spec,
None => {
return future::ready(Err(vortex_err!("Missing segment: {}", id))).boxed();
Expand Down Expand Up @@ -251,3 +252,46 @@ impl RequestMetrics {
}
}
}

/// A [`SegmentSource`] that resolves segments synchronously from an
/// in-memory [`ByteBuffer`].
///
/// Resolves segments synchronously, bypassing the async I/O pipeline.
pub(crate) struct BufferSegmentSource {
buffer: ByteBuffer,
segments: Arc<[SegmentSpec]>,
}

impl BufferSegmentSource {
/// Create a new `BufferSegmentSource` from a buffer and its segment map.
pub fn new(buffer: ByteBuffer, segments: Arc<[SegmentSpec]>) -> Self {
Self { buffer, segments }
}
}

impl SegmentSource for BufferSegmentSource {
fn request(&self, id: SegmentId) -> SegmentFuture {
let spec = match self.segments.get(*id as usize) {
Some(spec) => spec,
None => {
return future::ready(Err(vortex_err!("Missing segment: {}", id))).boxed();
}
};

let start = spec.offset as usize;
let end = start + spec.length as usize;
if end > self.buffer.len() {
return future::ready(Err(vortex_err!(
"Segment {} range {}..{} out of bounds for buffer of length {}",
*id,
start,
end,
self.buffer.len()
)))
.boxed();
}

let slice = self.buffer.slice(start..end).aligned(spec.alignment);
future::ready(Ok(BufferHandle::new_host(slice))).boxed()
}
}
2 changes: 1 addition & 1 deletion vortex-tui/src/segment_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ fn segments_by_name_impl(
.or_default();

for segment_id in root.segment_ids() {
let segment_spec = segments[*segment_id as usize].clone();
let segment_spec = segments[*segment_id as usize];
current_segments.push(SegmentDisplay {
name: name.clone().unwrap_or_else(|| "<unnamed>".into()),
spec: segment_spec,
Expand Down
Loading