Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,18 @@ private void customizeServer(final PipeParameters parameters) {
.setEnableAnonymousAccess(enableAnonymousAccess)
.setSecurityPolicies(securityPolicies)
.setDebounceTimeMs(debounceTimeMs);
LOGGER.info(
"Starting Apache IoTDB OPC UA server: tcpPort={}, httpsPort={}.",
tcpBindPort,
httpsBindPort);
final OpcUaServer newServer = builder.build();
nameSpace = new OpcUaNameSpace(newServer, builder);
nameSpace.startup();
newServer.startup().get();
LOGGER.info(
"Apache IoTDB OPC UA server started: tcpPort={}, httpsPort={}.",
tcpBindPort,
httpsBindPort);
return new Pair<>(new AtomicInteger(0), nameSpace);
} else {
oldValue
Expand Down Expand Up @@ -567,7 +575,9 @@ public void close() throws Exception {

if (pair.getLeft().decrementAndGet() <= 0) {
try {
LOGGER.info("Shutting down Apache IoTDB OPC UA server: serverKey={}.", serverKey);
pair.getRight().shutdown();
LOGGER.info("Apache IoTDB OPC UA server stopped: serverKey={}.", serverKey);
} finally {
SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP.remove(serverKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.iotdb.db.pipe.sink.protocol.opcua.server;

import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;

import com.google.common.collect.Sets;
Expand All @@ -35,6 +34,7 @@
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.security.Key;
import java.security.KeyPair;
import java.security.KeyStore;
Expand Down Expand Up @@ -62,17 +62,27 @@ OpcUaKeyStoreLoader load(final Path baseDir, final char[] password) throws Excep
final File serverKeyStore = baseDir.resolve("iotdb-server.pfx").toFile();

LOGGER.info(DataNodePipeMessages.LOADING_KEYSTORE_AT, serverKeyStore);
boolean needRewrite = false;

if (serverKeyStore.exists()) {
try (InputStream is = Files.newInputStream(serverKeyStore.toPath())) {
keyStore.load(is, password);
} catch (final IOException e) {
LOGGER.warn(DataNodePipeMessages.LOAD_KEYSTORE_FAILED_THE_EXISTING_KEYSTORE_MAY);
FileUtils.deleteFileOrDirectory(serverKeyStore);
LOGGER.warn(
"Load keyStore {} failed, the existing keyStore may be stale, re-constructing.",
serverKeyStore,
e);
if (!serverKeyStore.delete()) {
LOGGER.warn(
"Delete stale keyStore {} failed. The file will be overwritten if possible.",
serverKeyStore);
needRewrite = true;
}
}
}

if (!serverKeyStore.exists()) {
if (!serverKeyStore.exists() || needRewrite) {
LOGGER.info("Generating new server keyStore at {}", serverKeyStore);
keyStore.load(null, password);

final KeyPair keyPair = SelfSignedCertificateGenerator.generateRsaKeyPair(2048);
Expand Down Expand Up @@ -108,7 +118,12 @@ OpcUaKeyStoreLoader load(final Path baseDir, final char[] password) throws Excep

keyStore.setKeyEntry(
SERVER_ALIAS, keyPair.getPrivate(), password, new X509Certificate[] {certificate});
try (final OutputStream os = Files.newOutputStream(serverKeyStore.toPath())) {
try (final OutputStream os =
Files.newOutputStream(
serverKeyStore.toPath(),
StandardOpenOption.CREATE,
StandardOpenOption.WRITE,
StandardOpenOption.TRUNCATE_EXISTING)) {
keyStore.store(os, password);
}
}
Expand All @@ -119,6 +134,7 @@ OpcUaKeyStoreLoader load(final Path baseDir, final char[] password) throws Excep

final PublicKey serverPublicKey = serverCertificate.getPublicKey();
serverKeyPair = new KeyPair(serverPublicKey, (PrivateKey) serverPrivateKey);
LOGGER.info("Loaded server certificate from keyStore alias {}.", SERVER_ALIAS);
} else {
throw new Exception(
"Invalid keyStore, the serverPrivateKey is "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,7 @@ private void transferTabletRowForClientServerModel(
new DateTime(utcTimestamp),
new DateTime());
measurementNode = addNode(name, currentFolder, folderNode, dataValue, type);
if (Objects.isNull(measurementNode.getValue())
|| Objects.isNull(measurementNode.getValue().getSourceTime())
|| measurementNode.getValue().getSourceTime().getUtcTime() < utcTimestamp) {
if (shouldNotifyNodeValueChange(measurementNode, utcTimestamp)) {
notifyNodeValueChange(measurementNode.getNodeId(), dataValue, measurementNode);
}
} else {
Expand All @@ -325,9 +323,7 @@ private void transferTabletRowForClientServerModel(
new DataValue(
new Variant(value), currentQuality, new DateTime(timestamp), new DateTime()),
dataType);
if (Objects.isNull(valueNode.getValue())
|| Objects.isNull(valueNode.getValue().getSourceTime())
|| valueNode.getValue().getSourceTime().getUtcTime() < timestamp) {
if (shouldNotifyNodeValueChange(valueNode, timestamp)) {
notifyNodeValueChange(
valueNode.getNodeId(),
new DataValue(
Expand All @@ -337,6 +333,26 @@ private void transferTabletRowForClientServerModel(
}
}

private boolean shouldNotifyNodeValueChange(
final UaVariableNode variableNode, final long candidateUtcTime) {
final DataValue currentValue = variableNode.getValue();
if (Objects.isNull(currentValue) || Objects.isNull(currentValue.getSourceTime())) {
return true;
}
final long currentUtcTime = currentValue.getSourceTime().getUtcTime();
if (currentUtcTime < candidateUtcTime) {
return true;
}
if (candidateUtcTime < currentUtcTime) {
LOGGER.debug(
"Reject stale value update: nodeId={}, candidateSourceTime={}, currentSourceTime={}.",
variableNode.getNodeId(),
candidateUtcTime,
currentUtcTime);
}
return false;
}

private UaVariableNode addNode(
final String nodeName,
final String currentFolder,
Expand Down Expand Up @@ -639,14 +655,19 @@ public void onDataItemsCreated(final List<DataItem> dataItems) {
final NodeId nodeId = readValueId.getNodeId();

// 1. Add the new subscription item to the subscription mapping
nodeSubscriptions.compute(
final List<DataItem> subscribedItems =
nodeSubscriptions.compute(
nodeId,
(k, existingList) -> {
List<DataItem> list =
existingList != null ? existingList : new CopyOnWriteArrayList<>();
list.add(item);
return list;
});
LOGGER.debug(
"Registered data item subscription: nodeId={}, subscriptionCount={}.",
nodeId,
(k, existingList) -> {
List<DataItem> list =
existingList != null ? existingList : new CopyOnWriteArrayList<>();
list.add(item);
return list;
});
subscribedItems.size());

// 2. 【Key Optimization】Proactively push the current node's initial value when the new
// subscription item is created
Expand Down Expand Up @@ -680,13 +701,18 @@ public void onDataItemsDeleted(final List<DataItem> dataItems) {
final NodeId nodeId = readValueId.getNodeId();

// When the client cancels the subscription, remove this subscription item from the mapping
nodeSubscriptions.computeIfPresent(
final List<DataItem> remainingItems =
nodeSubscriptions.computeIfPresent(
nodeId,
(k, existingList) -> {
existingList.remove(item);
// Automatically clean up the key when there are no subscribers, save memory
return existingList.isEmpty() ? null : existingList;
});
LOGGER.debug(
"Removed data item subscription: nodeId={}, subscriptionCount={}.",
nodeId,
(k, existingList) -> {
existingList.remove(item);
// Automatically clean up the key when there are no subscribers, save memory
return existingList.isEmpty() ? null : existingList;
});
Objects.isNull(remainingItems) ? 0 : remainingItems.size());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,8 @@ public OpcUaServer build() throws Exception {

final File pkiDir = securityDir.resolve("pki").toFile();

LoggerFactory.getLogger(OpcUaServerBuilder.class)
.info("Security dir: {}", securityDir.toAbsolutePath());
LoggerFactory.getLogger(OpcUaServerBuilder.class)
.info("Security pki dir: {}", pkiDir.getAbsolutePath());
LOGGER.info("Security dir: {}", securityDir.toAbsolutePath());
LOGGER.info("Security pki dir: {}", pkiDir.getAbsolutePath());

final OpcUaKeyStoreLoader loader =
new OpcUaKeyStoreLoader().load(securityDir, password.toCharArray());
Expand Down Expand Up @@ -197,8 +195,17 @@ public OpcUaServer build() throws Exception {
StatusCodes.Bad_ConfigurationError,
"Certificate is missing the application URI"));

final Set<SecurityPolicy> configuredSecurityPolicies = new LinkedHashSet<>(securityPolicies);
final Set<EndpointConfiguration> endpointConfigurations =
createEndpointConfigurations(certificate, tcpBindPort, httpsBindPort);
LOGGER.info(
"Built OPC UA server endpoints: tcpPort={}, httpsPort={}, anonymousAccess={}, securityPolicies={}, endpointCount={}, debounceTimeMs={}.",
tcpBindPort,
httpsBindPort,
enableAnonymousAccess,
configuredSecurityPolicies,
endpointConfigurations.size(),
debounceTimeMs);

serverConfig =
OpcUaServerConfig.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ private void fetchStateAndUpdate() {
if (metrics.reachMaxRetryCount()) {
// if reach max retry count, we think that the DN is down, and FI in that node won't
// exist
logger.warn(
"Failed to fetch state for FragmentInstance {} after {} retries, mark it as no such instance",
instance.getId(),
InstanceStateMetrics.MAX_STATE_FETCH_RETRY_COUNT,
e);
FragmentInstanceInfo instanceInfo = new FragmentInstanceInfo(NO_SUCH_INSTANCE);
instanceInfo.setMessage(
String.format(
Expand All @@ -165,7 +170,12 @@ private void fetchStateAndUpdate() {
} else {
// if not reaching max retry count, add retry count, and wait for next fetching schedule
metrics.addRetryCount();
logger.warn(DataNodeQueryMessages.ERROR_HAPPENED_WHILE_FETCHING_QUERY_STATE, e);
logger.debug(
"Failed to fetch state for FragmentInstance {}, retry {}/{}",
instance.getId(),
metrics.retryCount,
InstanceStateMetrics.MAX_STATE_FETCH_RETRY_COUNT,
e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;

Expand All @@ -82,11 +83,14 @@ public class IoTDBInternalLocalReporter extends IoTDBInternalReporter {
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBInternalLocalReporter.class);
private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
private static final Coordinator COORDINATOR = Coordinator.getInstance();
private static final long METRIC_UPDATE_FAILURE_LOG_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1);
private final SessionInfo sessionInfo;
private final IPartitionFetcher partitionFetcher;
private final ISchemaFetcher schemaFetcher;
private Future<?> currentServiceFuture;
private final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
private final AtomicLong lastMetricUpdateFailureLogTime = new AtomicLong(0);
private final AtomicLong suppressedMetricUpdateFailureLogCount = new AtomicLong(0);

public IoTDBInternalLocalReporter() {
partitionFetcher = ClusterPartitionFetcher.getInstance();
Expand Down Expand Up @@ -174,18 +178,60 @@ protected void writeMetricToIoTDB(Map<String, Object> valueMap, String prefix, l
result = insertRecord(valueMap, prefix, time);
}
if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn(DataNodeMiscMessages.FAILED_UPDATE_METRIC_VALUE, result);
logFailedUpdateMetricValue(result);
}
} catch (IoTDBConnectionException e1) {
LOGGER.warn(
"Failed to update the value of metric because of connection failure, because ", e1);
logMetricUpdateConnectionFailure(e1);
} catch (IllegalPathException | QueryProcessException e2) {
LOGGER.warn(
"Failed to update the value of metric because of internal error, because ", e2);
logMetricUpdateInternalFailure(e2);
}
});
}

private long getMetricUpdateFailureSuppressedCountIfShouldLog() {
long now = System.currentTimeMillis();
long lastLogTime = lastMetricUpdateFailureLogTime.get();
if (now - lastLogTime >= METRIC_UPDATE_FAILURE_LOG_INTERVAL_MS
&& lastMetricUpdateFailureLogTime.compareAndSet(lastLogTime, now)) {
return suppressedMetricUpdateFailureLogCount.getAndSet(0);
}
suppressedMetricUpdateFailureLogCount.incrementAndGet();
return -1;
}

private void logFailedUpdateMetricValue(TSStatus result) {
long suppressedCount = getMetricUpdateFailureSuppressedCountIfShouldLog();
if (suppressedCount < 0) {
return;
}
LOGGER.warn(
DataNodeMiscMessages.FAILED_UPDATE_METRIC_VALUE + ", suppressedSimilarLogs={}",
result,
suppressedCount);
}

private void logMetricUpdateConnectionFailure(IoTDBConnectionException e) {
long suppressedCount = getMetricUpdateFailureSuppressedCountIfShouldLog();
if (suppressedCount < 0) {
return;
}
LOGGER.warn(
"Failed to update the value of metric because of connection failure, suppressedSimilarLogs={}",
suppressedCount,
e);
}

private void logMetricUpdateInternalFailure(Exception e) {
long suppressedCount = getMetricUpdateFailureSuppressedCountIfShouldLog();
if (suppressedCount < 0) {
return;
}
LOGGER.warn(
"Failed to update the value of metric because of internal error, suppressedSimilarLogs={}",
suppressedCount,
e);
}

private TSStatus insertRecord(Map<String, Object> valueMap, String prefix, long time)
throws IoTDBConnectionException, QueryProcessException, IllegalPathException {
TSInsertRecordReq request = new TSInsertRecordReq();
Expand Down Expand Up @@ -242,7 +288,14 @@ private void createTimeSeries(Map<String, Object> valueMap, String prefix)
COORDINATOR.executeForTreeModel(
s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher);
if (result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn(DataNodeMiscMessages.FAILED_AUTO_CREATE_TIMESERIES, paths, result.status);
long suppressedCount = getMetricUpdateFailureSuppressedCountIfShouldLog();
if (suppressedCount >= 0) {
LOGGER.warn(
DataNodeMiscMessages.FAILED_AUTO_CREATE_TIMESERIES + ", suppressedSimilarLogs={}",
paths,
result.status,
suppressedCount);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3777,7 +3777,7 @@ public int executeTTLCheck() throws InterruptedException {
if (skipCurrentTTLAndModificationCheck()) {
return 0;
}
logger.info(
logger.debug(
"[TTL] {}-{} Start ttl and modification checking.", databaseName, dataRegionIdString);
CompactionScheduleContext context =
new CompactionScheduleContext(
Expand All @@ -3800,12 +3800,19 @@ public int executeTTLCheck() throws InterruptedException {
if (context.hasSubmitTask()) {
CompactionMetrics.getInstance().updateCompactionTaskSelectionNum(context);
}
logger.info(
"[TTL] {}-{} Totally select {} all-outdated files and {} partial-outdated files.",
databaseName,
dataRegionIdString,
context.getFullyDirtyFileNum(),
context.getPartiallyDirtyFileNum());
if (context.hasSubmitTask()) {
logger.info(
"[TTL] {}-{} Totally select {} all-outdated files and {} partial-outdated files.",
databaseName,
dataRegionIdString,
context.getFullyDirtyFileNum(),
context.getPartiallyDirtyFileNum());
} else {
logger.debug(
"[TTL] {}-{} No all-outdated or partial-outdated files are selected.",
databaseName,
dataRegionIdString);
}
} catch (InterruptedException e) {
throw e;
} catch (Throwable e) {
Expand Down
Loading
Loading