
Commit 32b2f00

Spark: Replicate position delete array/map fix to Spark 3.4, 3.5, and 4.0 (#15743)
Follow-up to #15632, which updated Spark 4.1 only. Ports the same PositionDeletesRowReader residual-extraction fix (retain all non-constant field IDs for extractByIdInclusive, not only primitive ones) and the matching rewrite_position_delete_files and position_deletes metadata-table tests to the Spark 3.4, 3.5, and 4.0 modules.
1 parent 17738b7 commit 32b2f00

9 files changed: 369 additions & 42 deletions
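
The heart of the ported fix is the field-id selection that feeds ExpressionUtil.extractByIdInclusive. Below is a minimal sketch contrasting the removed primitive-only selection with the ported one; the schema, field ids, and the driver class are illustrative assumptions, while the two stream pipelines mirror the removed and added code in the diffs that follow.

```java
import java.util.Arrays;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;

// Illustrative driver, not part of the commit.
public class NonConstantFieldIdSketch {
  public static void main(String[] args) {
    // Hypothetical schema with an array column (field id 3, element id 4).
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(
                3, "arr_col", Types.ListType.ofOptional(4, Types.IntegerType.get())));
    Map<Integer, ?> idToConstant = ImmutableMap.of(); // assume no constant fields here

    // Removed logic: primitives only, so the list field (id 3) is filtered out
    // and residual predicates on arr_col never reach the row reader.
    int[] oldIds =
        schema.idToName().keySet().stream()
            .filter(id -> schema.findField(id).type().isPrimitiveType())
            .filter(id -> !idToConstant.containsKey(id))
            .mapToInt(Integer::intValue)
            .toArray();

    // Ported logic: every non-constant field id is retained, so
    // extractByIdInclusive keeps predicates on array and map columns as well.
    int[] newIds =
        schema.idToName().keySet().stream()
            .filter(id -> !idToConstant.containsKey(id))
            .mapToInt(Integer::intValue)
            .toArray();

    System.out.println("old ids: " + Arrays.toString(Arrays.stream(oldIds).sorted().toArray()));
    System.out.println("new ids: " + Arrays.toString(Arrays.stream(newIds).sorted().toArray()));
  }
}
```

Returning a plain int[] also lets the reader drop the Guava Ints.toArray bridge, which is why the relocated import disappears in each module.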

spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java

Lines changed: 62 additions & 0 deletions
```diff
@@ -245,6 +245,68 @@ public void testRewriteSummary() throws Exception {
         EnvironmentContext.ENGINE_VERSION, v -> assertThat(v).startsWith("3.4"));
   }
 
+  @TestTemplate
+  public void testRewritePositionDeletesWithArrayColumns() throws Exception {
+    sql(
+        "CREATE TABLE %s (id BIGINT, data STRING, items ARRAY<STRUCT<value:BIGINT, count:INT>>) "
+            + "USING iceberg TBLPROPERTIES "
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read', 'write.update.mode'='merge-on-read')",
+        tableName);
+
+    sql(
+        "INSERT INTO %s VALUES "
+            + "(1, 'a', array(named_struct('value', cast(10 as bigint), 'count', 1))), "
+            + "(2, 'b', array(named_struct('value', cast(20 as bigint), 'count', 2))), "
+            + "(3, 'c', array(named_struct('value', cast(30 as bigint), 'count', 3))), "
+            + "(4, 'd', array(named_struct('value', cast(40 as bigint), 'count', 4))), "
+            + "(5, 'e', array(named_struct('value', cast(50 as bigint), 'count', 5))), "
+            + "(6, 'f', array(named_struct('value', cast(60 as bigint), 'count', 6)))",
+        tableName);
+
+    sql("DELETE FROM %s WHERE id = 1", tableName);
+    sql("DELETE FROM %s WHERE id = 2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);
+
+    sql(
+        "CALL %s.system.rewrite_position_delete_files("
+            + "table => '%s',"
+            + "options => map('rewrite-all','true'))",
+        catalogName, tableIdent);
+  }
+
+  @TestTemplate
+  public void testRewritePositionDeletesWithMapColumns() throws Exception {
+    sql(
+        "CREATE TABLE %s (id BIGINT, data STRING, props MAP<STRING, BIGINT>) "
+            + "USING iceberg TBLPROPERTIES "
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read', 'write.update.mode'='merge-on-read')",
+        tableName);
+
+    sql(
+        "INSERT INTO %s VALUES "
+            + "(1, 'a', map('x', cast(10 as bigint))), "
+            + "(2, 'b', map('y', cast(20 as bigint))), "
+            + "(3, 'c', map('z', cast(30 as bigint))), "
+            + "(4, 'd', map('w', cast(40 as bigint))), "
+            + "(5, 'e', map('v', cast(50 as bigint))), "
+            + "(6, 'f', map('u', cast(60 as bigint)))",
+        tableName);
+
+    sql("DELETE FROM %s WHERE id = 1", tableName);
+    sql("DELETE FROM %s WHERE id = 2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);
+
+    sql(
+        "CALL %s.system.rewrite_position_delete_files("
+            + "table => '%s',"
+            + "options => map('rewrite-all','true'))",
+        catalogName, tableIdent);
+  }
+
   private Map<String, String> snapshotSummary() {
     return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
   }
```
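
The rewrite_position_delete_files procedure exercised above is a thin wrapper around the corresponding Spark action. A sketch of the equivalent programmatic call follows; the spark session, table handle, and helper method name are assumptions, while the action API itself is Iceberg's public one.

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewritePositionDeleteFiles;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

// Equivalent of:
//   CALL <catalog>.system.rewrite_position_delete_files(
//     table => '<table>', options => map('rewrite-all','true'))
static RewritePositionDeleteFiles.Result rewriteAllPositionDeletes(
    SparkSession spark, Table table) {
  return SparkActions.get(spark)
      .rewritePositionDeleteFiles(table)
      .option("rewrite-all", "true")
      .execute();
}
```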

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java

Lines changed: 7 additions & 14 deletions
```diff
@@ -19,8 +19,6 @@
 package org.apache.iceberg.spark.source;
 
 import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.PositionDeletesScanTask;
@@ -32,7 +30,6 @@
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
 import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
@@ -88,12 +85,16 @@ protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
     InputFile inputFile = getInputFile(task.file().location());
     Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with %s", task);
 
-    // select out constant fields when pushing down filter to row reader
+    // Retain predicates on non-constant fields for row reader filter
     Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
-    Set<Integer> nonConstantFieldIds = nonConstantFieldIds(idToConstant);
+    int[] nonConstantFieldIds =
+        expectedSchema().idToName().keySet().stream()
+            .filter(id -> !idToConstant.containsKey(id))
+            .mapToInt(Integer::intValue)
+            .toArray();
     Expression residualWithoutConstants =
         ExpressionUtil.extractByIdInclusive(
-            task.residual(), expectedSchema(), caseSensitive(), Ints.toArray(nonConstantFieldIds));
+            task.residual(), expectedSchema(), caseSensitive(), nonConstantFieldIds);
 
     if (ContentFileUtil.isDV(task.file())) {
       return new DVIterator(inputFile, task.file(), expectedSchema(), idToConstant);
@@ -109,12 +110,4 @@ protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
             idToConstant)
         .iterator();
   }
-
-  private Set<Integer> nonConstantFieldIds(Map<Integer, ?> idToConstant) {
-    Set<Integer> fields = expectedSchema().idToName().keySet();
-    return fields.stream()
-        .filter(id -> expectedSchema().findField(id).type().isPrimitiveType())
-        .filter(id -> !idToConstant.containsKey(id))
-        .collect(Collectors.toSet());
-  }
 }
```

spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java

Lines changed: 54 additions & 0 deletions
```diff
@@ -209,6 +209,60 @@ public void testPartitionedTable() throws IOException {
     dropTable(tableName);
   }
 
+  @TestTemplate
+  public void testArrayColumnFilter() throws IOException {
+    assumeThat(formatVersion)
+        .as("Row content in position_deletes is required for array column filter test")
+        .isEqualTo(2);
+    String tableName = "array_column_filter";
+    Schema schemaWithArray =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.required(2, "data", Types.StringType.get()),
+            Types.NestedField.optional(
+                3, "arr_col", Types.ListType.ofOptional(4, Types.IntegerType.get())));
+    Table tab = createTable(tableName, schemaWithArray, PartitionSpec.unpartitioned());
+
+    GenericRecord record1 = GenericRecord.create(schemaWithArray);
+    record1.set(0, 1);
+    record1.set(1, "a");
+    record1.set(2, ImmutableList.of(1, 2));
+    GenericRecord record2 = GenericRecord.create(schemaWithArray);
+    record2.set(0, 2);
+    record2.set(1, "b");
+    record2.set(2, ImmutableList.of(3, 4));
+    List<Record> dataRecords = ImmutableList.of(record1, record2);
+    DataFile dFile =
+        FileHelpers.writeDataFile(
+            tab,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            TestHelpers.Row.of(),
+            dataRecords);
+    tab.newAppend().appendFile(dFile).commit();
+
+    List<PositionDelete<?>> deletes =
+        ImmutableList.of(
+            positionDelete(schemaWithArray, dFile.location(), 0L, 1, "a", ImmutableList.of(1, 2)),
+            positionDelete(schemaWithArray, dFile.location(), 1L, 2, "b", ImmutableList.of(3, 4)));
+    DeleteFile posDeletes =
+        FileHelpers.writePosDeleteFile(
+            tab,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            TestHelpers.Row.of(),
+            deletes,
+            formatVersion);
+    tab.newRowDelta().addDeletes(posDeletes).commit();
+
+    // Filter directly on array column: row.arr_col = array(1, 2)
+    StructLikeSet actual = actual(tableName, tab, "row.arr_col = array(1, 2)");
+    StructLikeSet expected = expected(tab, ImmutableList.of(deletes.get(0)), null, posDeletes);
+
+    assertThat(actual)
+        .as("Filtering position_deletes by row.arr_col = array(1, 2) should return matching row")
+        .isEqualTo(expected);
+    dropTable(tableName);
+  }
+
   @TestTemplate
   public void testSelect() throws IOException {
     assumeThat(formatVersion).as("DVs don't have row info in PositionDeletesTable").isEqualTo(2);
```
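
The actual(...) helper used in this test resolves to a filtered scan of the table's position_deletes metadata table. A sketch of the equivalent direct read follows; the spark session and the db.tbl identifier are placeholders, while the metadata-table naming and read API are standard Iceberg-on-Spark usage.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Read the position_deletes metadata table and apply the same filter the test
// pushes down; "db.tbl" stands in for the test table's identifier, and `spark`
// is an existing SparkSession.
Dataset<Row> matches =
    spark
        .read()
        .format("iceberg")
        .load("db.tbl.position_deletes")
        .filter("row.arr_col = array(1, 2)");
```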

spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java

Lines changed: 62 additions & 0 deletions
```diff
@@ -245,6 +245,68 @@ public void testRewriteSummary() throws Exception {
         EnvironmentContext.ENGINE_VERSION, v -> assertThat(v).startsWith("3.5"));
   }
 
+  @TestTemplate
+  public void testRewritePositionDeletesWithArrayColumns() throws Exception {
+    sql(
+        "CREATE TABLE %s (id BIGINT, data STRING, items ARRAY<STRUCT<value:BIGINT, count:INT>>) "
+            + "USING iceberg TBLPROPERTIES "
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read', 'write.update.mode'='merge-on-read')",
+        tableName);
+
+    sql(
+        "INSERT INTO %s VALUES "
+            + "(1, 'a', array(named_struct('value', cast(10 as bigint), 'count', 1))), "
+            + "(2, 'b', array(named_struct('value', cast(20 as bigint), 'count', 2))), "
+            + "(3, 'c', array(named_struct('value', cast(30 as bigint), 'count', 3))), "
+            + "(4, 'd', array(named_struct('value', cast(40 as bigint), 'count', 4))), "
+            + "(5, 'e', array(named_struct('value', cast(50 as bigint), 'count', 5))), "
+            + "(6, 'f', array(named_struct('value', cast(60 as bigint), 'count', 6)))",
+        tableName);
+
+    sql("DELETE FROM %s WHERE id = 1", tableName);
+    sql("DELETE FROM %s WHERE id = 2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);
+
+    sql(
+        "CALL %s.system.rewrite_position_delete_files("
+            + "table => '%s',"
+            + "options => map('rewrite-all','true'))",
+        catalogName, tableIdent);
+  }
+
+  @TestTemplate
+  public void testRewritePositionDeletesWithMapColumns() throws Exception {
+    sql(
+        "CREATE TABLE %s (id BIGINT, data STRING, props MAP<STRING, BIGINT>) "
+            + "USING iceberg TBLPROPERTIES "
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read', 'write.update.mode'='merge-on-read')",
+        tableName);
+
+    sql(
+        "INSERT INTO %s VALUES "
+            + "(1, 'a', map('x', cast(10 as bigint))), "
+            + "(2, 'b', map('y', cast(20 as bigint))), "
+            + "(3, 'c', map('z', cast(30 as bigint))), "
+            + "(4, 'd', map('w', cast(40 as bigint))), "
+            + "(5, 'e', map('v', cast(50 as bigint))), "
+            + "(6, 'f', map('u', cast(60 as bigint)))",
+        tableName);
+
+    sql("DELETE FROM %s WHERE id = 1", tableName);
+    sql("DELETE FROM %s WHERE id = 2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);
+
+    sql(
+        "CALL %s.system.rewrite_position_delete_files("
+            + "table => '%s',"
+            + "options => map('rewrite-all','true'))",
+        catalogName, tableIdent);
+  }
+
   private Map<String, String> snapshotSummary() {
     return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
   }
```

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java

Lines changed: 7 additions & 14 deletions
```diff
@@ -19,8 +19,6 @@
 package org.apache.iceberg.spark.source;
 
 import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.PositionDeletesScanTask;
@@ -33,7 +31,6 @@
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
 import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
@@ -96,12 +93,16 @@ protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
     InputFile inputFile = getInputFile(task.file().location());
     Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with %s", task);
 
-    // select out constant fields when pushing down filter to row reader
+    // Retain predicates on non-constant fields for row reader filter
     Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
-    Set<Integer> nonConstantFieldIds = nonConstantFieldIds(idToConstant);
+    int[] nonConstantFieldIds =
+        expectedSchema().idToName().keySet().stream()
+            .filter(id -> !idToConstant.containsKey(id))
+            .mapToInt(Integer::intValue)
+            .toArray();
     Expression residualWithoutConstants =
         ExpressionUtil.extractByIdInclusive(
-            task.residual(), expectedSchema(), caseSensitive(), Ints.toArray(nonConstantFieldIds));
+            task.residual(), expectedSchema(), caseSensitive(), nonConstantFieldIds);
 
     if (ContentFileUtil.isDV(task.file())) {
       return new DVIterator(inputFile, task.file(), expectedSchema(), idToConstant);
@@ -117,12 +118,4 @@ protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
             idToConstant)
         .iterator();
   }
-
-  private Set<Integer> nonConstantFieldIds(Map<Integer, ?> idToConstant) {
-    Set<Integer> fields = expectedSchema().idToName().keySet();
-    return fields.stream()
-        .filter(id -> expectedSchema().findField(id).type().isPrimitiveType())
-        .filter(id -> !idToConstant.containsKey(id))
-        .collect(Collectors.toSet());
-  }
 }
```

spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java

Lines changed: 54 additions & 0 deletions
```diff
@@ -209,6 +209,60 @@ public void testPartitionedTable() throws IOException {
     dropTable(tableName);
   }
 
+  @TestTemplate
+  public void testArrayColumnFilter() throws IOException {
+    assumeThat(formatVersion)
+        .as("Row content in position_deletes is required for array column filter test")
+        .isEqualTo(2);
+    String tableName = "array_column_filter";
+    Schema schemaWithArray =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.required(2, "data", Types.StringType.get()),
+            Types.NestedField.optional(
+                3, "arr_col", Types.ListType.ofOptional(4, Types.IntegerType.get())));
+    Table tab = createTable(tableName, schemaWithArray, PartitionSpec.unpartitioned());
+
+    GenericRecord record1 = GenericRecord.create(schemaWithArray);
+    record1.set(0, 1);
+    record1.set(1, "a");
+    record1.set(2, ImmutableList.of(1, 2));
+    GenericRecord record2 = GenericRecord.create(schemaWithArray);
+    record2.set(0, 2);
+    record2.set(1, "b");
+    record2.set(2, ImmutableList.of(3, 4));
+    List<Record> dataRecords = ImmutableList.of(record1, record2);
+    DataFile dFile =
+        FileHelpers.writeDataFile(
+            tab,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            TestHelpers.Row.of(),
+            dataRecords);
+    tab.newAppend().appendFile(dFile).commit();
+
+    List<PositionDelete<?>> deletes =
+        ImmutableList.of(
+            positionDelete(schemaWithArray, dFile.location(), 0L, 1, "a", ImmutableList.of(1, 2)),
+            positionDelete(schemaWithArray, dFile.location(), 1L, 2, "b", ImmutableList.of(3, 4)));
+    DeleteFile posDeletes =
+        FileHelpers.writePosDeleteFile(
+            tab,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            TestHelpers.Row.of(),
+            deletes,
+            formatVersion);
+    tab.newRowDelta().addDeletes(posDeletes).commit();
+
+    // Filter directly on array column: row.arr_col = array(1, 2)
+    StructLikeSet actual = actual(tableName, tab, "row.arr_col = array(1, 2)");
+    StructLikeSet expected = expected(tab, ImmutableList.of(deletes.get(0)), null, posDeletes);
+
+    assertThat(actual)
+        .as("Filtering position_deletes by row.arr_col = array(1, 2) should return matching row")
+        .isEqualTo(expected);
+    dropTable(tableName);
+  }
+
   @TestTemplate
   public void testSelect() throws IOException {
     assumeThat(formatVersion).as("DVs don't have row info in PositionDeletesTable").isEqualTo(2);
```
