diff --git a/docs/content/maintenance/metrics.md b/docs/content/maintenance/metrics.md
index bc0c1dbd0f73..747a3179d754 100644
--- a/docs/content/maintenance/metrics.md
+++ b/docs/content/maintenance/metrics.md
@@ -294,6 +294,26 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca
Gauge |
The average total file size of all active (currently being written) buckets. |
+
+ | maxSortBufferUsedBytes |
+ Gauge |
+ The maximum sort buffer memory currently used across all active compaction buckets, in bytes. High values relative to maxSortBufferTotalBytes indicate memory pressure during compaction; consider lowering sort-spill-threshold or reducing sort-spill-buffer-size. |
+
+
+ | avgSortBufferUsedBytes |
+ Gauge |
+ The average sort buffer memory used across all active compaction buckets, in bytes. |
+
+
+ | maxSortBufferUtilisationPercent |
+ Gauge |
+ The maximum sort buffer utilisation percentage (0–100) across all active compaction buckets. A value consistently near 100 indicates the sort buffer pool is exhausted and spilling to disk is occurring or imminent. |
+
+
+ | avgSortBufferUtilisationPercent |
+ Gauge |
+ The average sort buffer utilisation percentage across all active compaction buckets. |
+
diff --git a/paimon-common/src/main/java/org/apache/paimon/memory/CachelessSegmentPool.java b/paimon-common/src/main/java/org/apache/paimon/memory/CachelessSegmentPool.java
index cec1db858115..5033361fe3fa 100644
--- a/paimon-common/src/main/java/org/apache/paimon/memory/CachelessSegmentPool.java
+++ b/paimon-common/src/main/java/org/apache/paimon/memory/CachelessSegmentPool.java
@@ -58,4 +58,8 @@ public void returnAll(List memory) {
public int freePages() {
return maxPages - numPage;
}
+
+ public int maxPages() {
+ return maxPages;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
index ae6db76b90ba..705ef5119f07 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
@@ -66,7 +66,7 @@ public class MergeSorter {
private final int spillThreshold;
private final CompressOptions compression;
- private final MemorySegmentPool memoryPool;
+ private final CachelessSegmentPool memoryPool;
@Nullable private IOManager ioManager;
@@ -85,6 +85,14 @@ public MergeSorter(
this.ioManager = ioManager;
}
+ public long sortBufferTotalBytes() {
+ return (long) memoryPool.maxPages() * memoryPool.pageSize();
+ }
+
+ public long sortBufferUsedBytes() {
+ return (long) (memoryPool.maxPages() - memoryPool.freePages()) * memoryPool.pageSize();
+ }
+
public MemorySegmentPool memoryPool() {
return memoryPool;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java
index 4ad5c0ac79f6..4c393c3815d0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java
@@ -156,7 +156,7 @@ public CompactManager create(
Comparator keyComparator = keyComparatorSupplier.get();
Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
@Nullable FieldsComparator userDefinedSeqComparator = udsComparatorSupplier.get();
- CompactRewriter rewriter =
+ MergeTreeCompactRewriter rewriter =
createRewriter(
partition,
bucket,
@@ -164,6 +164,13 @@ public CompactManager create(
userDefinedSeqComparator,
levels,
dvMaintainer);
+ CompactionMetrics.Reporter metricsReporter =
+ compactionMetrics == null
+ ? null
+ : compactionMetrics.createReporter(partition, bucket);
+ if (metricsReporter != null) {
+ rewriter.setMetricsReporter(metricsReporter);
+ }
return new MergeTreeCompactManager(
compactExecutor,
levels,
@@ -172,9 +179,7 @@ public CompactManager create(
options.compactionFileSize(true),
options.numSortedRunStopTrigger(),
rewriter,
- compactionMetrics == null
- ? null
- : compactionMetrics.createReporter(partition, bucket),
+ metricsReporter,
dvMaintainer,
options.prepareCommitWaitCompaction(),
options.needLookup(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
index f23582618789..9eecce5248c4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
@@ -30,6 +30,7 @@
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeReaders;
import org.apache.paimon.mergetree.SortedRun;
+import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.utils.ExceptionUtils;
@@ -51,6 +52,7 @@ public class MergeTreeCompactRewriter extends AbstractCompactRewriter {
@Nullable protected final FieldsComparator userDefinedSeqComparator;
protected final MergeFunctionFactory mfFactory;
protected final MergeSorter mergeSorter;
+ @Nullable private CompactionMetrics.Reporter metricsReporter;
public MergeTreeCompactRewriter(
FileReaderFactory readerFactory,
@@ -106,6 +108,10 @@ protected CompactResult rewriteCompaction(
notifyRewriteCompactBefore(before);
List after = writer.result();
after = notifyRewriteCompactAfter(after);
+ if (metricsReporter != null) {
+ metricsReporter.reportSortBufferMetrics(
+ mergeSorter.sortBufferUsedBytes(), mergeSorter.sortBufferTotalBytes());
+ }
return new CompactResult(before, after);
}
@@ -126,4 +132,8 @@ protected void notifyRewriteCompactBefore(List files) {}
protected List notifyRewriteCompactAfter(List files) {
return files;
}
+
+ public void setMetricsReporter(@Nullable CompactionMetrics.Reporter reporter) {
+ this.metricsReporter = reporter;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
index e91d55f1bcc7..2a98e30d3ec0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
@@ -52,6 +52,11 @@ public class CompactionMetrics {
public static final String MAX_TOTAL_FILE_SIZE = "maxTotalFileSize";
public static final String AVG_TOTAL_FILE_SIZE = "avgTotalFileSize";
+ public static final String MAX_SORT_BUFFER_USED_BYTES = "maxSortBufferUsedBytes";
+ public static final String AVG_SORT_BUFFER_USED_BYTES = "avgSortBufferUsedBytes";
+ public static final String MAX_SORT_BUFFER_UTILISATION = "maxSortBufferUtilisationPercent";
+ public static final String AVG_SORT_BUFFER_UTILISATION = "avgSortBufferUtilisationPercent";
+
private static final long BUSY_MEASURE_MILLIS = 60_000;
private static final int COMPACTION_TIME_WINDOW = 100;
@@ -102,6 +107,18 @@ private void registerGenericCompactionMetrics() {
metricGroup.gauge(MAX_TOTAL_FILE_SIZE, () -> getTotalFileSizeStream().max().orElse(-1));
metricGroup.gauge(AVG_TOTAL_FILE_SIZE, () -> getTotalFileSizeStream().average().orElse(-1));
+
+ metricGroup.gauge(
+ MAX_SORT_BUFFER_USED_BYTES, () -> getSortBufferUsedBytesStream().max().orElse(-1));
+ metricGroup.gauge(
+ AVG_SORT_BUFFER_USED_BYTES,
+ () -> getSortBufferUsedBytesStream().average().orElse(-1));
+ metricGroup.gauge(
+ MAX_SORT_BUFFER_UTILISATION,
+ () -> getSortBufferUtilisationStream().max().orElse(-1));
+ metricGroup.gauge(
+ AVG_SORT_BUFFER_UTILISATION,
+ () -> getSortBufferUtilisationStream().average().orElse(-1));
}
private LongStream getLevel0FileCountStream() {
@@ -130,6 +147,14 @@ public LongStream getTotalFileSizeStream() {
return reporters.values().stream().mapToLong(r -> r.totalFileSize);
}
+ private LongStream getSortBufferUsedBytesStream() {
+ return reporters.values().stream().mapToLong(r -> r.sortBufferUsedBytes);
+ }
+
+ private DoubleStream getSortBufferUtilisationStream() {
+ return reporters.values().stream().mapToDouble(r -> r.sortBufferUtilisationPercent);
+ }
+
public void close() {
metricGroup.close();
}
@@ -157,6 +182,8 @@ public interface Reporter {
void reportTotalFileSize(long bytes);
+ void reportSortBufferMetrics(long usedBytes, long totalBytes);
+
void unregister();
}
@@ -167,6 +194,8 @@ private class ReporterImpl implements Reporter {
private long compactionInputSize = 0;
private long compactionOutputSize = 0;
private long totalFileSize = 0;
+ private long sortBufferUsedBytes = 0;
+ private double sortBufferUtilisationPercent = 0.0;
private ReporterImpl(PartitionAndBucket key) {
this.key = key;
@@ -210,6 +239,13 @@ public void reportLevel0FileCount(long count) {
this.level0FileCount = count;
}
+ @Override
+ public void reportSortBufferMetrics(long usedBytes, long totalBytes) {
+ this.sortBufferUsedBytes = usedBytes;
+ this.sortBufferUtilisationPercent =
+ totalBytes > 0 ? 100.0 * usedBytes / totalBytes : 0.0;
+ }
+
@Override
public void increaseCompactionsCompletedCount() {
compactionsCompletedCounter.inc();
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
index 91d4c0acbc2b..df7265f07e80 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
@@ -57,6 +57,49 @@ public class CompactionMetricsTest {
@TempDir java.nio.file.Path tempDir;
+ @Test
+ public void testSortBufferMetrics() {
+ CompactionMetrics metrics = new CompactionMetrics(new TestMetricRegistry(), "myTable");
+
+ // no reporters yet: gauges return -1 (no data)
+ assertThat(getMetric(metrics, CompactionMetrics.MAX_SORT_BUFFER_USED_BYTES)).isEqualTo(-1L);
+ assertThat(getMetric(metrics, CompactionMetrics.AVG_SORT_BUFFER_USED_BYTES))
+ .isEqualTo(-1.0);
+ assertThat(getMetric(metrics, CompactionMetrics.MAX_SORT_BUFFER_UTILISATION))
+ .isEqualTo(-1.0);
+ assertThat(getMetric(metrics, CompactionMetrics.AVG_SORT_BUFFER_UTILISATION))
+ .isEqualTo(-1.0);
+
+ CompactionMetrics.Reporter r0 = metrics.createReporter(BinaryRow.EMPTY_ROW, 0);
+ CompactionMetrics.Reporter r1 = metrics.createReporter(BinaryRow.EMPTY_ROW, 1);
+
+ // bucket 0: 32 MB used of 64 MB total = 50% utilisation
+ r0.reportSortBufferMetrics(32L * 1024 * 1024, 64L * 1024 * 1024);
+ // bucket 1: 48 MB used of 64 MB total = 75% utilisation
+ r1.reportSortBufferMetrics(48L * 1024 * 1024, 64L * 1024 * 1024);
+
+ assertThat(getMetric(metrics, CompactionMetrics.MAX_SORT_BUFFER_USED_BYTES))
+ .isEqualTo(48L * 1024 * 1024);
+ assertThat(getMetric(metrics, CompactionMetrics.AVG_SORT_BUFFER_USED_BYTES))
+ .isEqualTo(40.0 * 1024 * 1024);
+ assertThat(getMetric(metrics, CompactionMetrics.MAX_SORT_BUFFER_UTILISATION))
+ .isEqualTo(75.0);
+ assertThat(getMetric(metrics, CompactionMetrics.AVG_SORT_BUFFER_UTILISATION))
+ .isEqualTo(62.5);
+
+ // update bucket 0 to full utilisation
+ r0.reportSortBufferMetrics(64L * 1024 * 1024, 64L * 1024 * 1024);
+ assertThat(getMetric(metrics, CompactionMetrics.MAX_SORT_BUFFER_UTILISATION))
+ .isEqualTo(100.0);
+ assertThat(getMetric(metrics, CompactionMetrics.AVG_SORT_BUFFER_UTILISATION))
+ .isEqualTo(87.5);
+
+ // zero-total pool reports 0% utilisation (no division by zero)
+ r0.reportSortBufferMetrics(0L, 0L);
+ assertThat(getMetric(metrics, CompactionMetrics.MAX_SORT_BUFFER_UTILISATION))
+ .isEqualTo(75.0);
+ }
+
@Test
public void testReportMetrics() {
CompactionMetrics metrics = new CompactionMetrics(new TestMetricRegistry(), "myTable");