Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions vortex-duckdb/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ fn main() {
.size_t_is_usize(true)
.clang_arg(format!("-I{}", duckdb_include_path.display()))
.clang_arg(format!("-I{}", crate_dir.join("cpp/include").display()))
.generate_comments(true)
// Tell cargo to invalidate the built crate whenever any of the
// included header files changed.
.parse_callbacks(Box::new(bindgen::CargoCallbacks::new()))
Expand Down
33 changes: 26 additions & 7 deletions vortex-duckdb/cpp/include/duckdb_vx/table_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,41 @@ void duckdb_vx_string_map_free(duckdb_vx_string_map map);
// Input data passed into the init_global and init_local callbacks.
typedef struct {
const void *bind_data;
idx_t *column_ids;
size_t column_ids_count;
// uint64_t *column_indexes;
// size_t column_indexes_count;
const idx_t *projection_ids;

/**
* Projected columns that are requested to be read. These are not
* all columns, only the ones DuckDB optimizer thinks we should read.
* In DuckDB API it's called "column_ids", renamed to resolve ambiguity.
*/
idx_t *projection_ids;
size_t projection_ids_count;

/**
* Post filter projected columns. Our table function implements filter
* pushdown so this list is a subset of columns referenced in projection_ids
* after filter pushdown and filter pruning. May be empty, in which case
* projection_ids should be used.
* Indices in this list reference values from projection_ids. I.e. if
* projection_ids=[1,5,6], post_filter_projection_ids=[1], output column
* should be projection_ids[1] = 5
*
* In DuckDB API it's called just "projection_ids", renamed to resolve ambiguity.
*
* Example usage:
* https://github.com/duckdb/duckdb/blob/dc11eadd8f0a7c600f0034810706605ebe10d5b9/src/include/duckdb/function/table_function.hpp#L147
*/
const idx_t *post_filter_projection_ids;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i really think we should keep the naming here and explain the semantics.

It nice to keep the duckdb name

size_t post_filter_projection_ids_count;

duckdb_vx_table_filter_set filters;
duckdb_client_context client_context;
// void *sample_options;
} duckdb_vx_tfunc_init_input;

// Result data returned from the cardinality callback.
typedef struct {
  // Estimated row count; meaningful only when has_estimated_cardinality is set.
  idx_t estimated_cardinality;
  // Upper bound on the row count; meaningful only when has_max_cardinality is set.
  idx_t max_cardinality;
  bool has_estimated_cardinality;
  bool has_max_cardinality;
} duckdb_vx_node_statistics;

Expand Down
18 changes: 9 additions & 9 deletions vortex-duckdb/cpp/table_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,10 @@ unique_ptr<GlobalTableFunctionState> c_init_global(ClientContext &context, Table

duckdb_vx_tfunc_init_input ffi_input = {
.bind_data = bind.ffi_data->DataPtr(),
.column_ids = input.column_ids.data(),
.column_ids_count = input.column_ids.size(),
.projection_ids = input.projection_ids.data(),
.projection_ids_count = input.projection_ids.size(),
.projection_ids = input.column_ids.data(),
.projection_ids_count = input.column_ids.size(),
.post_filter_projection_ids = input.projection_ids.data(),
.post_filter_projection_ids_count = input.projection_ids.size(),
.filters = reinterpret_cast<duckdb_vx_table_filter_set>(input.filters.get()),
.client_context = reinterpret_cast<duckdb_client_context>(&context),
};
Expand All @@ -142,10 +142,10 @@ unique_ptr<LocalTableFunctionState> c_init_local(ExecutionContext &context,

duckdb_vx_tfunc_init_input ffi_input = {
.bind_data = bind.ffi_data->DataPtr(),
.column_ids = input.column_ids.data(),
.column_ids_count = input.column_ids.size(),
.projection_ids = input.projection_ids.data(),
.projection_ids_count = input.projection_ids.size(),
.projection_ids = input.column_ids.data(),
.projection_ids_count = input.column_ids.size(),
.post_filter_projection_ids = input.projection_ids.data(),
.post_filter_projection_ids_count = input.projection_ids.size(),
.filters = reinterpret_cast<duckdb_vx_table_filter_set>(input.filters.get()),
.client_context = reinterpret_cast<duckdb_client_context>(&context),
};
Expand Down Expand Up @@ -214,8 +214,8 @@ unique_ptr<NodeStatistics> c_cardinality(ClientContext &context, const FunctionD

duckdb_vx_node_statistics node_stats_out = {
.estimated_cardinality = 0,
.has_estimated_cardinality = false,
.max_cardinality = 0,
.has_estimated_cardinality = false,
.has_max_cardinality = false,
};
bind.info->vtab.cardinality(bind_data->Cast<CTableBindData>().ffi_data->DataPtr(), &node_stats_out);
Expand Down
107 changes: 66 additions & 41 deletions vortex-duckdb/src/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use custom_labels::CURRENT_LABELSET;
use futures::StreamExt;
use itertools::Itertools;
use num_traits::AsPrimitive;
use tracing::debug;
use vortex::array::ArrayRef;
use vortex::array::Canonical;
use vortex::array::VortexSessionExecute;
Expand Down Expand Up @@ -62,10 +63,19 @@ use crate::duckdb::VirtualColumnsResultRef;
use crate::exporter::ArrayExporter;
use crate::exporter::ConversionCache;

/// Taken from
/// https://github.com/duckdb/duckdb/blob/dc11eadd8f0a7c600f0034810706605ebe10d5b9/src/include/duckdb/common/constants.hpp#L44
/// (DuckDB's COLUMN_IDENTIFIER_EMPTY, i.e. u64::MAX - 1).
///
/// If DuckDB requests a zero-column projection from read_vortex (e.g. for
/// count(*)), its planner tries to get any column:
/// https://github.com/duckdb/duckdb/blob/dc11eadd8f0a7c600f0034810706605ebe10d5b9/src/planner/operator/logical_get.cpp#L149
///
/// If COLUMN_IDENTIFIER_EMPTY is defined as a virtual column, the planner
/// takes it; otherwise it takes the first real column. As we don't want to
/// fill the output chunk and can leave it uninitialized in this case, we
/// define COLUMN_IDENTIFIER_EMPTY as a virtual column in our table function
/// vtab's get_virtual_columns.
/// See vortex-duckdb/cpp/include/duckdb_vx/table_function.h
/// See virtual_columns in this file
static EMPTY_COLUMN_IDX: u64 = 18446744073709551614;
/// Name reported for the virtual empty column (DuckDB expects an empty name).
static EMPTY_COLUMN_NAME: &str = "";

Expand Down Expand Up @@ -126,17 +136,19 @@ impl Debug for DataSourceBindData {
}
}

type DataSourceIterator = ThreadSafeIterator<VortexResult<(ArrayRef, Arc<ConversionCache>)>>;

/// Global scan state for driving a `DataSource` scan through DuckDB.
pub struct DataSourceGlobal {
iterator: ThreadSafeIterator<VortexResult<(ArrayRef, Arc<ConversionCache>)>>,
iterator: DataSourceIterator,
batch_id: AtomicU64,
bytes_total: Arc<AtomicU64>,
bytes_read: AtomicU64,
}

/// Per-thread local scan state.
pub struct DataSourceLocal {
iterator: ThreadSafeIterator<VortexResult<(ArrayRef, Arc<ConversionCache>)>>,
iterator: DataSourceIterator,
exporter: Option<ArrayExporter>,
/// The unique batch id of the last chunk exported via scan().
batch_id: Option<u64>,
Expand Down Expand Up @@ -193,27 +205,29 @@ impl<T: DataSourceTableFunction> TableFunction for T {
}

fn init_global(init_input: &TableInitInput<Self>) -> VortexResult<Self::GlobalState> {
debug!("table init input: {init_input:?}");

let bind_data = init_input.bind_data();
let projection_ids = init_input.projection_ids().unwrap_or(&[]);
let column_ids = init_input.column_ids();
let projection_ids = init_input.projection_ids();
let post_filter_projection_ids = init_input.post_filter_projection_ids();

let projection_expr =
extract_projection_expr(projection_ids, column_ids, &bind_data.column_names);
let projection_expr = extract_projection_expr(
post_filter_projection_ids,
projection_ids,
&bind_data.column_names,
);
let filter_expr = extract_table_filter_expr(
init_input.table_filter_set(),
column_ids,
projection_ids,
&bind_data.column_names,
&bind_data.filter_exprs,
bind_data.data_source.dtype(),
)?;

tracing::debug!(
"Global init Vortex scan SELECT {} WHERE {}",
&projection_expr,
filter_expr
.as_ref()
.map_or_else(|| "true".to_string(), |f| f.to_string())
);
let filter_expr_str = filter_expr
.as_ref()
.map_or_else(|| "true".to_string(), |f| f.to_string());
debug!("Global init Vortex scan SELECT {projection_expr} WHERE {filter_expr_str}");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this not allocate the string even if debug ignores it?


let request = ScanRequest {
projection: projection_expr,
Expand Down Expand Up @@ -318,6 +332,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
let (array_result, conversion_cache) = result?;

let array_result = array_result.optimize_recursive()?;

let array_result = if let Some(array) = array_result.as_opt::<StructVTable>() {
array.clone()
} else if let Some(array) = array_result.as_opt::<ScalarFnVTable>()
Expand Down Expand Up @@ -455,35 +470,45 @@ fn extract_schema_from_dtype(dtype: &DType) -> VortexResult<(Vec<String>, Vec<Lo

/// Creates a projection expression from raw projection/column ID slices and column names.
fn extract_projection_expr(
post_filter_projection_ids: Option<&[u64]>,
projection_ids: &[u64],
column_ids: &[u64],
column_names: &[String],
projection_names: &[String],
) -> Expression {
select(
projection_ids
.iter()
.map(|p| {
let idx: usize = p.as_();
let val: usize = column_ids[idx].as_();
val
})
.map(|idx| {
column_names
.get(idx)
.vortex_expect("prune idx in column names")
})
.map(|s| Arc::from(s.as_str()))
.collect::<FieldNames>(),
root(),
)
// Post filter projection ids may be empty, in which case
// you need to use projection_ids
// https://github.com/duckdb/duckdb/blob/6e211da91657a94803c465fd0ce585f4c6754b54/src/planner/operator/logical_get.cpp#L168
let (post_filter_projection_ids, has_projection_ids) = match post_filter_projection_ids {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this is a projection_id then?

Some(ids) => (ids, true),
None => (projection_ids, false),
};

let names = post_filter_projection_ids
.iter()
.filter(|p| **p != EMPTY_COLUMN_IDX)
.map(|p| {
let mut idx: usize = p.as_();
if has_projection_ids {
idx = projection_ids[idx].as_();
}
idx
})
.map(|idx| {
projection_names
.get(idx)
.vortex_expect("prune idx in column names")
})
.map(|s| Arc::from(s.as_str()))
.collect::<FieldNames>();

select(names, root())
}

/// Creates a table filter expression from the table filter set, column metadata, additional
/// filter expressions, and the top-level DType.
fn extract_table_filter_expr(
table_filter_set: Option<&TableFilterSetRef>,
column_ids: &[u64],
column_names: &[String],
projection_ids: &[u64],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i find this naming more confusing, column seems to make more sense here. i think

projection_names: &[String],
additional_filters: &[Expression],
dtype: &DType,
) -> VortexResult<Option<Expression>> {
Expand All @@ -492,8 +517,8 @@ fn extract_table_filter_expr(
.into_iter()
.map(|(idx, ex)| {
let idx_u: usize = idx.as_();
let col_idx: usize = column_ids[idx_u].as_();
let name = column_names.get(col_idx).vortex_expect("exists");
let col_idx: usize = projection_ids[idx_u].as_();
let name = projection_names.get(col_idx).vortex_expect("exists");
try_from_table_filter(ex, &col(name.as_str()), dtype)
})
.collect::<VortexResult<Option<HashSet<_>>>>()?
Expand Down
4 changes: 4 additions & 0 deletions vortex-duckdb/src/duckdb/data_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ impl DataChunkRef {
.vortex_expect("Column count exceeds usize")
}

/// Resets the data chunk via DuckDB's `duckdb_data_chunk_reset`, clearing it
/// (size back to 0) so the chunk can be reused for the next batch of output.
pub fn reset(&mut self) {
    // SAFETY: as_ptr() yields the live duckdb_data_chunk handle wrapped by
    // this ref — assumed valid for the duration of the call.
    unsafe { cpp::duckdb_data_chunk_reset(self.as_ptr()) }
}

/// Set the length of the data chunk.
pub fn set_len(&mut self, len: usize) {
unsafe { cpp::duckdb_data_chunk_set_size(self.as_ptr(), len as _) }
Expand Down
22 changes: 14 additions & 8 deletions vortex-duckdb/src/duckdb/table_function/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,11 @@ impl<T: TableFunction> Debug for TableInitInput<'_, T> {
/// Debug-formats the init input, including the concrete table-function type
/// and the projection/filter information DuckDB supplied.
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
    f.debug_struct("TableInitInput")
        .field("table_function", &std::any::type_name::<T>())
        .field("projection_ids", &self.projection_ids())
        .field(
            "post_filter_projection_ids",
            &self.post_filter_projection_ids(),
        )
        .field("table_filter_set", &self.table_filter_set())
        .finish()
}
Expand All @@ -93,18 +96,21 @@ impl<'a, T: TableFunction> TableInitInput<'a, T> {
unsafe { &*self.input.bind_data.cast::<T::BindData>() }
}

/// Returns the column_ids for the table function.
pub fn column_ids(&self) -> &[u64] {
unsafe { std::slice::from_raw_parts(self.input.column_ids, self.input.column_ids_count) }
pub fn projection_ids(&self) -> &[u64] {
unsafe {
std::slice::from_raw_parts(self.input.projection_ids, self.input.projection_ids_count)
}
}

/// Returns the projection_ids for the table function.
pub fn projection_ids(&self) -> Option<&[u64]> {
if self.input.projection_ids.is_null() {
pub fn post_filter_projection_ids(&self) -> Option<&[u64]> {
if self.input.post_filter_projection_ids.is_null() {
return None;
}
Some(unsafe {
std::slice::from_raw_parts(self.input.projection_ids, self.input.projection_ids_count)
std::slice::from_raw_parts(
self.input.post_filter_projection_ids,
self.input.post_filter_projection_ids_count,
)
})
}

Expand Down
25 changes: 16 additions & 9 deletions vortex-duckdb/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ use crate::duckdb::VectorRef;

pub struct ArrayExporter {
ctx: ExecutionCtx,
/// Columns DuckDB requested to read from file. If empty, it's a zero-column
/// projection and should be handled accordingly, see ArrayExporter::export.
fields: Vec<Box<dyn ColumnExporter>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could Model this with an enum? It might be easier to understand?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've thought about this, but then we need something like NonzeroVec, and I believe it's an overkill.

array_len: usize,
remaining: usize,
Expand All @@ -62,6 +64,7 @@ impl ArrayExporter {
.iter()
.map(|field| new_array_exporter(field.clone(), cache, &mut ctx))
.collect::<VortexResult<Vec<_>>>()?;

Ok(Self {
ctx,
fields,
Expand All @@ -74,26 +77,30 @@ impl ArrayExporter {
///
/// Returns `true` if a chunk was exported, `false` if all rows have been exported.
pub fn export(&mut self, chunk: &mut DataChunkRef) -> VortexResult<bool> {
chunk.reset();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you sure we need this?

why if we estimate card

if self.remaining == 0 {
return Ok(false);
}

if self.fields.is_empty() {
// In the case of a projection pushdown with zero columns duckdb will ask us for the
// `EMPTY_COLUMN_IDX`, which we define as a bool column, we can leave the vector as
// uninitialized and just return a DataChunk with the correct length.
// One place no fields can occur is in count(*) queries.
chunk.set_len(self.remaining);
self.remaining = 0;

return Ok(true);
let expected_cols = self.fields.len();
let chunk_cols = chunk.column_count();
let zero_projection = expected_cols == 0;
if !zero_projection && chunk_cols != expected_cols {
vortex_bail!("Expected {expected_cols} columns in output chunk, got {chunk_cols}");
}

let chunk_len = DUCKDB_STANDARD_VECTOR_SIZE.min(self.remaining);
let position = self.array_len - self.remaining;
self.remaining -= chunk_len;
chunk.set_len(chunk_len);

// DuckDB asked us for zero columns. This may happen with aggregation
// functions like count(*). In such case we can leave chunk contents
// uninitialized. See EMPTY_COLUMN_IDX comment why this works.
if zero_projection {
return Ok(true);
}

for (i, field) in self.fields.iter_mut().enumerate() {
field.export(position, chunk_len, chunk.get_vector_mut(i), &mut self.ctx)?;
}
Expand Down
1 change: 1 addition & 0 deletions vortex-duckdb/src/multi_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ impl DataSourceTableFunction for VortexMultiFileScan {
let glob_url_str = glob_url_parameter.as_string();
let glob_url = match Url::parse(glob_url_str.as_str()) {
Ok(url) => Ok(url),
// TODO(myrrc): doesn't parse relative paths like FROM 'test.vortex'
Err(_) => Url::from_file_path(Path::new(glob_url_str.as_str()))
.map_err(|_| vortex_err!("Neither URL nor path: '{}' ", glob_url_str.as_str())),
}?;
Expand Down
Loading
Loading