Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ jobs:
run: cargo check --profile ci --no-default-features -p datafusion --features=serde
- name: Check datafusion (sql)
run: cargo check --profile ci --no-default-features -p datafusion --features=sql
- name: Check datafusion (spark)
run: cargo check --profile ci --no-default-features -p datafusion --features=spark
- name: Check datafusion (string_expressions)
run: cargo check --profile ci --no-default-features -p datafusion --features=string_expressions
- name: Check datafusion (unicode_expressions)
Expand Down Expand Up @@ -299,7 +301,7 @@ jobs:
--lib \
--tests \
--bins \
--features serde,avro,json,backtrace,integration-tests,parquet_encryption
--features serde,avro,json,backtrace,integration-tests,parquet_encryption,spark
- name: Verify Working Directory Clean
run: git diff --exit-code
# Check no temporary directories created during test.
Expand Down Expand Up @@ -557,7 +559,7 @@ jobs:
uses: ./.github/actions/setup-macos-aarch64-builder
- name: Run tests (excluding doctests)
shell: bash
run: cargo test --profile ci --exclude datafusion-cli --workspace --lib --tests --bins --features avro,json,backtrace,integration-tests
run: cargo test --profile ci --exclude datafusion-cli --workspace --lib --tests --bins --features avro,json,backtrace,integration-tests,spark

vendor:
name: Verify Vendored Code
Expand Down Expand Up @@ -778,4 +780,4 @@ jobs:
run: cargo msrv --output-format json --log-target stdout verify
- name: Check datafusion-proto
working-directory: datafusion/proto
run: cargo msrv --output-format json --log-target stdout verify
run: cargo msrv --output-format json --log-target stdout verify
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ serde = [
# statements in `arrow-schema` crate
"arrow-schema/serde",
]
spark = ["datafusion-spark"]
sql = [
"datafusion-common/sql",
"datafusion-functions-nested?/sql",
Expand Down Expand Up @@ -142,6 +143,7 @@ datafusion-physical-expr-common = { workspace = true }
datafusion-physical-optimizer = { workspace = true }
datafusion-physical-plan = { workspace = true }
datafusion-session = { workspace = true }
datafusion-spark = { workspace = true, optional = true }
datafusion-sql = { workspace = true, optional = true }
flate2 = { workspace = true, optional = true }
futures = { workspace = true }
Expand Down
58 changes: 58 additions & 0 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use crate::datasource::provider_as_source;
use crate::execution::SessionStateDefaults;
use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner};
use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
#[cfg(feature = "spark")]
use crate::spark;
use arrow_schema::{DataType, FieldRef};
use datafusion_catalog::MemoryCatalogProviderList;
use datafusion_catalog::information_schema::{
Expand Down Expand Up @@ -1139,6 +1141,41 @@ impl SessionStateBuilder {
self
}

/// Registers all Apache Spark-compatible expression planners and scalar,
/// aggregate, window and table functions on this builder.
///
/// Note: any previously registered item with the same name is overwritten.
#[cfg(feature = "spark")]
pub fn with_spark_features(mut self) -> Self {
    // Planners are evaluated in insertion order; place the Apache Spark
    // function planner first so it takes precedence over the others.
    let planners = self.expr_planners.get_or_insert_with(Vec::new);
    planners.insert(0, Arc::new(spark::planner::SparkFunctionPlanner));

    let scalars = self.scalar_functions.get_or_insert_with(Vec::new);
    scalars.extend(spark::all_default_scalar_functions());

    let aggregates = self.aggregate_functions.get_or_insert_with(Vec::new);
    aggregates.extend(spark::all_default_aggregate_functions());

    let windows = self.window_functions.get_or_insert_with(Vec::new);
    windows.extend(spark::all_default_window_functions());

    // Table functions are stored in a map keyed by function name.
    let tables = self.table_functions.get_or_insert_with(HashMap::new);
    for func in spark::all_default_table_functions() {
        tables.insert(func.name().to_string(), func);
    }

    self
}

/// Returns a new [`SessionStateBuilder`] with default features.
///
/// This is equivalent to calling [`Self::new()`] followed by [`Self::with_default_features()`].
Expand Down Expand Up @@ -2505,4 +2542,25 @@ mod tests {
self.state.window_functions().keys().cloned().collect()
}
}

/// Verifies that `with_spark_features` registers the Apache Spark
/// functions and expression planners on the built `SessionState`.
#[test]
#[cfg(feature = "spark")]
fn test_session_state_with_spark_features() {
    let state = SessionStateBuilder::new().with_spark_features().build();

    let has_sha2 = state.scalar_functions().contains_key("sha2");
    assert!(
        has_sha2,
        "Apache Spark scalar function 'sha2' should be registered"
    );

    let has_try_sum = state.aggregate_functions().contains_key("try_sum");
    assert!(
        has_try_sum,
        "Apache Spark aggregate function 'try_sum' should be registered"
    );

    let planner_count = state.expr_planners().len();
    assert!(
        planner_count > 0,
        "Apache Spark expr planners should be registered"
    );
}
}
7 changes: 7 additions & 0 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,7 @@
//! * [datafusion_physical_expr]: [`PhysicalExpr`] and related expressions
//! * [datafusion_physical_plan]: [`ExecutionPlan`] and related expressions
//! * [datafusion_physical_optimizer]: [`ExecutionPlan`] and related expressions
//! * [datafusion_spark]: Apache Spark compatible functions
//! * [datafusion_sql]: SQL planner ([`SqlToRel`])
//!
//! [`SchemaProvider`]: datafusion_catalog::SchemaProvider
Expand Down Expand Up @@ -874,6 +875,12 @@ pub mod functions_nested {
pub use datafusion_functions_nested::*;
}

/// re-export of [`datafusion_spark`] crate, if "spark" feature is enabled
// cfg on the module (not the inner `pub use`) so that when the feature is
// disabled, `datafusion::spark` does not exist as a misleading empty module
#[cfg(feature = "spark")]
pub mod spark {
    pub use datafusion_spark::*;
}

/// re-export of [`datafusion_functions_aggregate`] crate
pub mod functions_aggregate {
pub use datafusion_functions_aggregate::*;
Expand Down
28 changes: 28 additions & 0 deletions datafusion/spark/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,34 @@
//! ```
//!
//![`Expr`]: datafusion_expr::Expr
//!
//! # Example: enabling Apache Spark features with SessionStateBuilder
//!
//! The recommended way to enable Apache Spark compatibility is to use the
//! [`with_spark_features`] method on [`SessionStateBuilder`]. This registers all
//! Apache Spark functions (scalar, aggregate, window, and table) as well as the Apache Spark
//! expression planner.
//!
//! Note: This requires the `spark` feature to be enabled in the `datafusion` crate
//!
//! ```
//! use datafusion::execution::session_state::SessionStateBuilder;
//! use datafusion::prelude::SessionContext;
//!
//! // Create a SessionState with Apache Spark features enabled
//! // note: the order matters here, `with_spark_features` should be
//! // called after `with_default_features` to overwrite any existing functions
//! let state = SessionStateBuilder::new()
//! .with_default_features()
//! .with_spark_features()
//! .build();
//!
//! // Create a SessionContext using this state
//! let ctx = SessionContext::new_with_state(state);
//! ```
//!
//! [`with_spark_features`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionStateBuilder.html#method.with_spark_features
//! [`SessionStateBuilder`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionStateBuilder.html

pub mod function;
pub mod planner;
Expand Down
3 changes: 1 addition & 2 deletions datafusion/sqllogictest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ bigdecimal = { workspace = true }
bytes = { workspace = true, optional = true }
chrono = { workspace = true, optional = true }
clap = { version = "4.5.53", features = ["derive", "env"] }
datafusion = { workspace = true, default-features = true, features = ["avro"] }
datafusion-spark = { workspace = true, default-features = true }
datafusion = { workspace = true, default-features = true, features = ["avro", "spark"] }
datafusion-substrait = { workspace = true, default-features = true }
futures = { workspace = true }
half = { workspace = true, default-features = true }
Expand Down
15 changes: 4 additions & 11 deletions datafusion/sqllogictest/src/test_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,21 +84,14 @@ impl TestContext {

let mut state_builder = SessionStateBuilder::new()
.with_config(config)
.with_runtime_env(runtime);
.with_runtime_env(runtime)
.with_default_features();

if is_spark_path(relative_path) {
state_builder = state_builder.with_expr_planners(vec![Arc::new(
datafusion_spark::planner::SparkFunctionPlanner,
)]);
state_builder = state_builder.with_spark_features();
}

let mut state = state_builder.with_default_features().build();

if is_spark_path(relative_path) {
info!("Registering Spark functions");
datafusion_spark::register_all(&mut state)
.expect("Can not register Spark functions");
}
let state = state_builder.build();

let mut test_ctx = TestContext::new(SessionContext::new_with_state(state));

Expand Down
Loading