Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ public static class ValueHider {

static {
KEYS.add("ssl.trust-store-pwd");
KEYS.add("scp.password");
KEYS.add("password");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public synchronized String register(
final int sinkNum;
boolean realTimeFirst = false;
String attributeSortedString = generateAttributeSortedString(pipeSinkParameters);
final String attributeDisplayString = generateAttributeDisplayString(pipeSinkParameters);
if (isDataRegionSink) {
sinkNum =
pipeSinkParameters.getIntOrDefault(
Expand Down Expand Up @@ -119,7 +120,9 @@ public synchronized String register(
sinkNum = 1;
attributeSortedString = "schema_" + attributeSortedString;
}
environment.setAttributeSortedString(attributeSortedString);
final String attributeDisplayStringWithPrefix =
isDataRegionSink ? "data_" + attributeDisplayString : "schema_" + attributeDisplayString;
environment.setAttributeSortedString(attributeDisplayStringWithPrefix);

if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
final PipeSinkSubtaskExecutor executor = executorSupplier.get();
Expand Down Expand Up @@ -168,9 +171,10 @@ public synchronized String register(
final PipeSinkSubtask pipeSinkSubtask =
new PipeSinkSubtask(
String.format(
"%s_%s_%s", attributeSortedString, environment.getCreationTime(), sinkIndex),
"%s_%s_%s",
attributeDisplayStringWithPrefix, environment.getCreationTime(), sinkIndex),
environment.getCreationTime(),
attributeSortedString,
attributeDisplayStringWithPrefix,
sinkIndex,
pendingQueue,
pipeSink);
Expand All @@ -181,7 +185,7 @@ public synchronized String register(

LOGGER.info(
DataNodePipeMessages.PIPE_SINK_SUBTASKS_WITH_ATTRIBUTES_IS_BOUNDED,
attributeSortedString,
attributeDisplayStringWithPrefix,
executor.getWorkingThreadName(),
executor.getCallbackThreadName());
attributeSortedString2SubtaskLifeCycleMap.put(
Expand Down Expand Up @@ -264,13 +268,23 @@ public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue(
.getPendingQueue();
}

private String generateAttributeSortedString(final PipeParameters pipeConnectorParameters) {
private static String generateAttributeSortedString(
final PipeParameters pipeConnectorParameters) {
final TreeMap<String, String> sortedStringSourceMap =
new TreeMap<>(pipeConnectorParameters.getAttribute());
sortedStringSourceMap.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY);
return sortedStringSourceMap.toString();
}

/** Masked attribute string for logs, metrics and exception messages. */
private static String generateAttributeDisplayString(
final PipeParameters pipeConnectorParameters) {
final TreeMap<String, String> filteredAttributes =
new TreeMap<>(pipeConnectorParameters.getAttribute());
filteredAttributes.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY);
return new PipeParameters(filteredAttributes).toString();
}

///////////////////////// Singleton Instance Holder /////////////////////////

private PipeSinkSubtaskManager() {
Expand Down
Loading