diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java index c13f87ae5794f..f9ef2a64a8346 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java @@ -429,6 +429,7 @@ public static class ValueHider { static { KEYS.add("ssl.trust-store-pwd"); + KEYS.add("scp.password"); KEYS.add("password"); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java index 367b92104062d..83f2a6eb82020 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java @@ -91,6 +91,7 @@ public synchronized String register( final int sinkNum; boolean realTimeFirst = false; String attributeSortedString = generateAttributeSortedString(pipeSinkParameters); + final String attributeDisplayString = generateAttributeDisplayString(pipeSinkParameters); if (isDataRegionSink) { sinkNum = pipeSinkParameters.getIntOrDefault( @@ -119,7 +120,9 @@ public synchronized String register( sinkNum = 1; attributeSortedString = "schema_" + attributeSortedString; } - environment.setAttributeSortedString(attributeSortedString); + final String attributeDisplayStringWithPrefix = + isDataRegionSink ? "data_" + attributeDisplayString : "schema_" + attributeDisplayString; + environment.setAttributeSortedString(attributeDisplayStringWithPrefix); if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { final PipeSinkSubtaskExecutor executor = executorSupplier.get(); @@ -168,9 +171,10 @@ public synchronized String register( final PipeSinkSubtask pipeSinkSubtask = new PipeSinkSubtask( String.format( - "%s_%s_%s", attributeSortedString, environment.getCreationTime(), sinkIndex), + "%s_%s_%s", + attributeDisplayStringWithPrefix, environment.getCreationTime(), sinkIndex), environment.getCreationTime(), - attributeSortedString, + attributeDisplayStringWithPrefix, sinkIndex, pendingQueue, pipeSink); @@ -181,7 +185,7 @@ public synchronized String register( LOGGER.info( DataNodePipeMessages.PIPE_SINK_SUBTASKS_WITH_ATTRIBUTES_IS_BOUNDED, - attributeSortedString, + attributeDisplayStringWithPrefix, executor.getWorkingThreadName(), executor.getCallbackThreadName()); attributeSortedString2SubtaskLifeCycleMap.put( @@ -264,13 +268,23 @@ public UnboundedBlockingPendingQueue getPipeSinkPendingQueue( .getPendingQueue(); } - private String generateAttributeSortedString(final PipeParameters pipeConnectorParameters) { + private static String generateAttributeSortedString( + final PipeParameters pipeConnectorParameters) { final TreeMap sortedStringSourceMap = new TreeMap<>(pipeConnectorParameters.getAttribute()); sortedStringSourceMap.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY); return sortedStringSourceMap.toString(); } + /** Masked attribute string for logs, metrics and exception messages. */ + private static String generateAttributeDisplayString( + final PipeParameters pipeConnectorParameters) { + final TreeMap filteredAttributes = + new TreeMap<>(pipeConnectorParameters.getAttribute()); + filteredAttributes.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY); + return new PipeParameters(filteredAttributes).toString(); + } + ///////////////////////// Singleton Instance Holder ///////////////////////// private PipeSinkSubtaskManager() {