// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::parquet::cast_column::CometCastColumnExpr;
use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion::common::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_expr::expressions::Column;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::ColumnarValue;
use datafusion::scalar::ScalarValue;
use datafusion_comet_common::SparkError;
use datafusion_comet_spark_expr::{Cast, SparkCastOptions};
use datafusion_physical_expr_adapter::{
replace_columns_with_literals, DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter,
PhysicalExprAdapterFactory,
};
use std::collections::HashMap;
use std::sync::Arc;
/// Factory for creating Spark-compatible physical expression adapters.
///
/// This factory creates adapters that rewrite expressions at planning time
/// to inject Spark-compatible casts where needed.
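///
/// # Example
///
/// A minimal sketch of wiring the factory into expression rewriting, mirroring
/// the tests at the bottom of this file; `logical_file_schema`,
/// `physical_file_schema`, and `predicate_expr` are assumed to already exist:
///
/// ```ignore
/// let options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
/// let factory = SparkPhysicalExprAdapterFactory::new(options, None);
/// let adapter = factory.create(logical_file_schema, physical_file_schema)?;
/// let rewritten = adapter.rewrite(predicate_expr)?;
/// ```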
#[derive(Clone, Debug)]
pub struct SparkPhysicalExprAdapterFactory {
/// Spark-specific parquet options for type conversions
parquet_options: SparkParquetOptions,
/// Default values for columns that may be missing from the physical schema.
/// The key is the Column (containing name and index).
default_values: Option<HashMap<Column, ScalarValue>>,
}
impl SparkPhysicalExprAdapterFactory {
/// Create a new factory with the given options.
pub fn new(
parquet_options: SparkParquetOptions,
default_values: Option<HashMap<Column, ScalarValue>>,
) -> Self {
Self {
parquet_options,
default_values,
}
}
}
/// Remap physical schema field names to match logical schema field names using
/// case-insensitive matching. This allows the DefaultPhysicalExprAdapter (which
/// uses exact name matching) to correctly find columns when the parquet file has
/// different casing than the table schema (e.g., file has "a" but table has "A").
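///
/// For example, given a logical schema `["A", "B"]` and a physical schema
/// `["a", "B"]`, the returned schema has fields named `["A", "B"]`, preserving
/// each physical field's data type and nullability.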
fn remap_physical_schema_names(
logical_schema: &SchemaRef,
physical_schema: &SchemaRef,
) -> SchemaRef {
let remapped_fields: Vec<_> = physical_schema
.fields()
.iter()
.map(|field| {
if let Some(logical_field) = logical_schema
.fields()
.iter()
.find(|lf| lf.name().eq_ignore_ascii_case(field.name()))
{
if logical_field.name() != field.name() {
Arc::new(Field::new(
logical_field.name(),
field.data_type().clone(),
field.is_nullable(),
))
} else {
Arc::clone(field)
}
} else {
Arc::clone(field)
}
})
.collect();
Arc::new(Schema::new(remapped_fields))
}
/// Check if a specific column name has duplicate matches in the physical schema
/// (case-insensitive). Returns the requested column name and a formatted list of
/// the matching physical field names if so.
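///
/// For example, with physical fields `["A", "B", "b"]`, requesting `"b"` matches
/// both `"B"` and `"b"`, so this returns `Some(("b", "[B, b]"))`.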
fn check_column_duplicate(
col_name: &str,
physical_schema: &SchemaRef,
) -> Option<(String, String)> {
let matches: Vec<&str> = physical_schema
.fields()
.iter()
.filter(|pf| pf.name().eq_ignore_ascii_case(col_name))
.map(|pf| pf.name().as_str())
.collect();
if matches.len() > 1 {
// Include brackets to match the format expected by ShimSparkErrorConverter
Some((col_name.to_string(), format!("[{}]", matches.join(", "))))
} else {
None
}
}
impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory {
fn create(
&self,
logical_file_schema: SchemaRef,
physical_file_schema: SchemaRef,
) -> DataFusionResult<Arc<dyn PhysicalExprAdapter>> {
// When case-insensitive, remap physical schema field names to match logical
// field names. The DefaultPhysicalExprAdapter uses exact name matching, so
// without this remapping, columns like "a" won't match logical "A" and will
// be filled with nulls.
//
// We also build a reverse map (logical name -> physical name) so that after
// the default adapter produces expressions, we can remap column names back
// to the original physical names. This is necessary because downstream code
// (reassign_expr_columns) looks up columns by name in the actual stream
// schema, which uses the original physical file column names.
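//
// For example, with logical ["A"] and physical ["a"], the remapped physical
// schema has a field named "A" and the reverse map records "A" -> "a".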
let (adapted_physical_schema, logical_to_physical_names, original_physical_schema) =
if !self.parquet_options.case_sensitive {
let logical_to_physical: HashMap<String, String> = logical_file_schema
.fields()
.iter()
.filter_map(|logical_field| {
physical_file_schema
.fields()
.iter()
.find(|pf| {
pf.name().eq_ignore_ascii_case(logical_field.name())
&& pf.name() != logical_field.name()
})
.map(|pf| (logical_field.name().clone(), pf.name().clone()))
})
.collect();
let remapped =
remap_physical_schema_names(&logical_file_schema, &physical_file_schema);
(
remapped,
if logical_to_physical.is_empty() {
None
} else {
Some(logical_to_physical)
},
// Keep original physical schema for per-column duplicate detection
Some(Arc::clone(&physical_file_schema)),
)
} else {
(Arc::clone(&physical_file_schema), None, None)
};
let default_factory = DefaultPhysicalExprAdapterFactory;
let default_adapter = default_factory.create(
Arc::clone(&logical_file_schema),
Arc::clone(&adapted_physical_schema),
)?;
Ok(Arc::new(SparkPhysicalExprAdapter {
logical_file_schema,
physical_file_schema: adapted_physical_schema,
parquet_options: self.parquet_options.clone(),
default_values: self.default_values.clone(),
default_adapter,
logical_to_physical_names,
original_physical_schema,
}))
}
}
/// Spark-compatible physical expression adapter.
///
/// This adapter rewrites expressions at planning time to:
/// 1. Replace references to missing columns with default values or nulls
/// 2. Replace standard DataFusion cast expressions with Spark-compatible casts
/// 3. Handle case-insensitive column matching
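///
/// For example, when a file stores `id` as Int32 but the required schema asks
/// for Utf8, the rewritten expression wraps the column reference in a
/// Spark-compatible cast (see `parquet_roundtrip_int_as_string` in the tests
/// below).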
#[derive(Debug)]
struct SparkPhysicalExprAdapter {
/// The logical schema expected by the query
logical_file_schema: SchemaRef,
/// The physical schema of the actual file being read
physical_file_schema: SchemaRef,
/// Spark-specific options for type conversions
parquet_options: SparkParquetOptions,
/// Default values for missing columns (keyed by Column)
default_values: Option<HashMap<Column, ScalarValue>>,
/// The default DataFusion adapter to delegate standard handling to
default_adapter: Arc<dyn PhysicalExprAdapter>,
/// Mapping from logical column names to original physical column names,
/// used for case-insensitive mode where names differ in casing.
/// After the default adapter rewrites expressions using the remapped
/// physical schema (with logical names), we need to restore the original
/// physical names so that downstream reassign_expr_columns can find
/// columns in the actual stream schema.
logical_to_physical_names: Option<HashMap<String, String>>,
/// The original (un-remapped) physical schema, kept for per-column duplicate
/// detection in case-insensitive mode. Only set when `!case_sensitive`.
original_physical_schema: Option<SchemaRef>,
}
impl PhysicalExprAdapter for SparkPhysicalExprAdapter {
fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
// In case-insensitive mode, check if any Column in this expression references
// a field with multiple case-insensitive matches in the physical schema.
// Only the columns actually referenced trigger the error (not the whole schema).
if let Some(orig_physical) = &self.original_physical_schema {
// Walk the expression tree to find Column references
let mut duplicate_err: Option<DataFusionError> = None;
let _ = Arc::<dyn PhysicalExpr>::clone(&expr).transform(|e| {
if let Some(col) = e.as_any().downcast_ref::<Column>() {
if let Some((req, matched)) = check_column_duplicate(col.name(), orig_physical)
{
duplicate_err = Some(DataFusionError::External(Box::new(
SparkError::DuplicateFieldCaseInsensitive {
required_field_name: req,
matched_fields: matched,
},
)));
}
}
Ok(Transformed::no(e))
});
if let Some(err) = duplicate_err {
return Err(err);
}
}
// Replace references to missing columns with their default values first, then
// let the default adapter handle column remapping, remaining missing columns,
// and simple scalar type casts. Finally, replace DataFusion's CastExpr with
// Spark-compatible equivalents.
//
// The default adapter may fail for complex nested type casts (List, Map).
// In that case, fall back to wrapping everything ourselves.
let expr = self.replace_missing_with_defaults(expr)?;
let expr = match self.default_adapter.rewrite(Arc::clone(&expr)) {
Ok(rewritten) => {
// Replace DataFusion's CastExpr with either:
// - CometCastColumnExpr (for Struct/List/Map and Timestamp cases,
//   using spark_parquet_convert)
// - Spark Cast (for simple scalar types)
rewritten
.transform(|e| self.replace_with_spark_cast(e))
.data()?
}
Err(e) => {
// Default adapter failed (likely complex nested type cast).
// Handle all type mismatches ourselves using spark_parquet_convert.
log::debug!("Default schema adapter error: {}", e);
self.wrap_all_type_mismatches(expr)?
}
};
// For case-insensitive mode: remap column names from logical back to
// original physical names. The default adapter was given a remapped
// physical schema (with logical names) so it could find columns. But
// downstream code (reassign_expr_columns) looks up columns by name in
// the actual parquet stream schema, which uses the original physical names.
let expr = if let Some(name_map) = &self.logical_to_physical_names {
expr.transform(|e| {
if let Some(col) = e.as_any().downcast_ref::<Column>() {
if let Some(physical_name) = name_map.get(col.name()) {
return Ok(Transformed::yes(Arc::new(Column::new(
physical_name,
col.index(),
))));
}
}
Ok(Transformed::no(e))
})
.data()?
} else {
expr
};
Ok(expr)
}
}
impl SparkPhysicalExprAdapter {
/// Wrap ALL Column expressions that have type mismatches with CometCastColumnExpr.
/// This is the fallback path when the default adapter fails (e.g., for complex
/// nested type casts like List<Struct> or Map). Uses `spark_parquet_convert`
/// under the hood for the actual type conversion.
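///
/// For example, if the default adapter cannot cast a file column
/// `a: List<Int32>` to a required `List<Int64>`, this fallback wraps
/// `Column("a")` in a `CometCastColumnExpr` that performs the conversion via
/// `spark_parquet_convert`.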
fn wrap_all_type_mismatches(
&self,
expr: Arc<dyn PhysicalExpr>,
) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
expr.transform(|e| {
if let Some(column) = e.as_any().downcast_ref::<Column>() {
let col_name = column.name();
// Resolve fields by name because this is the fallback path
// that runs on the original expression when the default
// adapter fails. The original expression was built against
// the required (pruned) schema, so column indices refer to
// that schema — not the logical or physical file schemas.
// DataFusion's DefaultPhysicalExprAdapter::resolve_physical_column
// also resolves by name for the same reason.
let logical_field = if self.parquet_options.case_sensitive {
self.logical_file_schema
.fields()
.iter()
.find(|f| f.name() == col_name)
} else {
self.logical_file_schema
.fields()
.iter()
.find(|f| f.name().eq_ignore_ascii_case(col_name))
};
let physical_field = if self.parquet_options.case_sensitive {
self.physical_file_schema
.fields()
.iter()
.find(|f| f.name() == col_name)
} else {
self.physical_file_schema
.fields()
.iter()
.find(|f| f.name().eq_ignore_ascii_case(col_name))
};
// Remap the column index to the physical file schema so
// downstream evaluation reads the correct column from the
// parquet batch.
let physical_index = if self.parquet_options.case_sensitive {
self.physical_file_schema.index_of(col_name).ok()
} else {
self.physical_file_schema
.fields()
.iter()
.position(|f| f.name().eq_ignore_ascii_case(col_name))
};
if let (Some(logical_field), Some(physical_field), Some(phys_idx)) =
(logical_field, physical_field, physical_index)
{
let remapped: Arc<dyn PhysicalExpr> = if column.index() != phys_idx {
Arc::new(Column::new(col_name, phys_idx))
} else {
Arc::clone(&e)
};
if logical_field.data_type() != physical_field.data_type() {
let cast_expr: Arc<dyn PhysicalExpr> = Arc::new(
CometCastColumnExpr::new(
remapped,
Arc::clone(physical_field),
Arc::clone(logical_field),
None,
)
.with_parquet_options(self.parquet_options.clone()),
);
return Ok(Transformed::yes(cast_expr));
} else if column.index() != phys_idx {
return Ok(Transformed::yes(remapped));
}
}
}
Ok(Transformed::no(e))
})
.data()
}
/// Replace CastExpr (DataFusion's cast) with a Spark-compatible cast: either a
/// CometCastColumnExpr (complex nested types and timestamp cases) or Spark's
/// Cast expression (simple scalar types).
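///
/// For example, a CastExpr casting an Int32 file column `id` to Utf8 becomes a
/// Spark `Cast` with `is_adapting_schema = true`, while a Struct-to-Struct cast
/// becomes a `CometCastColumnExpr`.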
fn replace_with_spark_cast(
&self,
expr: Arc<dyn PhysicalExpr>,
) -> DataFusionResult<Transformed<Arc<dyn PhysicalExpr>>> {
// Check for CastExpr and replace with spark_expr::Cast
if let Some(cast) = expr
.as_any()
.downcast_ref::<datafusion::physical_expr::expressions::CastExpr>()
{
let child = Arc::clone(cast.expr());
let target_type = cast.target_field().data_type();
// Derive input field from the child Column expression and the physical schema
let input_field = if let Some(col) = child.as_any().downcast_ref::<Column>() {
Arc::new(self.physical_file_schema.field(col.index()).clone())
} else {
// Fallback: synthesize a field from the target field name and child data type
let child_type = cast.expr().data_type(&self.physical_file_schema)?;
Arc::new(Field::new(cast.target_field().name(), child_type, true))
};
let physical_type = input_field.data_type();
// For complex nested types (Struct, List, Map), Timestamp timezone
// mismatches, and Timestamp→Int64 (nanosAsLong), use CometCastColumnExpr
// with spark_parquet_convert which handles field-name-based selection,
// reordering, nested type casting, metadata-only timestamp timezone
// relabeling, and raw value reinterpretation correctly.
//
// Timestamp mismatches (e.g., Timestamp(us, None) -> Timestamp(us, Some("UTC")))
// occur when INT96 Parquet timestamps are coerced to Timestamp(us, None) by
// DataFusion but the logical schema expects Timestamp(us, Some("UTC")).
// Using Spark's Cast here would incorrectly treat the None-timezone values as
// local time (TimestampNTZ) and apply a timezone conversion, but the values are
// already in UTC. spark_parquet_convert handles this as a metadata-only change.
//
// Timestamp→Int64 occurs when Spark's `nanosAsLong` config converts
// TIMESTAMP(NANOS) to LongType. Spark's Cast would divide by MICROS_PER_SECOND
// (assuming microseconds), but the values are nanoseconds. Arrow cast correctly
// reinterprets the raw i64 value without conversion.
if matches!(
(physical_type, target_type),
(DataType::Struct(_), DataType::Struct(_))
| (DataType::List(_), DataType::List(_))
| (DataType::Map(_, _), DataType::Map(_, _))
| (DataType::Timestamp(_, _), DataType::Timestamp(_, _))
| (DataType::Timestamp(_, _), DataType::Int64)
) {
let comet_cast: Arc<dyn PhysicalExpr> = Arc::new(
CometCastColumnExpr::new(
child,
input_field,
Arc::clone(cast.target_field()),
None,
)
.with_parquet_options(self.parquet_options.clone()),
);
return Ok(Transformed::yes(comet_cast));
}
// For simple scalar type casts, use Spark-compatible Cast expression
let mut cast_options = SparkCastOptions::new(
self.parquet_options.eval_mode,
&self.parquet_options.timezone,
self.parquet_options.allow_incompat,
);
cast_options.allow_cast_unsigned_ints = self.parquet_options.allow_cast_unsigned_ints;
cast_options.is_adapting_schema = true;
let spark_cast = Arc::new(Cast::new(
child,
target_type.clone(),
cast_options,
None,
None,
));
return Ok(Transformed::yes(spark_cast as Arc<dyn PhysicalExpr>));
}
Ok(Transformed::no(expr))
}
/// Replace references to missing columns with default values.
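///
/// For example, if the physical file lacks column `c` and `default_values` maps
/// it to `ScalarValue::Int32(Some(42))`, references to `c` become the literal
/// 42, cast to the logical schema's type for `c` when the types differ.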
fn replace_missing_with_defaults(
&self,
expr: Arc<dyn PhysicalExpr>,
) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
let Some(defaults) = &self.default_values else {
return Ok(expr);
};
if defaults.is_empty() {
return Ok(expr);
}
// Build owned (column_name, default_value) pairs for columns missing from the physical file.
// For each default: filter to only columns absent from physical schema, then type-cast
// the value to match the logical schema's field type if they differ (using Spark cast semantics).
let missing_column_defaults: Vec<(String, ScalarValue)> = defaults
.iter()
.filter_map(|(col, val)| {
let col_name = col.name();
// Only include defaults for columns missing from the physical file schema
let is_missing = if self.parquet_options.case_sensitive {
self.physical_file_schema.field_with_name(col_name).is_err()
} else {
!self
.physical_file_schema
.fields()
.iter()
.any(|f| f.name().eq_ignore_ascii_case(col_name))
};
if !is_missing {
return None;
}
// Cast value to logical schema type if needed (only if types differ)
let value = self
.logical_file_schema
.field_with_name(col_name)
.ok()
.filter(|field| val.data_type() != *field.data_type())
.and_then(|field| {
spark_parquet_convert(
ColumnarValue::Scalar(val.clone()),
field.data_type(),
&self.parquet_options,
)
.ok()
.and_then(|cv| match cv {
ColumnarValue::Scalar(s) => Some(s),
_ => None,
})
})
.unwrap_or_else(|| val.clone());
Some((col_name.to_string(), value))
})
.collect();
let name_based: HashMap<&str, &ScalarValue> = missing_column_defaults
.iter()
.map(|(k, v)| (k.as_str(), v))
.collect();
if name_based.is_empty() {
return Ok(expr);
}
replace_columns_with_literals(expr, &name_based)
}
}
#[cfg(test)]
mod test {
use crate::parquet::parquet_support::SparkParquetOptions;
use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory;
use arrow::array::UInt32Array;
use arrow::array::{Int32Array, StringArray};
use arrow::datatypes::SchemaRef;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::common::DataFusionError;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileGroup, FileScanConfigBuilder, ParquetSource};
use datafusion::datasource::source::DataSourceExec;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_comet_spark_expr::test_common::file_util::get_temp_filename;
use datafusion_comet_spark_expr::EvalMode;
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use futures::StreamExt;
use parquet::arrow::ArrowWriter;
use std::fs::File;
use std::sync::Arc;
#[tokio::test]
async fn parquet_roundtrip_int_as_string() -> Result<(), DataFusionError> {
let file_schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]));
let ids = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc<dyn arrow::array::Array>;
let names = Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"]))
as Arc<dyn arrow::array::Array>;
let batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![ids, names])?;
let required_schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("name", DataType::Utf8, false),
]));
let _ = roundtrip(&batch, required_schema).await?;
Ok(())
}
#[tokio::test]
async fn parquet_roundtrip_unsigned_int() -> Result<(), DataFusionError> {
let file_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::UInt32, false)]));
let ids = Arc::new(UInt32Array::from(vec![1, 2, 3])) as Arc<dyn arrow::array::Array>;
let batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![ids])?;
let required_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
let _ = roundtrip(&batch, required_schema).await?;
Ok(())
}
/// Create a Parquet file containing a single batch and then read the batch back using
/// the specified required_schema. This will cause the PhysicalExprAdapter code to be used.
async fn roundtrip(
batch: &RecordBatch,
required_schema: SchemaRef,
) -> Result<RecordBatch, DataFusionError> {
let filename = get_temp_filename();
let filename = filename.as_path().as_os_str().to_str().unwrap().to_string();
let file = File::create(&filename)?;
let mut writer = ArrowWriter::try_new(file, Arc::clone(&batch.schema()), None)?;
writer.write(batch)?;
writer.close()?;
let object_store_url = ObjectStoreUrl::local_filesystem();
let mut spark_parquet_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
spark_parquet_options.allow_cast_unsigned_ints = true;
// Create expression adapter factory for Spark-compatible schema adaptation
let expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory> = Arc::new(
SparkPhysicalExprAdapterFactory::new(spark_parquet_options, None),
);
let parquet_source = ParquetSource::new(required_schema);
let files = FileGroup::new(vec![PartitionedFile::from_path(filename.to_string())?]);
let file_scan_config =
FileScanConfigBuilder::new(object_store_url, Arc::new(parquet_source))
.with_file_groups(vec![files])
.with_expr_adapter(Some(expr_adapter_factory))
.build();
let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config));
let mut stream = parquet_exec.execute(0, Arc::new(TaskContext::default()))?;
stream.next().await.unwrap()
}
#[tokio::test]
async fn parquet_duplicate_fields_case_insensitive() {
// Parquet file has columns "A", "B", "b" - reading "b" in case-insensitive mode
// should fail with duplicate field error matching Spark's _LEGACY_ERROR_TEMP_2093
let file_schema = Arc::new(Schema::new(vec![
Field::new("A", DataType::Int32, false),
Field::new("B", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]));
let col_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc<dyn arrow::array::Array>;
let col_b1 = Arc::new(Int32Array::from(vec![4, 5, 6])) as Arc<dyn arrow::array::Array>;
let col_b2 = Arc::new(Int32Array::from(vec![7, 8, 9])) as Arc<dyn arrow::array::Array>;
let batch =
RecordBatch::try_new(Arc::clone(&file_schema), vec![col_a, col_b1, col_b2]).unwrap();
let filename = get_temp_filename();
let filename = filename.as_path().as_os_str().to_str().unwrap().to_string();
let file = File::create(&filename).unwrap();
let mut writer = ArrowWriter::try_new(file, Arc::clone(&batch.schema()), None).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
// Read with case-insensitive mode, requesting column "b" which matches both "B" and "b"
let required_schema = Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, false)]));
let mut spark_parquet_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
spark_parquet_options.case_sensitive = false;
let expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory> = Arc::new(
SparkPhysicalExprAdapterFactory::new(spark_parquet_options, None),
);
let object_store_url = ObjectStoreUrl::local_filesystem();
let parquet_source = ParquetSource::new(required_schema);
let files = FileGroup::new(vec![
PartitionedFile::from_path(filename.to_string()).unwrap()
]);
let file_scan_config =
FileScanConfigBuilder::new(object_store_url, Arc::new(parquet_source))
.with_file_groups(vec![files])
.with_expr_adapter(Some(expr_adapter_factory))
.build();
let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config));
let mut stream = parquet_exec
.execute(0, Arc::new(TaskContext::default()))
.unwrap();
let result = stream.next().await.unwrap();
// Should fail with duplicate field error
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("Found duplicate field"),
"Expected duplicate field error, got: {err_msg}"
);
}
}