Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions docs/content/maintenance/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,26 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca
<td>Gauge</td>
<td>The average total file size of all active (currently being written) buckets.</td>
</tr>
<tr>
<td>maxSortBufferUsedBytes</td>
<td>Gauge</td>
<td>The maximum sort buffer memory currently used across all active compaction buckets, in bytes. Values approaching the configured sort buffer capacity indicate memory pressure during compaction; consider tuning <code>sort-spill-threshold</code> or <code>sort-spill-buffer-size</code>.</td>
</tr>
<tr>
<td>avgSortBufferUsedBytes</td>
<td>Gauge</td>
<td>The average sort buffer memory used across all active compaction buckets, in bytes.</td>
</tr>
<tr>
<td>maxSortBufferUtilisationPercent</td>
<td>Gauge</td>
<td>The maximum sort buffer utilisation percentage (0–100) across all active compaction buckets. A value consistently near 100 indicates the sort buffer pool is exhausted and spilling to disk is occurring or imminent.</td>
</tr>
<tr>
<td>avgSortBufferUtilisationPercent</td>
<td>Gauge</td>
<td>The average sort buffer utilisation percentage across all active compaction buckets.</td>
</tr>
</tbody>
</table>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,8 @@ public void returnAll(List<MemorySegment> memory) {
public int freePages() {
return maxPages - numPage;
}

public int maxPages() {
return maxPages;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class MergeSorter {
private final int spillThreshold;
private final CompressOptions compression;

private final MemorySegmentPool memoryPool;
private final CachelessSegmentPool memoryPool;

@Nullable private IOManager ioManager;

Expand All @@ -85,6 +85,14 @@ public MergeSorter(
this.ioManager = ioManager;
}

public long sortBufferTotalBytes() {
return (long) memoryPool.maxPages() * memoryPool.pageSize();
}

public long sortBufferUsedBytes() {
return (long) (memoryPool.maxPages() - memoryPool.freePages()) * memoryPool.pageSize();
}

public MemorySegmentPool memoryPool() {
return memoryPool;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,21 @@ public CompactManager create(
Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
@Nullable FieldsComparator userDefinedSeqComparator = udsComparatorSupplier.get();
CompactRewriter rewriter =
MergeTreeCompactRewriter rewriter =
createRewriter(
partition,
bucket,
keyComparator,
userDefinedSeqComparator,
levels,
dvMaintainer);
CompactionMetrics.Reporter metricsReporter =
compactionMetrics == null
? null
: compactionMetrics.createReporter(partition, bucket);
if (metricsReporter != null) {
rewriter.setMetricsReporter(metricsReporter);
}
return new MergeTreeCompactManager(
compactExecutor,
levels,
Expand All @@ -172,9 +179,7 @@ public CompactManager create(
options.compactionFileSize(true),
options.numSortedRunStopTrigger(),
rewriter,
compactionMetrics == null
? null
: compactionMetrics.createReporter(partition, bucket),
metricsReporter,
dvMaintainer,
options.prepareCommitWaitCompaction(),
options.needLookup(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeReaders;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.utils.ExceptionUtils;
Expand All @@ -51,6 +52,7 @@ public class MergeTreeCompactRewriter extends AbstractCompactRewriter {
@Nullable protected final FieldsComparator userDefinedSeqComparator;
protected final MergeFunctionFactory<KeyValue> mfFactory;
protected final MergeSorter mergeSorter;
@Nullable private CompactionMetrics.Reporter metricsReporter;

public MergeTreeCompactRewriter(
FileReaderFactory<KeyValue> readerFactory,
Expand Down Expand Up @@ -106,6 +108,10 @@ protected CompactResult rewriteCompaction(
notifyRewriteCompactBefore(before);
List<DataFileMeta> after = writer.result();
after = notifyRewriteCompactAfter(after);
if (metricsReporter != null) {
metricsReporter.reportSortBufferMetrics(
mergeSorter.sortBufferUsedBytes(), mergeSorter.sortBufferTotalBytes());
}
return new CompactResult(before, after);
}

Expand All @@ -126,4 +132,8 @@ protected void notifyRewriteCompactBefore(List<DataFileMeta> files) {}
    /**
     * Hook invoked after a compaction rewrite has produced its output files; subclasses may
     * post-process the resulting file metas. The default implementation returns the input
     * list unchanged.
     */
    protected List<DataFileMeta> notifyRewriteCompactAfter(List<DataFileMeta> files) {
        return files;
    }

    /**
     * Injects the per-bucket compaction metrics reporter; {@code null} when metrics are
     * disabled. Called after construction by the write factory that created this rewriter,
     * and read by the rewrite path to publish sort buffer metrics.
     */
    public void setMetricsReporter(@Nullable CompactionMetrics.Reporter reporter) {
        this.metricsReporter = reporter;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ public class CompactionMetrics {
public static final String MAX_TOTAL_FILE_SIZE = "maxTotalFileSize";
public static final String AVG_TOTAL_FILE_SIZE = "avgTotalFileSize";

public static final String MAX_SORT_BUFFER_USED_BYTES = "maxSortBufferUsedBytes";
public static final String AVG_SORT_BUFFER_USED_BYTES = "avgSortBufferUsedBytes";
public static final String MAX_SORT_BUFFER_UTILISATION = "maxSortBufferUtilisationPercent";
public static final String AVG_SORT_BUFFER_UTILISATION = "avgSortBufferUtilisationPercent";

private static final long BUSY_MEASURE_MILLIS = 60_000;
private static final int COMPACTION_TIME_WINDOW = 100;

Expand Down Expand Up @@ -102,6 +107,18 @@ private void registerGenericCompactionMetrics() {

metricGroup.gauge(MAX_TOTAL_FILE_SIZE, () -> getTotalFileSizeStream().max().orElse(-1));
metricGroup.gauge(AVG_TOTAL_FILE_SIZE, () -> getTotalFileSizeStream().average().orElse(-1));

metricGroup.gauge(
MAX_SORT_BUFFER_USED_BYTES, () -> getSortBufferUsedBytesStream().max().orElse(-1));
metricGroup.gauge(
AVG_SORT_BUFFER_USED_BYTES,
() -> getSortBufferUsedBytesStream().average().orElse(-1));
metricGroup.gauge(
MAX_SORT_BUFFER_UTILISATION,
() -> getSortBufferUtilisationStream().max().orElse(-1));
metricGroup.gauge(
AVG_SORT_BUFFER_UTILISATION,
() -> getSortBufferUtilisationStream().average().orElse(-1));
}

private LongStream getLevel0FileCountStream() {
Expand Down Expand Up @@ -130,6 +147,14 @@ public LongStream getTotalFileSizeStream() {
return reporters.values().stream().mapToLong(r -> r.totalFileSize);
}

private LongStream getSortBufferUsedBytesStream() {
return reporters.values().stream().mapToLong(r -> r.sortBufferUsedBytes);
}

private DoubleStream getSortBufferUtilisationStream() {
return reporters.values().stream().mapToDouble(r -> r.sortBufferUtilisationPercent);
}

public void close() {
metricGroup.close();
}
Expand Down Expand Up @@ -157,6 +182,8 @@ public interface Reporter {

void reportTotalFileSize(long bytes);

void reportSortBufferMetrics(long usedBytes, long totalBytes);

void unregister();
}

Expand All @@ -167,6 +194,8 @@ private class ReporterImpl implements Reporter {
private long compactionInputSize = 0;
private long compactionOutputSize = 0;
private long totalFileSize = 0;
private long sortBufferUsedBytes = 0;
private double sortBufferUtilisationPercent = 0.0;

private ReporterImpl(PartitionAndBucket key) {
this.key = key;
Expand Down Expand Up @@ -210,6 +239,13 @@ public void reportLevel0FileCount(long count) {
this.level0FileCount = count;
}

@Override
public void reportSortBufferMetrics(long usedBytes, long totalBytes) {
this.sortBufferUsedBytes = usedBytes;
this.sortBufferUtilisationPercent =
totalBytes > 0 ? 100.0 * usedBytes / totalBytes : 0.0;
}

@Override
public void increaseCompactionsCompletedCount() {
compactionsCompletedCounter.inc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,49 @@ public class CompactionMetricsTest {

@TempDir java.nio.file.Path tempDir;

    /**
     * Covers the sort buffer gauges end to end: the -1 "no data" default with no reporters,
     * max/avg aggregation across buckets, in-place overwrite of a bucket's previous report,
     * and the zero-capacity guard in the utilisation computation.
     */
    @Test
    public void testSortBufferMetrics() {
        CompactionMetrics metrics = new CompactionMetrics(new TestMetricRegistry(), "myTable");

        // no reporters yet: gauges return -1 (no data)
        assertThat(getMetric(metrics, CompactionMetrics.MAX_SORT_BUFFER_USED_BYTES)).isEqualTo(-1L);
        assertThat(getMetric(metrics, CompactionMetrics.AVG_SORT_BUFFER_USED_BYTES))
                .isEqualTo(-1.0);
        assertThat(getMetric(metrics, CompactionMetrics.MAX_SORT_BUFFER_UTILISATION))
                .isEqualTo(-1.0);
        assertThat(getMetric(metrics, CompactionMetrics.AVG_SORT_BUFFER_UTILISATION))
                .isEqualTo(-1.0);

        CompactionMetrics.Reporter r0 = metrics.createReporter(BinaryRow.EMPTY_ROW, 0);
        CompactionMetrics.Reporter r1 = metrics.createReporter(BinaryRow.EMPTY_ROW, 1);

        // bucket 0: 32 MB used of 64 MB total = 50% utilisation
        r0.reportSortBufferMetrics(32L * 1024 * 1024, 64L * 1024 * 1024);
        // bucket 1: 48 MB used of 64 MB total = 75% utilisation
        r1.reportSortBufferMetrics(48L * 1024 * 1024, 64L * 1024 * 1024);

        // max picks bucket 1 (48 MB); avg over {32 MB, 48 MB} is 40 MB
        assertThat(getMetric(metrics, CompactionMetrics.MAX_SORT_BUFFER_USED_BYTES))
                .isEqualTo(48L * 1024 * 1024);
        assertThat(getMetric(metrics, CompactionMetrics.AVG_SORT_BUFFER_USED_BYTES))
                .isEqualTo(40.0 * 1024 * 1024);
        assertThat(getMetric(metrics, CompactionMetrics.MAX_SORT_BUFFER_UTILISATION))
                .isEqualTo(75.0);
        assertThat(getMetric(metrics, CompactionMetrics.AVG_SORT_BUFFER_UTILISATION))
                .isEqualTo(62.5);

        // update bucket 0 to full utilisation; avg over {100%, 75%} is 87.5%
        r0.reportSortBufferMetrics(64L * 1024 * 1024, 64L * 1024 * 1024);
        assertThat(getMetric(metrics, CompactionMetrics.MAX_SORT_BUFFER_UTILISATION))
                .isEqualTo(100.0);
        assertThat(getMetric(metrics, CompactionMetrics.AVG_SORT_BUFFER_UTILISATION))
                .isEqualTo(87.5);

        // zero-total pool reports 0% utilisation (no division by zero), so bucket 1's 75% is max
        r0.reportSortBufferMetrics(0L, 0L);
        assertThat(getMetric(metrics, CompactionMetrics.MAX_SORT_BUFFER_UTILISATION))
                .isEqualTo(75.0);
    }

@Test
public void testReportMetrics() {
CompactionMetrics metrics = new CompactionMetrics(new TestMetricRegistry(), "myTable");
Expand Down
Loading