Skip to content

Commit 49a9f99

Browse files
authored
Avro: Fix reading files via DataFileStream with row lineage when the lineage columns aren't projected (#15508)
1 parent 8b7f6d7 commit 49a9f99

2 files changed

Lines changed: 69 additions & 2 deletions

File tree

core/src/main/java/org/apache/iceberg/avro/ValueReaders.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,10 @@ private static Pair<Integer, ValueReader<?>> fileFieldReader(
254254
Integer projectionPos,
255255
ValueReader<?> fieldReader,
256256
Map<Integer, ?> idToConstant) {
257-
if (Objects.equals(fieldId, MetadataColumns.ROW_ID.fieldId())) {
257+
if (projectionPos == null) {
258+
// field is in the file but not projected; keep the reader only for skipping
259+
return Pair.of(null, fieldReader);
260+
} else if (Objects.equals(fieldId, MetadataColumns.ROW_ID.fieldId())) {
258261
Long firstRowId = (Long) idToConstant.get(fieldId);
259262
return Pair.of(projectionPos, ValueReaders.rowIds(firstRowId, fieldReader));
260263
} else if (Objects.equals(fieldId, MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId())) {
@@ -273,7 +276,7 @@ private static Pair<Integer, ValueReader<?>> fieldReader(
273276
ValueReader<?> fieldReader,
274277
Map<Integer, ?> idToConstant) {
275278
Object constant = idToConstant.get(fieldId);
276-
if (projectionPos != null && constant != null) {
279+
if (constant != null) {
277280
return Pair.of(projectionPos, ValueReaders.replaceWithConstant(fieldReader, constant));
278281
}
279282

core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import java.io.ByteArrayInputStream;
2525
import java.io.ByteArrayOutputStream;
26+
import java.io.File;
2627
import java.io.IOException;
2728
import java.time.LocalDateTime;
2829
import java.time.OffsetDateTime;
@@ -33,14 +34,18 @@
3334
import org.apache.avro.LogicalTypes;
3435
import org.apache.avro.Schema;
3536
import org.apache.avro.SchemaBuilder;
37+
import org.apache.avro.file.DataFileWriter;
3638
import org.apache.avro.generic.GenericData;
3739
import org.apache.avro.generic.GenericDatumWriter;
3840
import org.apache.avro.generic.GenericRecord;
3941
import org.apache.avro.io.BinaryDecoder;
4042
import org.apache.avro.io.BinaryEncoder;
4143
import org.apache.avro.io.DecoderFactory;
4244
import org.apache.avro.io.EncoderFactory;
45+
import org.apache.iceberg.Files;
4346
import org.apache.iceberg.MetadataColumns;
47+
import org.apache.iceberg.avro.Avro;
48+
import org.apache.iceberg.avro.AvroIterable;
4449
import org.apache.iceberg.data.Record;
4550
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
4651
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -305,6 +310,65 @@ public void testMixedRowLineageValues() throws IOException {
305310
.isEqualTo(10L);
306311
}
307312

313+
@Test
314+
public void testLineageColumnsNotProjected() throws Exception {
315+
316+
org.apache.iceberg.Schema icebergSchema =
317+
new org.apache.iceberg.Schema(
318+
Types.NestedField.required(1, "data", Types.StringType.get()));
319+
320+
Schema fileSchema =
321+
SchemaBuilder.record("test")
322+
.fields()
323+
.name("data")
324+
.type()
325+
.stringType()
326+
.noDefault()
327+
.name(MetadataColumns.ROW_ID.name())
328+
.type()
329+
.optional()
330+
.longType()
331+
.name(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name())
332+
.type()
333+
.optional()
334+
.longType()
335+
.endRecord();
336+
337+
fileSchema.getField("data").addProp("field-id", 1);
338+
fileSchema
339+
.getField(MetadataColumns.ROW_ID.name())
340+
.addProp("field-id", MetadataColumns.ROW_ID.fieldId());
341+
fileSchema
342+
.getField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name())
343+
.addProp("field-id", MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId());
344+
345+
File file = File.createTempFile("test", ".avro");
346+
347+
try (DataFileWriter<GenericRecord> writer =
348+
new DataFileWriter<>(new GenericDatumWriter<>(fileSchema))) {
349+
350+
writer.create(fileSchema, file);
351+
352+
GenericRecord rec = new GenericData.Record(fileSchema);
353+
rec.put("data", "a");
354+
rec.put(MetadataColumns.ROW_ID.name(), 10L);
355+
rec.put(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), 5L);
356+
357+
writer.append(rec);
358+
}
359+
360+
try (AvroIterable<Record> reader =
361+
Avro.read(Files.localInput(file))
362+
.createResolvingReader(schema -> PlannedDataReader.create(icebergSchema))
363+
.project(icebergSchema)
364+
.build()) {
365+
366+
List<Record> rows = Lists.newArrayList(reader);
367+
assertThat(rows).hasSize(1);
368+
assertThat(rows.get(0).getField("data")).isEqualTo("a");
369+
}
370+
}
371+
308372
private Record readRecord(
309373
PlannedDataReader<Record> reader, Schema avroSchema, GenericRecord avroRecord)
310374
throws IOException {

0 commit comments

Comments (0)