Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
386 changes: 164 additions & 222 deletions native/Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ arrow = { version = "58.1.0", features = ["prettyprint", "ffi", "chrono-tz"] }
async-trait = { version = "0.1" }
bytes = { version = "1.11.1" }
parquet = { version = "58.1.0", default-features = false, features = ["experimental"] }
datafusion = { version = "53.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
datafusion-datasource = { version = "53.0.0" }
datafusion-physical-expr-adapter = { version = "53.0.0" }
datafusion-spark = { version = "53.0.0", features = ["core"] }
datafusion = { git = "https://github.com/mbutrovich/datafusion.git", rev = "0198302544", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
datafusion-datasource = { git = "https://github.com/mbutrovich/datafusion.git", rev = "0198302544" }
datafusion-physical-expr-adapter = { git = "https://github.com/mbutrovich/datafusion.git", rev = "0198302544" }
datafusion-spark = { git = "https://github.com/mbutrovich/datafusion.git", rev = "0198302544", features = ["core"] }
datafusion-comet-spark-expr = { path = "spark-expr" }
datafusion-comet-common = { path = "common" }
datafusion-comet-jni-bridge = { path = "jni-bridge" }
Expand Down
2 changes: 1 addition & 1 deletion native/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ jni = { version = "0.22.4", features = ["invocation"] }
lazy_static = "1.4"
assertables = "9"
hex = "0.4.3"
datafusion-functions-nested = { version = "53.0.0" }
datafusion-functions-nested = { git = "https://github.com/mbutrovich/datafusion.git", rev = "0198302544" }

[features]
backtrace = ["datafusion/backtrace"]
Expand Down
15 changes: 12 additions & 3 deletions native/core/src/execution/operators/expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::datatypes::SchemaRef;
use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::common::DataFusionError;
use datafusion::physical_expr::{EquivalenceProperties, PhysicalExpr};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
Expand All @@ -29,7 +30,6 @@ use datafusion::{
};
use futures::{Stream, StreamExt};
use std::{
any::Any,
pin::Pin,
sync::Arc,
task::{Context, Poll},
Expand Down Expand Up @@ -91,8 +91,17 @@ impl DisplayAs for ExpandExec {
}

impl ExecutionPlan for ExpandExec {
fn as_any(&self) -> &dyn Any {
self
/// Hands every expression of every projection list to `f`, in declaration order.
///
/// The running `TreeNodeRecursion` state is threaded through
/// `visit_sibling`, which decides from the previous result whether `f` is
/// invoked for the next expression. The first `Err` produced by `f` is
/// returned immediately; otherwise the final recursion state is returned.
fn apply_expressions(
    &self,
    f: &mut dyn FnMut(&dyn PhysicalExpr) -> datafusion::common::Result<TreeNodeRecursion>,
) -> datafusion::common::Result<TreeNodeRecursion> {
    // Flatten the list-of-lists so a single fold carries the recursion state
    // across projection boundaries, exactly like the nested loops would.
    self.projections
        .iter()
        .flatten()
        .try_fold(TreeNodeRecursion::Continue, |state, expr| {
            state.visit_sibling(|| f(expr.as_ref()))
        })
}

fn schema(&self) -> SchemaRef {
Expand Down
9 changes: 6 additions & 3 deletions native/core/src/execution/operators/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

//! Native Iceberg table scan operator using iceberg-rust

use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::pin::Pin;
Expand All @@ -26,6 +25,7 @@ use std::task::{Context, Poll};

use arrow::array::{ArrayRef, RecordBatch, RecordBatchOptions};
use arrow::datatypes::SchemaRef;
use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::common::{DataFusionError, Result as DFResult};
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::expressions::Column;
Expand Down Expand Up @@ -108,8 +108,11 @@ impl ExecutionPlan for IcebergScanExec {
"IcebergScanExec"
}

fn as_any(&self) -> &dyn Any {
self
/// Expression-visitor hook for this operator.
///
/// The callback `_f` is never invoked; the method unconditionally returns
/// `Ok(TreeNodeRecursion::Continue)`, so no expressions are reported.
// NOTE(review): presumably the scan exposes no `PhysicalExpr`s of its own
// (e.g. pushed-down predicates) — confirm against the struct definition.
fn apply_expressions(
&self,
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> DFResult<TreeNodeRecursion>,
) -> DFResult<TreeNodeRecursion> {
Ok(TreeNodeRecursion::Continue)
}

fn schema(&self) -> SchemaRef {
Expand Down
17 changes: 10 additions & 7 deletions native/core/src/execution/operators/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
//! Parquet writer operator for writing RecordBatches to Parquet files

use std::{
any::Any,
collections::HashMap,
fmt,
fmt::{Debug, Formatter},
Expand All @@ -38,6 +37,7 @@ use crate::parquet::parquet_support::{create_hdfs_operator, prepare_object_store
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::{
error::{DataFusionError, Result},
execution::context::TaskContext,
Expand All @@ -46,8 +46,8 @@ use datafusion::{
execution_plan::{Boundedness, EmissionType},
metrics::{ExecutionPlanMetricsSet, MetricsSet},
stream::RecordBatchStreamAdapter,
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
SendableRecordBatchStream,
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PhysicalExpr,
PlanProperties, SendableRecordBatchStream,
},
};
use futures::TryStreamExt;
Expand Down Expand Up @@ -404,14 +404,17 @@ impl DisplayAs for ParquetWriterExec {

#[async_trait]
impl ExecutionPlan for ParquetWriterExec {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"ParquetWriterExec"
}

/// Expression-visitor hook for this operator.
///
/// The callback `_f` is never invoked; the method unconditionally returns
/// `Ok(TreeNodeRecursion::Continue)`, so no expressions are reported.
// NOTE(review): presumably the writer holds no `PhysicalExpr`s — confirm
// against the struct definition.
fn apply_expressions(
&self,
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
) -> Result<TreeNodeRecursion> {
Ok(TreeNodeRecursion::Continue)
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
Expand Down
9 changes: 6 additions & 3 deletions native/core/src/execution/operators/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use arrow::compute::{cast_with_options, take, CastOptions};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::ffi::FFI_ArrowArray;
use arrow::ffi::FFI_ArrowSchema;
use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::metrics::{
Expand All @@ -43,7 +44,6 @@ use itertools::Itertools;
use jni::objects::{Global, JObject, JValue};
use std::rc::Rc;
use std::{
any::Any,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
Expand Down Expand Up @@ -383,8 +383,11 @@ fn schema_from_data_types(data_types: &[DataType]) -> SchemaRef {
}

impl ExecutionPlan for ScanExec {
fn as_any(&self) -> &dyn Any {
self
/// Expression-visitor hook for this operator.
///
/// The callback `_f` is never invoked; the method unconditionally returns
/// `Ok(TreeNodeRecursion::Continue)`, so no expressions are reported.
// NOTE(review): presumably the scan holds no `PhysicalExpr`s — confirm
// against the struct definition.
fn apply_expressions(
&self,
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> DataFusionResult<TreeNodeRecursion>,
) -> DataFusionResult<TreeNodeRecursion> {
Ok(TreeNodeRecursion::Continue)
}

fn schema(&self) -> SchemaRef {
Expand Down
9 changes: 6 additions & 3 deletions native/core/src/execution/operators/shuffle_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
};
use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::common::{arrow_datafusion_err, Result as DataFusionResult};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::metrics::{
Expand All @@ -37,7 +38,6 @@ use datafusion::{
use futures::Stream;
use jni::objects::{Global, JByteBuffer, JObject};
use std::{
any::Any,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
Expand Down Expand Up @@ -221,8 +221,11 @@ fn schema_from_data_types(data_types: &[DataType]) -> SchemaRef {
}

impl ExecutionPlan for ShuffleScanExec {
fn as_any(&self) -> &dyn Any {
self
/// Expression-visitor hook for this operator.
///
/// The callback `_f` is never invoked; the method unconditionally returns
/// `Ok(TreeNodeRecursion::Continue)`, so no expressions are reported.
// NOTE(review): presumably the shuffle scan holds no `PhysicalExpr`s —
// confirm against the struct definition.
fn apply_expressions(
&self,
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> DataFusionResult<TreeNodeRecursion>,
) -> DataFusionResult<TreeNodeRecursion> {
Ok(TreeNodeRecursion::Continue)
}

fn schema(&self) -> SchemaRef {
Expand Down
2 changes: 1 addition & 1 deletion native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1704,7 +1704,7 @@ impl PhysicalPlanner {
hash_join.as_ref().swap_inputs(PartitionMode::Partitioned)?;

let mut additional_native_plans = vec![];
if swapped_hash_join.as_any().is::<ProjectionExec>() {
if swapped_hash_join.is::<ProjectionExec>() {
// a projection was added to the hash join
additional_native_plans.push(Arc::clone(swapped_hash_join.children()[0]));
}
Expand Down
20 changes: 14 additions & 6 deletions native/core/src/parquet/schema_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,21 +370,29 @@ impl SparkPhysicalExprAdapter {
.data()
}

/// Replace CastColumnExpr (DataFusion's cast) with Spark's Cast expression.
/// Replace CastExpr (DataFusion's cast) with Spark's Cast expression.
fn replace_with_spark_cast(
&self,
expr: Arc<dyn PhysicalExpr>,
) -> DataFusionResult<Transformed<Arc<dyn PhysicalExpr>>> {
// Check for CastColumnExpr and replace with spark_expr::Cast
// CastColumnExpr is in datafusion_physical_expr::expressions
// Check for CastExpr and replace with spark_expr::Cast
if let Some(cast) = expr
.as_any()
.downcast_ref::<datafusion::physical_expr::expressions::CastColumnExpr>()
.downcast_ref::<datafusion::physical_expr::expressions::CastExpr>()
{
let child = Arc::clone(cast.expr());
let physical_type = cast.input_field().data_type();
let target_type = cast.target_field().data_type();

// Derive input field from the child Column expression and the physical schema
let input_field = if let Some(col) = child.as_any().downcast_ref::<Column>() {
Arc::new(self.physical_file_schema.field(col.index()).clone())
} else {
// Fallback: synthesize a field from the target field name and child data type
let child_type = cast.expr().data_type(&self.physical_file_schema)?;
Arc::new(Field::new(cast.target_field().name(), child_type, true))
};
let physical_type = input_field.data_type();

// For complex nested types (Struct, List, Map), Timestamp timezone
// mismatches, and Timestamp→Int64 (nanosAsLong), use CometCastColumnExpr
// with spark_parquet_convert which handles field-name-based selection,
Expand Down Expand Up @@ -413,7 +421,7 @@ impl SparkPhysicalExprAdapter {
let comet_cast: Arc<dyn PhysicalExpr> = Arc::new(
CometCastColumnExpr::new(
child,
Arc::clone(cast.input_field()),
input_field,
Arc::clone(cast.target_field()),
None,
)
Expand Down
16 changes: 9 additions & 7 deletions native/shuffle/src/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use crate::partitioners::{
use crate::{CometPartitioning, CompressionCodec};
use async_trait::async_trait;
use datafusion::common::exec_datafusion_err;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::EmptyRecordBatchStream;
use datafusion::{
Expand All @@ -40,7 +41,6 @@ use datafusion::{
};
use futures::{StreamExt, TryFutureExt, TryStreamExt};
use std::{
any::Any,
fmt,
fmt::{Debug, Formatter},
sync::Arc,
Expand Down Expand Up @@ -119,15 +119,17 @@ impl DisplayAs for ShuffleWriterExec {

#[async_trait]
impl ExecutionPlan for ShuffleWriterExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"ShuffleWriterExec"
}

/// Expression-visitor hook for this operator.
///
/// The callback `_f` is never invoked; the method unconditionally returns
/// `Ok(TreeNodeRecursion::Continue)`, so no expressions are reported.
// NOTE(review): this file imports `CometPartitioning`; if the writer stores a
// partitioning that carries `PhysicalExpr`s (e.g. hash keys), they are NOT
// visited here — confirm that skipping them is intended.
fn apply_expressions(
&self,
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
) -> Result<TreeNodeRecursion> {
Ok(TreeNodeRecursion::Continue)
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
Expand Down
9 changes: 2 additions & 7 deletions native/spark-expr/src/agg_funcs/avg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use datafusion::logical_expr::{
Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature,
};
use datafusion::physical_expr::expressions::format_state_name;
use std::{any::Any, sync::Arc};
use std::sync::Arc;

use arrow::array::ArrowNativeTypeOp;
use datafusion::logical_expr::function::{AccumulatorArgs, StateFieldsArgs};
Expand Down Expand Up @@ -67,11 +67,6 @@ impl Avg {
}

impl AggregateUDFImpl for Avg {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
// All numeric types use Float64 accumulation after casting
match (&self.input_data_type, &self.result_data_type) {
Expand Down Expand Up @@ -239,7 +234,7 @@ where
impl<T, F> GroupsAccumulator for AvgGroupsAccumulator<T, F>
where
T: ArrowNumericType + Send,
F: Fn(T::Native, i64) -> Result<T::Native> + Send,
F: Fn(T::Native, i64) -> Result<T::Native> + Send + 'static,
{
fn update_batch(
&mut self,
Expand Down
7 changes: 1 addition & 6 deletions native/spark-expr/src/agg_funcs/avg_decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use datafusion::logical_expr::{
Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature,
};
use datafusion::physical_expr::expressions::format_state_name;
use std::{any::Any, sync::Arc};
use std::sync::Arc;

use crate::utils::{build_bool_state, is_valid_decimal_precision, unlikely};
use crate::{decimal_sum_overflow_error, EvalMode, SparkErrorWithContext};
Expand Down Expand Up @@ -108,11 +108,6 @@ impl AvgDecimal {
}

impl AggregateUDFImpl for AvgDecimal {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
match (&self.sum_data_type, &self.result_data_type) {
(Decimal128(sum_precision, sum_scale), Decimal128(target_precision, target_scale)) => {
Expand Down
7 changes: 1 addition & 6 deletions native/spark-expr/src/agg_funcs/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use arrow::compute::{and, filter, is_not_null};

use std::{any::Any, sync::Arc};
use std::sync::Arc;

use crate::agg_funcs::covariance::CovarianceAccumulator;
use crate::agg_funcs::stddev::StddevAccumulator;
Expand Down Expand Up @@ -58,11 +58,6 @@ impl Correlation {
}

impl AggregateUDFImpl for Correlation {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
&self.name
}
Expand Down
6 changes: 0 additions & 6 deletions native/spark-expr/src/agg_funcs/covariance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use datafusion::logical_expr::type_coercion::aggregates::NUMERICS;
use datafusion::logical_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility};
use datafusion::physical_expr::expressions::format_state_name;
use datafusion::physical_expr::expressions::StatsType;
use std::any::Any;
use std::sync::Arc;

/// COVAR_SAMP and COVAR_POP aggregate expression
Expand Down Expand Up @@ -73,11 +72,6 @@ impl Covariance {
}

impl AggregateUDFImpl for Covariance {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
&self.name
}
Expand Down
Loading
Loading