Skip to content

Commit 1852d2d

Browse files
authored
Core: Load snapshot after it has been committed to prevent accidental cleanup of files (#15511) (#15650)
1 parent 58e86c7 commit 1852d2d

3 files changed

Lines changed: 94 additions & 18 deletions

File tree

core/src/main/java/org/apache/iceberg/SnapshotProducer.java

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
import java.util.UUID;
4545
import java.util.concurrent.ExecutorService;
4646
import java.util.concurrent.atomic.AtomicInteger;
47-
import java.util.concurrent.atomic.AtomicReference;
47+
import java.util.concurrent.atomic.AtomicLong;
4848
import java.util.concurrent.atomic.AtomicReferenceArray;
4949
import java.util.function.Consumer;
5050
import java.util.function.Function;
@@ -422,8 +422,8 @@ protected TableMetadata refresh() {
422422
@Override
423423
@SuppressWarnings("checkstyle:CyclomaticComplexity")
424424
public void commit() {
425-
// this is always set to the latest commit attempt's snapshot
426-
AtomicReference<Snapshot> stagedSnapshot = new AtomicReference<>();
425+
// this is always set to the latest commit attempt's snapshot id.
426+
AtomicLong newSnapshotId = new AtomicLong(-1L);
427427
try (Timed ignore = commitMetrics().totalDuration().start()) {
428428
try {
429429
Tasks.foreach(ops)
@@ -438,7 +438,7 @@ public void commit() {
438438
.run(
439439
taskOps -> {
440440
Snapshot newSnapshot = apply();
441-
stagedSnapshot.set(newSnapshot);
441+
newSnapshotId.set(newSnapshot.snapshotId());
442442
TableMetadata.Builder update = TableMetadata.buildFrom(base);
443443
if (base.snapshot(newSnapshot.snapshotId()) != null) {
444444
// this is a rollback operation
@@ -476,22 +476,29 @@ public void commit() {
476476
throw e;
477477
}
478478

479-
// at this point, the commit must have succeeded so the stagedSnapshot is committed
480-
Snapshot committedSnapshot = stagedSnapshot.get();
481479
try {
482-
LOG.info(
483-
"Committed snapshot {} ({})",
484-
committedSnapshot.snapshotId(),
485-
getClass().getSimpleName());
480+
LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), getClass().getSimpleName());
481+
482+
// at this point, the commit must have succeeded. after a refresh, the snapshot is loaded by
483+
// id in case another commit was added between this commit and the refresh.
484+
// it might not be known which commit attempt succeeded in some cases, so this only cleans
485+
// up the one that actually did succeed.
486+
Snapshot saved = ops.refresh().snapshot(newSnapshotId.get());
487+
if (saved != null) {
488+
if (cleanupAfterCommit()) {
489+
cleanUncommitted(Sets.newHashSet(saved.allManifests(ops.io())));
490+
}
486491

487-
if (cleanupAfterCommit()) {
488-
cleanUncommitted(Sets.newHashSet(committedSnapshot.allManifests(ops.io())));
489-
}
490-
// also clean up unused manifest lists created by multiple attempts
491-
for (String manifestList : manifestLists) {
492-
if (!committedSnapshot.manifestListLocation().equals(manifestList)) {
493-
deleteFile(manifestList);
492+
// also clean up unused manifest lists created by multiple attempts
493+
for (String manifestList : manifestLists) {
494+
if (!saved.manifestListLocation().equals(manifestList)) {
495+
deleteFile(manifestList);
496+
}
494497
}
498+
} else {
499+
// saved may not be present if the latest metadata couldn't be loaded due to eventual
500+
// consistency problems in refresh. in that case, don't clean up.
501+
LOG.warn("Failed to load committed snapshot, skipping manifest clean-up");
495502
}
496503
} catch (Throwable e) {
497504
LOG.warn(

core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,15 @@
1818
*/
1919
package org.apache.iceberg;
2020

21+
import static org.apache.iceberg.TestBase.FILE_A;
22+
import static org.apache.iceberg.TestBase.SCHEMA;
23+
import static org.apache.iceberg.TestBase.SPEC;
2124
import static org.assertj.core.api.Assertions.assertThat;
2225

26+
import java.io.File;
27+
import java.nio.file.Paths;
2328
import org.junit.jupiter.api.Test;
29+
import org.junit.jupiter.api.io.TempDir;
2430

2531
public class TestSnapshotProducer {
2632

@@ -74,4 +80,67 @@ private void assertManifestWriterCount(
7480
int writerCount = SnapshotProducer.manifestWriterCount(workerPoolSize, fileCount);
7581
assertThat(writerCount).as(errMsg).isEqualTo(expectedManifestWriterCount);
7682
}
83+
84+
@Test
85+
public void manifestNotCleanedUpWhenSnapshotNotLoadableAfterCommit(@TempDir File tableDir) {
86+
// Uses a custom TableOps that returns stale metadata (without the new snapshot) on the
87+
// first refresh() after commit, simulating eventual consistency. Verifies that commit succeeds
88+
// and that the committed data is visible once the table is refreshed again
89+
String tableName = "stale-table-on-first-refresh";
90+
TestTables.TestTableOperations ops = opsWithStaleRefreshAfterCommit(tableName, tableDir);
91+
TestTables.TestTable tableWithStaleRefresh =
92+
TestTables.create(tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), 2, ops);
93+
94+
// the first refresh() after the commit will return stale metadata (without this snapshot), so
95+
// SnapshotProducer will skip cleanup to avoid accidentally deleting files that are part of the
96+
// committed snapshot but commit still succeeds
97+
tableWithStaleRefresh.newAppend().appendFile(FILE_A).commit();
98+
99+
// Refresh again to get the real metadata; the snapshot must be visible now
100+
tableWithStaleRefresh.ops().refresh();
101+
Snapshot snapshot = tableWithStaleRefresh.currentSnapshot();
102+
assertThat(snapshot)
103+
.as("Committed snapshot must be visible after refresh (eventual consistency resolved)")
104+
.isNotNull();
105+
106+
File metadata = Paths.get(tableDir.getPath(), "metadata").toFile();
107+
assertThat(snapshot.allManifests(tableWithStaleRefresh.io()))
108+
.isNotEmpty()
109+
.allSatisfy(
110+
manifest -> assertThat(metadata.listFiles()).contains(new File(manifest.path())));
111+
}
112+
113+
/**
114+
* Creates a TableOperations that returns stale metadata (without the newly committed snapshot) on
115+
* the first refresh() after a commit. This simulates eventual consistency where the committed
116+
* snapshot is not yet visible. Used to verify that when the snapshot cannot be loaded after
117+
* commit, cleanup is skipped to avoid accidentally deleting files that are part of the committed
118+
* snapshot.
119+
*/
120+
private static TestTables.TestTableOperations opsWithStaleRefreshAfterCommit(
121+
String name, File location) {
122+
return new TestTables.TestTableOperations(name, location) {
123+
private TableMetadata metadataToReturnOnNextRefresh;
124+
125+
@Override
126+
public void commit(TableMetadata base, TableMetadata updatedMetadata) {
127+
super.commit(base, updatedMetadata);
128+
if (base != null) {
129+
// return stale metadata on the first refresh() call
130+
this.metadataToReturnOnNextRefresh = base;
131+
}
132+
}
133+
134+
@Override
135+
public TableMetadata refresh() {
136+
if (metadataToReturnOnNextRefresh != null) {
137+
this.current = metadataToReturnOnNextRefresh;
138+
this.metadataToReturnOnNextRefresh = null;
139+
return current;
140+
}
141+
142+
return super.refresh();
143+
}
144+
};
145+
}
77146
}

core/src/test/java/org/apache/iceberg/TestTables.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ public static class TestTableOperations implements TableOperations {
276276
private final String tableName;
277277
private final File metadata;
278278
private final FileIO fileIO;
279-
private TableMetadata current = null;
279+
protected TableMetadata current = null;
280280
private long lastSnapshotId = 0;
281281
private int failCommits = 0;
282282

0 commit comments

Comments
 (0)