Conversation
Pull request overview
This PR attempts to introduce function caching for external functions using Rust's OnceLock to avoid repeatedly wrapping function pointers in Arc. A cache! macro is introduced to replace direct Arc::new() calls for all Spark extension functions (except the "Placeholder" function).
Changes:
- Added the OnceLock import and created a cache! macro to cache function implementations
- Replaced 30+ direct Arc::new(function) calls with cache!(function) macro invocations
macro_rules! cache {
    ($func:path) => {{
        static CELL: OnceLock<ScalarFunctionImplementation> = OnceLock::new();
        CELL.get_or_init(|| Arc::new($func)).clone()
    }};
}
The cache macro has a critical bug: all invocations share the same static CELL variable. This means that once any function is cached in the first call, all subsequent function lookups will return that same cached function, regardless of which function was requested.
For example, if "Spark_NullIf" is called first, it will cache spark_null_if::spark_null_if. Then when "Spark_NullIfZero" is called, it will return the same spark_null_if::spark_null_if function instead of spark_null_if::spark_null_if_zero.
To fix this, each invocation needs its own unique static variable. This can be achieved by making the static variable name unique per function path, or by using a different caching approach such as a global HashMap with function names as keys.
There was a problem hiding this comment.
In Rust, a static declared inside a function or block is not a global singleton shared by name; it is a local singleton unique to the scope it is declared in. Since the macro body is expanded anew at every call site, each cache!(...) invocation gets its own independent CELL.
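A minimal standalone sketch (not from the PR; `cached_value!` is a made-up macro) illustrating the point: every expansion site of a macro containing a `static` gets its own static item, so distinct call sites do not share state.

```rust
use std::sync::OnceLock;

// Each expansion of this macro creates its OWN `CELL` static,
// local to the block the macro expands to at that call site.
macro_rules! cached_value {
    ($val:expr) => {{
        static CELL: OnceLock<i32> = OnceLock::new();
        *CELL.get_or_init(|| $val)
    }};
}

fn main() {
    let a = cached_value!(1); // initializes this site's CELL to 1
    let b = cached_value!(2); // a different expansion site: its own CELL
    assert_eq!((a, b), (1, 2)); // the two statics do not interfere
    println!("{a} {b}");
}
```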
Just running this test in AuronQuerySuite
test("my cache test") {
withTable("my_cache_table") {
sql("""
|create table my_cache_table using parquet as
|select col1, col2 from values ('a,A', '{"a":"1", "b":"2"}'), ('b,B', '{"a":"3", "b":"4"}'), ('c,C', '{"a":"5", "b":"6"}')
|""".stripMargin)
sql("""
|select split(col1, ',')[0],
| split(col1, ',')[1],
| get_json_object(col2, '$.a'),
| get_json_object(col2, '$.b')
|from my_cache_table
|""".stripMargin).show()
}
}
we can see the following correct answer.
+---------------------+---------------------+--------------------------+--------------------------+
|split(col1, ,, -1)[0]|split(col1, ,, -1)[1]|get_json_object(col2, $.a)|get_json_object(col2, $.b)|
+---------------------+---------------------+--------------------------+--------------------------+
| a| A| 1| 2|
| b| B| 3| 4|
| c| C| 5| 6|
+---------------------+---------------------+--------------------------+--------------------------+
It correctly handles the different ext functions StringSplit, GetParsedJsonObject, and ParseJson.
ProjectExec [
    (spark_ext_function_Spark_StringSplit(#2@0, ,)).[1] AS #16,
    (spark_ext_function_Spark_StringSplit(#2@0, ,)).[2] AS #17,
    spark_ext_function_Spark_GetParsedJsonObject(spark_ext_function_Spark_ParseJson(#3@1), $.a) AS #18,
    spark_ext_function_Spark_GetParsedJsonObject(spark_ext_function_Spark_ParseJson(#3@1), $.b) AS #19
], schema=[#16:Utf8;N, #17:Utf8;N, #18:Utf8;N, #19:Utf8;N]
I think a wrapper around ScalarFunctionImplementation with an overridden PartialEq/Eq implementation would be better.
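A hedged sketch of what such a wrapper could look like. `ScalarFn` and `NamedScalarFn` are invented names standing in for DataFusion's ScalarFunctionImplementation; the idea is only that equality is defined by function name rather than by Arc pointer identity.

```rust
use std::sync::Arc;

// Stand-in for DataFusion's ScalarFunctionImplementation type.
type ScalarFn = Arc<dyn Fn(i64) -> i64 + Send + Sync>;

// Hypothetical wrapper: equality ignores the Arc and compares names.
pub struct NamedScalarFn {
    pub name: &'static str,
    pub fun: ScalarFn,
}

impl PartialEq for NamedScalarFn {
    fn eq(&self, other: &Self) -> bool {
        // Two ext functions compare equal iff they share a name,
        // even when their Arcs are separate allocations.
        self.name == other.name
    }
}
impl Eq for NamedScalarFn {}

fn main() {
    let a = NamedScalarFn { name: "Spark_NullIf", fun: Arc::new(|x| x) };
    let b = NamedScalarFn { name: "Spark_NullIf", fun: Arc::new(|x| x) };
    assert!(a == b); // equal by name despite distinct Arc allocations
}
```

This sidesteps pointer-identity caching entirely: dedup can rely on the name comparison, so it no longer matters how many times the Arc is freshly allocated.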
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
let scalar_udf = if scalar_function == protobuf::ScalarFunction::AuronExtFunctions {
    let fun_name = &e.name;
    let fun = datafusion_ext_functions::create_auron_ext_function(
        fun_name,
        self.partition_id,
    )?;
-   Arc::new(create_udf(
-       &format!("spark_ext_function_{fun_name}"),
-       args.iter()
-           .map(|e| e.data_type(input_schema))
-           .collect::<Result<Vec<_>, _>>()?,
-       convert_required!(e.return_type)?,
-       Volatility::Volatile,
-       fun,
-   ))
+   Arc::new(ScalarUDF::from(fun))
} else {
create_auron_ext_function returns a new AuronExtFunction instance per invocation, and the planner wraps it in a new ScalarUDF each time. If expression deduplication depends on pointer equality of the UDF/impl (directly or indirectly), this will still prevent caching of repeated ext function expressions. Consider reusing a shared Arc<ScalarUDF> (or shared impl) per function signature/name rather than constructing a new one for every parsed expression.
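One possible shape for that reuse, as a standalone sketch (not the PR's code): a process-wide map keyed by function name that hands out clones of one shared Arc. `Udf` and `cached_udf` are invented stand-ins for ScalarUDF and the planner lookup.

```rust
use std::{
    collections::HashMap,
    sync::{Arc, Mutex, OnceLock},
};

// Stand-in for ScalarUDF.
struct Udf {
    name: String,
}

// Return one shared Arc per function name; repeated lookups for the
// same name clone the same allocation, so Arc::ptr_eq holds.
fn cached_udf(name: &str) -> Arc<Udf> {
    static CACHE: OnceLock<Mutex<HashMap<String, Arc<Udf>>>> = OnceLock::new();
    let cache = CACHE.get_or_init(|| Mutex::new(HashMap::new()));
    let mut map = cache.lock().unwrap();
    map.entry(name.to_string())
        .or_insert_with(|| Arc::new(Udf { name: name.to_string() }))
        .clone()
}

fn main() {
    let a = cached_udf("Spark_StringSplit");
    let b = cached_udf("Spark_StringSplit");
    assert!(Arc::ptr_eq(&a, &b)); // same allocation reused across lookups
    println!("{}", a.name);
}
```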
@@ -86,5 +93,55 @@ pub fn create_auron_ext_function(
    }
    "Spark_IsNaN" => Arc::new(spark_isnan::spark_isnan),
    _ => df_unimplemented_err!("spark ext function not implemented: {name}")?,
};
This still constructs a fresh Arc for each ScalarFunctionImplementation via Arc::new(...) on every call. If DataFusion equality/dedup relies on Arc::ptr_eq for the function implementation (as described in #1902), logically identical ext functions will still compare unequal and won't be deduplicated. Consider caching/reusing a single Arc per function (e.g., a static OnceLock<ScalarFunctionImplementation> per match arm, or a global map keyed by name) and cloning that Arc here instead of allocating a new one each time.
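For reference, a tiny standalone demonstration (with a made-up function `inc`) of why repeated Arc::new calls defeat pointer-based deduplication, while clones of one cached Arc do not:

```rust
use std::sync::Arc;

fn inc(x: i64) -> i64 {
    x + 1
}

fn main() {
    // Two separate Arc::new calls allocate independently, so pointer
    // equality fails even though both wrap the very same function.
    let a: Arc<dyn Fn(i64) -> i64> = Arc::new(inc);
    let b: Arc<dyn Fn(i64) -> i64> = Arc::new(inc);
    assert!(!Arc::ptr_eq(&a, &b));

    // Cloning one Arc (what a cache would hand out) preserves identity.
    let c = a.clone();
    assert!(Arc::ptr_eq(&a, &c));
}
```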
Which issue does this PR close?
Closes #1902
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?
How was this patch tested?