From aab9152118e21d7d104fc28a98e55afc36cd0a58 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 22 May 2026 18:23:47 +0800 Subject: [PATCH 1/3] Optimize noisy kernel logs --- .../FixedRateFragInsStateTracker.java | 12 +++++++- .../schedule/CompactionScheduler.java | 28 +++++++++++++++++-- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java index 0c7ffe3ec642a..0fe32415b8ea9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java @@ -156,6 +156,11 @@ private void fetchStateAndUpdate() { if (metrics.reachMaxRetryCount()) { // if reach max retry count, we think that the DN is down, and FI in that node won't // exist + logger.warn( + "Failed to fetch state for FragmentInstance {} after {} retries, mark it as no such instance", + instance.getId(), + InstanceStateMetrics.MAX_STATE_FETCH_RETRY_COUNT, + e); FragmentInstanceInfo instanceInfo = new FragmentInstanceInfo(NO_SUCH_INSTANCE); instanceInfo.setMessage( String.format( @@ -165,7 +170,12 @@ private void fetchStateAndUpdate() { } else { // if not reaching max retry count, add retry count, and wait for next fetching schedule metrics.addRetryCount(); - logger.warn(DataNodeQueryMessages.ERROR_HAPPENED_WHILE_FETCHING_QUERY_STATE, e); + logger.debug( + "Failed to fetch state for FragmentInstance {}, retry {}/{}", + instance.getId(), + metrics.retryCount, + InstanceStateMetrics.MAX_STATE_FETCH_RETRY_COUNT, + e); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java index ae9e871b0b40c..208cc01c783db 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java @@ -47,6 +47,8 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; @@ -65,6 +67,9 @@ public class CompactionScheduler { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private static final long DISK_SPACE_CHECK_FAIL_LOG_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); + private static final AtomicLong LAST_DISK_SPACE_CHECK_FAIL_LOG_TIME = new AtomicLong(0); + private static final AtomicLong SUPPRESSED_DISK_SPACE_CHECK_FAIL_LOG_COUNT = new AtomicLong(0); private CompactionScheduler() {} @@ -207,13 +212,32 @@ private static boolean canAddTaskToWaitingQueue(AbstractCompactionTask task) } // check disk space if (!task.isDiskSpaceCheckPassed()) { - LOGGER.info( - "Compaction task start check failed because disk free ratio is less than disk_space_warning_threshold"); + logDiskSpaceCheckFailure(task); return false; } return true; } + private static void logDiskSpaceCheckFailure(AbstractCompactionTask task) { + long now = System.currentTimeMillis(); + long lastLogTime = LAST_DISK_SPACE_CHECK_FAIL_LOG_TIME.get(); + if (now - lastLogTime >= DISK_SPACE_CHECK_FAIL_LOG_INTERVAL_MS + && LAST_DISK_SPACE_CHECK_FAIL_LOG_TIME.compareAndSet(lastLogTime, now)) { + long suppressedCount = SUPPRESSED_DISK_SPACE_CHECK_FAIL_LOG_COUNT.getAndSet(0); + LOGGER.info( + "Skip compaction task because disk free ratio is less than disk_space_warning_threshold, " + + "taskType={}, storageGroup={}, dataRegion={}, timePartition={}, processedFileNum={}, suppressedSimilarLogs={}", + task.getCompactionTaskType(), + task.getStorageGroupName(), + task.getDataRegionId(), + task.getTimePartition(), + task.getProcessedFileNum(), + suppressedCount); + } else { + SUPPRESSED_DISK_SPACE_CHECK_FAIL_LOG_COUNT.incrementAndGet(); + } + } + public static int scheduleInsertionCompaction( TsFileManager tsFileManager, long timePartition, CompactionScheduleContext context) throws InterruptedException { From 186c0a98f003b15d899029d82e96a317a43f1f81 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 22 May 2026 18:54:00 +0800 Subject: [PATCH 2/3] Improve OPC UA server logging --- .../pipe/sink/protocol/opcua/OpcUaSink.java | 10 +++ .../opcua/server/OpcUaKeyStoreLoader.java | 26 ++++++-- .../protocol/opcua/server/OpcUaNameSpace.java | 64 +++++++++++++------ .../opcua/server/OpcUaServerBuilder.java | 15 +++-- 4 files changed, 87 insertions(+), 28 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java index a4daf16610ce6..0d2deddcdcd4e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java @@ -335,10 +335,18 @@ private void customizeServer(final PipeParameters parameters) { .setEnableAnonymousAccess(enableAnonymousAccess) .setSecurityPolicies(securityPolicies) .setDebounceTimeMs(debounceTimeMs); + LOGGER.info( + "Starting Apache IoTDB OPC UA server: tcpPort={}, httpsPort={}.", + tcpBindPort, + httpsBindPort); final OpcUaServer newServer = builder.build(); nameSpace = new OpcUaNameSpace(newServer, builder); nameSpace.startup(); newServer.startup().get(); + LOGGER.info( + "Apache IoTDB OPC UA server started: tcpPort={}, httpsPort={}.", + tcpBindPort, + httpsBindPort); return new Pair<>(new AtomicInteger(0), nameSpace); } else { oldValue @@ -567,7 +575,9 @@ public void close() throws Exception { if (pair.getLeft().decrementAndGet() <= 0) { try { + LOGGER.info("Shutting down Apache IoTDB OPC UA server: serverKey={}.", serverKey); pair.getRight().shutdown(); + LOGGER.info("Apache IoTDB OPC UA server stopped: serverKey={}.", serverKey); } finally { SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP.remove(serverKey); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java index 5fd6e5501cabf..c1b46d351f933 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.pipe.sink.protocol.opcua.server; -import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import com.google.common.collect.Sets; @@ -35,6 +34,7 @@ import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.security.Key; import java.security.KeyPair; import java.security.KeyStore; @@ -62,17 +62,27 @@ OpcUaKeyStoreLoader load(final Path baseDir, final char[] password) throws Excep final File serverKeyStore = baseDir.resolve("iotdb-server.pfx").toFile(); LOGGER.info(DataNodePipeMessages.LOADING_KEYSTORE_AT, serverKeyStore); + boolean needRewrite = false; if (serverKeyStore.exists()) { try (InputStream is = Files.newInputStream(serverKeyStore.toPath())) { keyStore.load(is, password); } catch (final IOException e) { - LOGGER.warn(DataNodePipeMessages.LOAD_KEYSTORE_FAILED_THE_EXISTING_KEYSTORE_MAY); - FileUtils.deleteFileOrDirectory(serverKeyStore); + LOGGER.warn( + "Load keyStore {} failed, the existing keyStore may be stale, re-constructing.", + serverKeyStore, + e); + if (!serverKeyStore.delete()) { + LOGGER.warn( + "Delete stale keyStore {} failed. The file will be overwritten if possible.", + serverKeyStore); + needRewrite = true; + } } } - if (!serverKeyStore.exists()) { + if (!serverKeyStore.exists() || needRewrite) { + LOGGER.info("Generating new server keyStore at {}", serverKeyStore); keyStore.load(null, password); final KeyPair keyPair = SelfSignedCertificateGenerator.generateRsaKeyPair(2048); @@ -108,7 +118,12 @@ OpcUaKeyStoreLoader load(final Path baseDir, final char[] password) throws Excep keyStore.setKeyEntry( SERVER_ALIAS, keyPair.getPrivate(), password, new X509Certificate[] {certificate}); - try (final OutputStream os = Files.newOutputStream(serverKeyStore.toPath())) { + try (final OutputStream os = + Files.newOutputStream( + serverKeyStore.toPath(), + StandardOpenOption.CREATE, + StandardOpenOption.WRITE, + StandardOpenOption.TRUNCATE_EXISTING)) { keyStore.store(os, password); } } @@ -119,6 +134,7 @@ OpcUaKeyStoreLoader load(final Path baseDir, final char[] password) throws Excep final PublicKey serverPublicKey = serverCertificate.getPublicKey(); serverKeyPair = new KeyPair(serverPublicKey, (PrivateKey) serverPrivateKey); + LOGGER.info("Loaded server certificate from keyStore alias {}.", SERVER_ALIAS); } else { throw new Exception( "Invalid keyStore, the serverPrivateKey is " diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java index 917720220bf2d..e5c4a14917feb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java @@ -305,9 +305,7 @@ private void transferTabletRowForClientServerModel( new DateTime(utcTimestamp), new DateTime()); measurementNode = addNode(name, currentFolder, folderNode, dataValue, type); - if (Objects.isNull(measurementNode.getValue()) - || Objects.isNull(measurementNode.getValue().getSourceTime()) - || measurementNode.getValue().getSourceTime().getUtcTime() < utcTimestamp) { + if (shouldNotifyNodeValueChange(measurementNode, utcTimestamp)) { notifyNodeValueChange(measurementNode.getNodeId(), dataValue, measurementNode); } } else { @@ -325,9 +323,7 @@ private void transferTabletRowForClientServerModel( new DataValue( new Variant(value), currentQuality, new DateTime(timestamp), new DateTime()), dataType); - if (Objects.isNull(valueNode.getValue()) - || Objects.isNull(valueNode.getValue().getSourceTime()) - || valueNode.getValue().getSourceTime().getUtcTime() < timestamp) { + if (shouldNotifyNodeValueChange(valueNode, timestamp)) { notifyNodeValueChange( valueNode.getNodeId(), new DataValue( @@ -337,6 +333,26 @@ private void transferTabletRowForClientServerModel( } } + private boolean shouldNotifyNodeValueChange( + final UaVariableNode variableNode, final long candidateUtcTime) { + final DataValue currentValue = variableNode.getValue(); + if (Objects.isNull(currentValue) || Objects.isNull(currentValue.getSourceTime())) { + return true; + } + final long currentUtcTime = currentValue.getSourceTime().getUtcTime(); + if (currentUtcTime < candidateUtcTime) { + return true; + } + if (candidateUtcTime < currentUtcTime) { + LOGGER.debug( + "Reject stale value update: nodeId={}, candidateSourceTime={}, currentSourceTime={}.", + variableNode.getNodeId(), + candidateUtcTime, + currentUtcTime); + } + return false; + } + private UaVariableNode addNode( final String nodeName, final String currentFolder, @@ -639,14 +655,19 @@ public void onDataItemsCreated(final List dataItems) { final NodeId nodeId = readValueId.getNodeId(); // 1. Add the new subscription item to the subscription mapping - nodeSubscriptions.compute( + final List subscribedItems = + nodeSubscriptions.compute( + nodeId, + (k, existingList) -> { + List list = + existingList != null ? existingList : new CopyOnWriteArrayList<>(); + list.add(item); + return list; + }); + LOGGER.debug( + "Registered data item subscription: nodeId={}, subscriptionCount={}.", nodeId, - (k, existingList) -> { - List list = - existingList != null ? existingList : new CopyOnWriteArrayList<>(); - list.add(item); - return list; - }); + subscribedItems.size()); // 2. 【Key Optimization】Proactively push the current node's initial value when the new // subscription item is created @@ -680,13 +701,18 @@ public void onDataItemsDeleted(final List dataItems) { final NodeId nodeId = readValueId.getNodeId(); // When the client cancels the subscription, remove this subscription item from the mapping - nodeSubscriptions.computeIfPresent( + final List remainingItems = + nodeSubscriptions.computeIfPresent( + nodeId, + (k, existingList) -> { + existingList.remove(item); + // Automatically clean up the key when there are no subscribers, save memory + return existingList.isEmpty() ? null : existingList; + }); + LOGGER.debug( + "Removed data item subscription: nodeId={}, subscriptionCount={}.", nodeId, - (k, existingList) -> { - existingList.remove(item); - // Automatically clean up the key when there are no subscribers, save memory - return existingList.isEmpty() ? null : existingList; - }); + Objects.isNull(remainingItems) ? 0 : remainingItems.size()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java index 297805162a914..91c44e83b823f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java @@ -142,10 +142,8 @@ public OpcUaServer build() throws Exception { final File pkiDir = securityDir.resolve("pki").toFile(); - LoggerFactory.getLogger(OpcUaServerBuilder.class) - .info("Security dir: {}", securityDir.toAbsolutePath()); - LoggerFactory.getLogger(OpcUaServerBuilder.class) - .info("Security pki dir: {}", pkiDir.getAbsolutePath()); + LOGGER.info("Security dir: {}", securityDir.toAbsolutePath()); + LOGGER.info("Security pki dir: {}", pkiDir.getAbsolutePath()); final OpcUaKeyStoreLoader loader = new OpcUaKeyStoreLoader().load(securityDir, password.toCharArray()); @@ -197,8 +195,17 @@ public OpcUaServer build() throws Exception { StatusCodes.Bad_ConfigurationError, "Certificate is missing the application URI")); + final Set configuredSecurityPolicies = new LinkedHashSet<>(securityPolicies); final Set endpointConfigurations = createEndpointConfigurations(certificate, tcpBindPort, httpsBindPort); + LOGGER.info( + "Built OPC UA server endpoints: tcpPort={}, httpsPort={}, anonymousAccess={}, securityPolicies={}, endpointCount={}, debounceTimeMs={}.", + tcpBindPort, + httpsBindPort, + enableAnonymousAccess, + configuredSecurityPolicies, + endpointConfigurations.size(), + debounceTimeMs); serverConfig = OpcUaServerConfig.builder() From 1a415818839f597a19f7ce45191ecf9e6e92a051 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 22 May 2026 18:23:47 +0800 Subject: [PATCH 3/3] Optimize noisy kernel logs --- .../metrics/IoTDBInternalLocalReporter.java | 65 +++++++++++++++++-- .../storageengine/dataregion/DataRegion.java | 21 ++++-- .../CompactionScheduleTaskWorker.java | 11 ++-- .../compaction/schedule/TTLScheduleTask.java | 7 +- .../dataregion/wal/WALManager.java | 27 ++++++-- .../service/TriggerInformationUpdater.java | 21 +++++- 6 files changed, 126 insertions(+), 26 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java index 444fdd2e9899b..c24586f90e123 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java @@ -74,6 +74,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; @@ -82,11 +83,14 @@ public class IoTDBInternalLocalReporter extends IoTDBInternalReporter { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBInternalLocalReporter.class); private static final SessionManager SESSION_MANAGER = SessionManager.getInstance(); private static final Coordinator COORDINATOR = Coordinator.getInstance(); + private static final long METRIC_UPDATE_FAILURE_LOG_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); private final SessionInfo sessionInfo; private final IPartitionFetcher partitionFetcher; private final ISchemaFetcher schemaFetcher; private Future currentServiceFuture; private final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); + private final AtomicLong lastMetricUpdateFailureLogTime = new AtomicLong(0); + private final AtomicLong suppressedMetricUpdateFailureLogCount = new AtomicLong(0); public IoTDBInternalLocalReporter() { partitionFetcher = ClusterPartitionFetcher.getInstance(); @@ -174,18 +178,60 @@ protected void writeMetricToIoTDB(Map valueMap, String prefix, l result = insertRecord(valueMap, prefix, time); } if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOGGER.warn(DataNodeMiscMessages.FAILED_UPDATE_METRIC_VALUE, result); + logFailedUpdateMetricValue(result); } } catch (IoTDBConnectionException e1) { - LOGGER.warn( - "Failed to update the value of metric because of connection failure, because ", e1); + logMetricUpdateConnectionFailure(e1); } catch (IllegalPathException | QueryProcessException e2) { - LOGGER.warn( - "Failed to update the value of metric because of internal error, because ", e2); + logMetricUpdateInternalFailure(e2); } }); } + private long getMetricUpdateFailureSuppressedCountIfShouldLog() { + long now = System.currentTimeMillis(); + long lastLogTime = lastMetricUpdateFailureLogTime.get(); + if (now - lastLogTime >= METRIC_UPDATE_FAILURE_LOG_INTERVAL_MS + && lastMetricUpdateFailureLogTime.compareAndSet(lastLogTime, now)) { + return suppressedMetricUpdateFailureLogCount.getAndSet(0); + } + suppressedMetricUpdateFailureLogCount.incrementAndGet(); + return -1; + } + + private void logFailedUpdateMetricValue(TSStatus result) { + long suppressedCount = getMetricUpdateFailureSuppressedCountIfShouldLog(); + if (suppressedCount < 0) { + return; + } + LOGGER.warn( + DataNodeMiscMessages.FAILED_UPDATE_METRIC_VALUE + ", suppressedSimilarLogs={}", + result, + suppressedCount); + } + + private void logMetricUpdateConnectionFailure(IoTDBConnectionException e) { + long suppressedCount = getMetricUpdateFailureSuppressedCountIfShouldLog(); + if (suppressedCount < 0) { + return; + } + LOGGER.warn( + "Failed to update the value of metric because of connection failure, suppressedSimilarLogs={}", + suppressedCount, + e); + } + + private void logMetricUpdateInternalFailure(Exception e) { + long suppressedCount = getMetricUpdateFailureSuppressedCountIfShouldLog(); + if (suppressedCount < 0) { + return; + } + LOGGER.warn( + "Failed to update the value of metric because of internal error, suppressedSimilarLogs={}", + suppressedCount, + e); + } + private TSStatus insertRecord(Map valueMap, String prefix, long time) throws IoTDBConnectionException, QueryProcessException, IllegalPathException { TSInsertRecordReq request = new TSInsertRecordReq(); @@ -242,7 +288,14 @@ private void createTimeSeries(Map valueMap, String prefix) COORDINATOR.executeForTreeModel( s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher); if (result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOGGER.warn(DataNodeMiscMessages.FAILED_AUTO_CREATE_TIMESERIES, paths, result.status); + long suppressedCount = getMetricUpdateFailureSuppressedCountIfShouldLog(); + if (suppressedCount >= 0) { + LOGGER.warn( + DataNodeMiscMessages.FAILED_AUTO_CREATE_TIMESERIES + ", suppressedSimilarLogs={}", + paths, + result.status, + suppressedCount); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 784d4fa477119..012a4cfd6db35 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -3777,7 +3777,7 @@ public int executeTTLCheck() throws InterruptedException { if (skipCurrentTTLAndModificationCheck()) { return 0; } - logger.info( + logger.debug( "[TTL] {}-{} Start ttl and modification checking.", databaseName, dataRegionIdString); CompactionScheduleContext context = new CompactionScheduleContext( @@ -3800,12 +3800,19 @@ public int executeTTLCheck() throws InterruptedException { if (context.hasSubmitTask()) { CompactionMetrics.getInstance().updateCompactionTaskSelectionNum(context); } - logger.info( - "[TTL] {}-{} Totally select {} all-outdated files and {} partial-outdated files.", - databaseName, - dataRegionIdString, - context.getFullyDirtyFileNum(), - context.getPartiallyDirtyFileNum()); + if (context.hasSubmitTask()) { + logger.info( + "[TTL] {}-{} Totally select {} all-outdated files and {} partial-outdated files.", + databaseName, + dataRegionIdString, + context.getFullyDirtyFileNum(), + context.getPartiallyDirtyFileNum()); + } else { + logger.debug( + "[TTL] {}-{} No all-outdated or partial-outdated files are selected.", + databaseName, + dataRegionIdString); + } } catch (InterruptedException e) { throw e; } catch (Throwable e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java index 714d232d62538..05069a06c884a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java @@ -74,13 +74,16 @@ public Void call() { } catch (InterruptedException ignored) { boolean isStoppedByUser = CompactionScheduleTaskManager.getInstance().isStoppingAllScheduleTask(); - logger.info( - "[CompactionScheduleTaskWorker-{}] compaction schedule is interrupted, isStopByUser: {}", - workerId, - isStoppedByUser); if (isStoppedByUser) { + logger.debug( + "[CompactionScheduleTaskWorker-{}] compaction schedule is interrupted by stop signal", + workerId); return null; } + logger.info( + "[CompactionScheduleTaskWorker-{}] compaction schedule is interrupted, isStopByUser: {}", + workerId, + false); } catch (Exception e) { logger.error( "[CompactionScheduleTaskWorker-{}] Failed to execute compaction schedule task", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java index f76f58aab8022..71e4e53c75e96 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java @@ -70,13 +70,12 @@ public Void call() throws Exception { } catch (StopTTLCheckException | InterruptedException ignored) { boolean isStoppedByUser = CompactionScheduleTaskManager.getInstance().isStoppingAllScheduleTask(); - logger.info( - "[TTLCheckTask-{}] TTL checker is interrupted, isStoppedByUser: {}", - workerId, - isStoppedByUser); if (isStoppedByUser) { + logger.debug("[TTLCheckTask-{}] TTL checker is interrupted by stop signal", workerId); return null; } + logger.info( + "[TTLCheckTask-{}] TTL checker is interrupted, isStoppedByUser: {}", workerId, false); } catch (Exception e) { logger.error(StorageEngineMessages.TTL_CHECK_TASK_FAILED, workerId, e); } catch (Throwable t) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java index 00eb47d40ae2b..02d3a39940733 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java @@ -60,6 +60,8 @@ public class WALManager implements IService { private static final Logger logger = LoggerFactory.getLogger(WALManager.class); private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private static final long WAL_DISK_USAGE_OVER_THROTTLE_LOG_INTERVAL_MS = + TimeUnit.MINUTES.toMillis(1); // manage all wal nodes and decide how to allocate them private final NodeAllocationStrategy walNodesManager; @@ -69,6 +71,8 @@ public class WALManager implements IService { private final AtomicLong totalDiskUsage = new AtomicLong(); // total number of wal files private final AtomicLong totalFileNum = new AtomicLong(); + private final AtomicLong lastWalDiskUsageOverThrottleLogTime = new AtomicLong(0); + private final AtomicLong suppressedWalDiskUsageOverThrottleLogCount = new AtomicLong(0); private WALManager() { if (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS) @@ -184,10 +188,7 @@ private void deleteOutdatedFiles() { while ((firstLoop || shouldThrottle())) { deleteOutdatedFilesInWALNodes(); if (firstLoop && shouldThrottle()) { - logger.warn( - "WAL disk usage {} is larger than the wal_throttle_threshold_in_byte * 0.8 {}, please check your write load, iot consensus and the pipe module. It's better to allocate more disk for WAL.", - getTotalDiskUsage(), - getThrottleThreshold()); + logWalDiskUsageOverThrottle(); } firstLoop = false; if (Thread.interrupted()) { @@ -197,6 +198,24 @@ private void deleteOutdatedFiles() { } } + private void logWalDiskUsageOverThrottle() { + long now = System.currentTimeMillis(); + long lastLogTime = lastWalDiskUsageOverThrottleLogTime.get(); + if (now - lastLogTime >= WAL_DISK_USAGE_OVER_THROTTLE_LOG_INTERVAL_MS + && lastWalDiskUsageOverThrottleLogTime.compareAndSet(lastLogTime, now)) { + long suppressedCount = suppressedWalDiskUsageOverThrottleLogCount.getAndSet(0); + logger.warn( + "WAL disk usage {} is larger than the wal_throttle_threshold_in_byte * 0.8 {}, " + + "please check your write load, iot consensus and the pipe module. " + + "It's better to allocate more disk for WAL. suppressedSimilarLogs={}", + getTotalDiskUsage(), + getThrottleThreshold(), + suppressedCount); + } else { + suppressedWalDiskUsageOverThrottleLogCount.incrementAndGet(); + } + } + protected void deleteOutdatedFilesInWALNodes() { if (config.getWalMode() == WALMode.DISABLE) { return; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/service/TriggerInformationUpdater.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/service/TriggerInformationUpdater.java index 3b37eee7d1ff4..b14b2f05a6c83 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/service/TriggerInformationUpdater.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/service/TriggerInformationUpdater.java @@ -40,11 +40,13 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; public class TriggerInformationUpdater { private static final Logger LOGGER = LoggerFactory.getLogger(TriggerInformationUpdater.class); + private static final long UPDATE_FAILURE_LOG_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); private static final IClientManager CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance(); @@ -54,6 +56,8 @@ public class TriggerInformationUpdater { ThreadName.STATEFUL_TRIGGER_INFORMATION_UPDATER.getName()); private Future updateFuture; + private final AtomicLong lastUpdateFailureLogTime = new AtomicLong(0); + private final AtomicLong suppressedUpdateFailureLogCount = new AtomicLong(0); private static final long UPDATE_INTERVAL = 1000L * 60; @@ -98,7 +102,22 @@ public void updateTask() { triggerInformation.getTriggerName(), triggerInformation.getDataNodeLocation()); } } catch (Exception e) { - LOGGER.warn(DataNodeMiscMessages.ERROR_UPDATING_TRIGGER_INFO, e); + logUpdateFailure(e); + } + } + + private void logUpdateFailure(Exception e) { + long now = System.currentTimeMillis(); + long lastLogTime = lastUpdateFailureLogTime.get(); + if (now - lastLogTime >= UPDATE_FAILURE_LOG_INTERVAL_MS + && lastUpdateFailureLogTime.compareAndSet(lastLogTime, now)) { + long suppressedCount = suppressedUpdateFailureLogCount.getAndSet(0); + LOGGER.warn( + DataNodeMiscMessages.ERROR_UPDATING_TRIGGER_INFO + " suppressedSimilarLogs={}", + suppressedCount, + e); + } else { + suppressedUpdateFailureLogCount.incrementAndGet(); } } }