diff --git a/docs/content/maintenance/metrics.md b/docs/content/maintenance/metrics.md index bc0c1dbd0f73..747a3179d754 100644 --- a/docs/content/maintenance/metrics.md +++ b/docs/content/maintenance/metrics.md @@ -294,6 +294,26 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca Gauge The average total file size of all active (currently being written) buckets. + + maxSortBufferUsedBytes + Gauge + The maximum sort buffer memory most recently reported across all active compaction buckets, in bytes. High values relative to the sort buffer's total capacity (see maxSortBufferUtilisationPercent) indicate memory pressure during compaction; consider lowering sort-spill-threshold or reducing sort-spill-buffer-size. + + + avgSortBufferUsedBytes + Gauge + The average sort buffer memory most recently reported across all active compaction buckets, in bytes. + + + maxSortBufferUtilisationPercent + Gauge + The maximum sort buffer utilisation percentage (0–100, or -1 when there are no active compaction buckets) across all active compaction buckets. A value consistently near 100 indicates the sort buffer pool is exhausted and spilling to disk is occurring or imminent. + + + avgSortBufferUtilisationPercent + Gauge + The average sort buffer utilisation percentage (0–100, or -1 when there are no active compaction buckets) across all active compaction buckets. 
+ diff --git a/paimon-common/src/main/java/org/apache/paimon/memory/CachelessSegmentPool.java b/paimon-common/src/main/java/org/apache/paimon/memory/CachelessSegmentPool.java index cec1db858115..5033361fe3fa 100644 --- a/paimon-common/src/main/java/org/apache/paimon/memory/CachelessSegmentPool.java +++ b/paimon-common/src/main/java/org/apache/paimon/memory/CachelessSegmentPool.java @@ -58,4 +58,8 @@ public void returnAll(List memory) { public int freePages() { return maxPages - numPage; } + + public int maxPages() { + return maxPages; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java index ae6db76b90ba..705ef5119f07 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java @@ -66,7 +66,7 @@ public class MergeSorter { private final int spillThreshold; private final CompressOptions compression; - private final MemorySegmentPool memoryPool; + private final CachelessSegmentPool memoryPool; @Nullable private IOManager ioManager; @@ -85,6 +85,14 @@ public MergeSorter( this.ioManager = ioManager; } + public long sortBufferTotalBytes() { + return (long) memoryPool.maxPages() * memoryPool.pageSize(); + } + + public long sortBufferUsedBytes() { + return (long) (memoryPool.maxPages() - memoryPool.freePages()) * memoryPool.pageSize(); + } + public MemorySegmentPool memoryPool() { return memoryPool; } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java index 4ad5c0ac79f6..4c393c3815d0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java @@ -156,7 
+156,7 @@ public CompactManager create( Comparator keyComparator = keyComparatorSupplier.get(); Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels()); @Nullable FieldsComparator userDefinedSeqComparator = udsComparatorSupplier.get(); - CompactRewriter rewriter = + MergeTreeCompactRewriter rewriter = createRewriter( partition, bucket, @@ -164,6 +164,13 @@ public CompactManager create( userDefinedSeqComparator, levels, dvMaintainer); + CompactionMetrics.Reporter metricsReporter = + compactionMetrics == null + ? null + : compactionMetrics.createReporter(partition, bucket); + if (metricsReporter != null) { + rewriter.setMetricsReporter(metricsReporter); + } return new MergeTreeCompactManager( compactExecutor, levels, @@ -172,9 +179,7 @@ public CompactManager create( options.compactionFileSize(true), options.numSortedRunStopTrigger(), rewriter, - compactionMetrics == null - ? null - : compactionMetrics.createReporter(partition, bucket), + metricsReporter, dvMaintainer, options.prepareCommitWaitCompaction(), options.needLookup(), diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java index f23582618789..9eecce5248c4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java @@ -30,6 +30,7 @@ import org.apache.paimon.mergetree.MergeSorter; import org.apache.paimon.mergetree.MergeTreeReaders; import org.apache.paimon.mergetree.SortedRun; +import org.apache.paimon.operation.metrics.CompactionMetrics; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.utils.ExceptionUtils; @@ -51,6 +52,7 @@ public class MergeTreeCompactRewriter extends AbstractCompactRewriter { @Nullable protected final 
FieldsComparator userDefinedSeqComparator; protected final MergeFunctionFactory mfFactory; protected final MergeSorter mergeSorter; + @Nullable private CompactionMetrics.Reporter metricsReporter; public MergeTreeCompactRewriter( FileReaderFactory readerFactory, @@ -106,6 +108,10 @@ protected CompactResult rewriteCompaction( notifyRewriteCompactBefore(before); List after = writer.result(); after = notifyRewriteCompactAfter(after); + if (metricsReporter != null) { + metricsReporter.reportSortBufferMetrics( + mergeSorter.sortBufferUsedBytes(), mergeSorter.sortBufferTotalBytes()); + } return new CompactResult(before, after); } @@ -126,4 +132,8 @@ protected void notifyRewriteCompactBefore(List files) {} protected List notifyRewriteCompactAfter(List files) { return files; } + + public void setMetricsReporter(@Nullable CompactionMetrics.Reporter reporter) { + this.metricsReporter = reporter; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java index e91d55f1bcc7..2a98e30d3ec0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java @@ -52,6 +52,11 @@ public class CompactionMetrics { public static final String MAX_TOTAL_FILE_SIZE = "maxTotalFileSize"; public static final String AVG_TOTAL_FILE_SIZE = "avgTotalFileSize"; + public static final String MAX_SORT_BUFFER_USED_BYTES = "maxSortBufferUsedBytes"; + public static final String AVG_SORT_BUFFER_USED_BYTES = "avgSortBufferUsedBytes"; + public static final String MAX_SORT_BUFFER_UTILISATION = "maxSortBufferUtilisationPercent"; + public static final String AVG_SORT_BUFFER_UTILISATION = "avgSortBufferUtilisationPercent"; + private static final long BUSY_MEASURE_MILLIS = 60_000; private static final int COMPACTION_TIME_WINDOW = 100; @@ -102,6 +107,18 @@ private 
void registerGenericCompactionMetrics() { metricGroup.gauge(MAX_TOTAL_FILE_SIZE, () -> getTotalFileSizeStream().max().orElse(-1)); metricGroup.gauge(AVG_TOTAL_FILE_SIZE, () -> getTotalFileSizeStream().average().orElse(-1)); + + metricGroup.gauge( + MAX_SORT_BUFFER_USED_BYTES, () -> getSortBufferUsedBytesStream().max().orElse(-1)); + metricGroup.gauge( + AVG_SORT_BUFFER_USED_BYTES, + () -> getSortBufferUsedBytesStream().average().orElse(-1)); + metricGroup.gauge( + MAX_SORT_BUFFER_UTILISATION, + () -> getSortBufferUtilisationStream().max().orElse(-1)); + metricGroup.gauge( + AVG_SORT_BUFFER_UTILISATION, + () -> getSortBufferUtilisationStream().average().orElse(-1)); } private LongStream getLevel0FileCountStream() { @@ -130,6 +147,14 @@ public LongStream getTotalFileSizeStream() { return reporters.values().stream().mapToLong(r -> r.totalFileSize); } + private LongStream getSortBufferUsedBytesStream() { + return reporters.values().stream().mapToLong(r -> r.sortBufferUsedBytes); + } + + private DoubleStream getSortBufferUtilisationStream() { + return reporters.values().stream().mapToDouble(r -> r.sortBufferUtilisationPercent); + } + public void close() { metricGroup.close(); } @@ -157,6 +182,8 @@ public interface Reporter { void reportTotalFileSize(long bytes); + void reportSortBufferMetrics(long usedBytes, long totalBytes); + void unregister(); } @@ -167,6 +194,8 @@ private class ReporterImpl implements Reporter { private long compactionInputSize = 0; private long compactionOutputSize = 0; private long totalFileSize = 0; + private long sortBufferUsedBytes = 0; + private double sortBufferUtilisationPercent = 0.0; private ReporterImpl(PartitionAndBucket key) { this.key = key; @@ -210,6 +239,13 @@ public void reportLevel0FileCount(long count) { this.level0FileCount = count; } + @Override + public void reportSortBufferMetrics(long usedBytes, long totalBytes) { + this.sortBufferUsedBytes = usedBytes; + this.sortBufferUtilisationPercent = + totalBytes > 0 ? 
100.0 * usedBytes / totalBytes : 0.0; + } + @Override public void increaseCompactionsCompletedCount() { compactionsCompletedCounter.inc(); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java index 91d4c0acbc2b..df7265f07e80 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java @@ -57,6 +57,49 @@ public class CompactionMetricsTest { @TempDir java.nio.file.Path tempDir; + @Test + public void testSortBufferMetrics() { + CompactionMetrics metrics = new CompactionMetrics(new TestMetricRegistry(), "myTable"); + + // no reporters yet: gauges return -1 (no data) + assertThat(getMetric(metrics, CompactionMetrics.MAX_SORT_BUFFER_USED_BYTES)).isEqualTo(-1L); + assertThat(getMetric(metrics, CompactionMetrics.AVG_SORT_BUFFER_USED_BYTES)) + .isEqualTo(-1.0); + assertThat(getMetric(metrics, CompactionMetrics.MAX_SORT_BUFFER_UTILISATION)) + .isEqualTo(-1.0); + assertThat(getMetric(metrics, CompactionMetrics.AVG_SORT_BUFFER_UTILISATION)) + .isEqualTo(-1.0); + + CompactionMetrics.Reporter r0 = metrics.createReporter(BinaryRow.EMPTY_ROW, 0); + CompactionMetrics.Reporter r1 = metrics.createReporter(BinaryRow.EMPTY_ROW, 1); + + // bucket 0: 32 MB used of 64 MB total = 50% utilisation + r0.reportSortBufferMetrics(32L * 1024 * 1024, 64L * 1024 * 1024); + // bucket 1: 48 MB used of 64 MB total = 75% utilisation + r1.reportSortBufferMetrics(48L * 1024 * 1024, 64L * 1024 * 1024); + + assertThat(getMetric(metrics, CompactionMetrics.MAX_SORT_BUFFER_USED_BYTES)) + .isEqualTo(48L * 1024 * 1024); + assertThat(getMetric(metrics, CompactionMetrics.AVG_SORT_BUFFER_USED_BYTES)) + .isEqualTo(40.0 * 1024 * 1024); + assertThat(getMetric(metrics, CompactionMetrics.MAX_SORT_BUFFER_UTILISATION)) + .isEqualTo(75.0); +
assertThat(getMetric(metrics, CompactionMetrics.AVG_SORT_BUFFER_UTILISATION)) + .isEqualTo(62.5); + + // update bucket 0 to full utilisation + r0.reportSortBufferMetrics(64L * 1024 * 1024, 64L * 1024 * 1024); + assertThat(getMetric(metrics, CompactionMetrics.MAX_SORT_BUFFER_UTILISATION)) + .isEqualTo(100.0); + assertThat(getMetric(metrics, CompactionMetrics.AVG_SORT_BUFFER_UTILISATION)) + .isEqualTo(87.5); + + // zero-total pool reports 0% utilisation (no division by zero), so the max falls back to bucket 1's 75% + r0.reportSortBufferMetrics(0L, 0L); + assertThat(getMetric(metrics, CompactionMetrics.MAX_SORT_BUFFER_UTILISATION)) + .isEqualTo(75.0); + } + @Test public void testReportMetrics() { CompactionMetrics metrics = new CompactionMetrics(new TestMetricRegistry(), "myTable");