2626use crate :: parquet:: cast_column:: CometCastColumnExpr ;
2727use crate :: parquet:: parquet_support:: { spark_parquet_convert, SparkParquetOptions } ;
2828use arrow:: array:: { ArrayRef , RecordBatch , RecordBatchOptions } ;
29- use arrow:: datatypes:: { DataType , Schema , SchemaRef } ;
29+ use arrow:: datatypes:: { DataType , Field , Schema , SchemaRef } ;
3030use datafusion:: common:: tree_node:: { Transformed , TransformedResult , TreeNode } ;
3131use datafusion:: common:: { ColumnStatistics , Result as DataFusionResult } ;
3232use datafusion:: datasource:: schema_adapter:: { SchemaAdapter , SchemaAdapterFactory , SchemaMapper } ;
@@ -71,24 +71,102 @@ impl SparkPhysicalExprAdapterFactory {
7171 }
7272}
7373
74+ /// Remap physical schema field names to match logical schema field names using
75+ /// case-insensitive matching. This allows the DefaultPhysicalExprAdapter (which
76+ /// uses exact name matching) to correctly find columns when the parquet file has
77+ /// different casing than the table schema (e.g., file has "a" but table has "A").
78+ fn remap_physical_schema_names (
79+ logical_schema : & SchemaRef ,
80+ physical_schema : & SchemaRef ,
81+ ) -> SchemaRef {
82+ let logical_names: HashMap < String , & str > = logical_schema
83+ . fields ( )
84+ . iter ( )
85+ . map ( |f| ( f. name ( ) . to_lowercase ( ) , f. name ( ) . as_str ( ) ) )
86+ . collect ( ) ;
87+
88+ let remapped_fields: Vec < _ > = physical_schema
89+ . fields ( )
90+ . iter ( )
91+ . map ( |field| {
92+ if let Some ( logical_name) = logical_names. get ( & field. name ( ) . to_lowercase ( ) ) {
93+ if * logical_name != field. name ( ) {
94+ Arc :: new ( Field :: new (
95+ * logical_name,
96+ field. data_type ( ) . clone ( ) ,
97+ field. is_nullable ( ) ,
98+ ) )
99+ } else {
100+ Arc :: clone ( field)
101+ }
102+ } else {
103+ Arc :: clone ( field)
104+ }
105+ } )
106+ . collect ( ) ;
107+
108+ Arc :: new ( Schema :: new ( remapped_fields) )
109+ }
110+
74111impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory {
75112 fn create (
76113 & self ,
77114 logical_file_schema : SchemaRef ,
78115 physical_file_schema : SchemaRef ,
79116 ) -> Arc < dyn PhysicalExprAdapter > {
117+ // When case-insensitive, remap physical schema field names to match logical
118+ // field names. The DefaultPhysicalExprAdapter uses exact name matching, so
119+ // without this remapping, columns like "a" won't match logical "A" and will
120+ // be filled with nulls.
121+ //
122+ // We also build a reverse map (logical name -> physical name) so that after
123+ // the default adapter produces expressions, we can remap column names back
124+ // to the original physical names. This is necessary because downstream code
125+ // (reassign_expr_columns) looks up columns by name in the actual stream
126+ // schema, which uses the original physical file column names.
127+ let ( adapted_physical_schema, logical_to_physical_names) =
128+ if !self . parquet_options . case_sensitive {
129+ let logical_to_physical: HashMap < String , String > = logical_file_schema
130+ . fields ( )
131+ . iter ( )
132+ . filter_map ( |logical_field| {
133+ physical_file_schema
134+ . fields ( )
135+ . iter ( )
136+ . find ( |pf| {
137+ pf. name ( ) . to_lowercase ( ) == logical_field. name ( ) . to_lowercase ( )
138+ && pf. name ( ) != logical_field. name ( )
139+ } )
140+ . map ( |pf| ( logical_field. name ( ) . clone ( ) , pf. name ( ) . clone ( ) ) )
141+ } )
142+ . collect ( ) ;
143+ let remapped =
144+ remap_physical_schema_names ( & logical_file_schema, & physical_file_schema) ;
145+ (
146+ remapped,
147+ if logical_to_physical. is_empty ( ) {
148+ None
149+ } else {
150+ Some ( logical_to_physical)
151+ } ,
152+ )
153+ } else {
154+ ( Arc :: clone ( & physical_file_schema) , None )
155+ } ;
156+
80157 let default_factory = DefaultPhysicalExprAdapterFactory ;
81158 let default_adapter = default_factory. create (
82159 Arc :: clone ( & logical_file_schema) ,
83- Arc :: clone ( & physical_file_schema ) ,
160+ Arc :: clone ( & adapted_physical_schema ) ,
84161 ) ;
85162
86163 Arc :: new ( SparkPhysicalExprAdapter {
87164 logical_file_schema,
88- physical_file_schema,
165+ physical_file_schema : adapted_physical_schema ,
89166 parquet_options : self . parquet_options . clone ( ) ,
90167 default_values : self . default_values . clone ( ) ,
91168 default_adapter,
169+ logical_to_physical_names,
92170 } )
93171 }
94172}
@@ -112,6 +190,13 @@ struct SparkPhysicalExprAdapter {
112190 default_values : Option < HashMap < usize , ScalarValue > > ,
113191 /// The default DataFusion adapter to delegate standard handling to
114192 default_adapter : Arc < dyn PhysicalExprAdapter > ,
193+ /// Mapping from logical column names to original physical column names,
194+ /// used for case-insensitive mode where names differ in casing.
195+ /// After the default adapter rewrites expressions using the remapped
196+ /// physical schema (with logical names), we need to restore the original
197+ /// physical names so that downstream reassign_expr_columns can find
198+ /// columns in the actual stream schema.
199+ logical_to_physical_names : Option < HashMap < String , String > > ,
115200}
116201
117202impl PhysicalExprAdapter for SparkPhysicalExprAdapter {
@@ -137,6 +222,29 @@ impl PhysicalExprAdapter for SparkPhysicalExprAdapter {
137222 self . wrap_all_type_mismatches ( expr) ?
138223 }
139224 } ;
225+
226+ // For case-insensitive mode: remap column names from logical back to
227+ // original physical names. The default adapter was given a remapped
228+ // physical schema (with logical names) so it could find columns. But
229+ // downstream code (reassign_expr_columns) looks up columns by name in
230+ // the actual parquet stream schema, which uses the original physical names.
231+ let expr = if let Some ( name_map) = & self . logical_to_physical_names {
232+ expr. transform ( |e| {
233+ if let Some ( col) = e. as_any ( ) . downcast_ref :: < Column > ( ) {
234+ if let Some ( physical_name) = name_map. get ( col. name ( ) ) {
235+ return Ok ( Transformed :: yes ( Arc :: new ( Column :: new (
236+ physical_name,
237+ col. index ( ) ,
238+ ) ) ) ) ;
239+ }
240+ }
241+ Ok ( Transformed :: no ( e) )
242+ } )
243+ . data ( ) ?
244+ } else {
245+ expr
246+ } ;
247+
140248 Ok ( expr)
141249 }
142250}
0 commit comments