diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java index ad44b78042ac3..c46d4a7134390 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java @@ -238,7 +238,7 @@ private void collectEvent(final Event event) { enrichedEvent.setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes()); if (enrichedEvent.getPipeName() != null - && pendingQueue.isPipeDropped(enrichedEvent.getPipeName(), creationTime, regionId)) { + && pendingQueue.isEventFromDroppedPipe(enrichedEvent)) { enrichedEvent.clearReferenceCount(PipeEventCollector.class.getName()); return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java index 4b65746b3abed..8641dbc78674e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java @@ -356,12 +356,16 @@ public void discardAllEvents() { @Override public void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { - super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + @Override + public void discardEventsOfPipe(final CommitterKey committerKey) { + super.discardEventsOfPipe(committerKey); tsfileInsertEventDeque.removeIf( event -> { if (event instanceof EnrichedEvent - && isEventFromPipe( - ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, regionId)) { + && isEventFromPipe((EnrichedEvent) event, committerKey)) { if (((EnrichedEvent) event) .clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) { eventCounter.decreaseEventCount(event); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index 560512521e756..90d325f6d238e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; @@ -201,10 +202,9 @@ public void close() { * When a pipe is dropped, the connector maybe reused and will not be closed. So we just discard * its queued events in the output pipe connector. */ - public void discardEventsOfPipe( - final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + public void discardEventsOfPipe(final CommitterKey committerKey) { // Try to remove the events as much as possible - inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + inputPendingQueue.discardEventsOfPipe(committerKey); try { increaseHighPriorityTaskCount(); @@ -217,9 +217,7 @@ public void discardEventsOfPipe( // use a new thread to stop all the pipes, we will not encounter deadlock here. Or else we // will. if (lastEvent instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName()) - && creationTimeToDrop == ((EnrichedEvent) lastEvent).getCreationTime() - && regionId == ((EnrichedEvent) lastEvent).getRegionId()) { + && isEventFromPipe((EnrichedEvent) lastEvent, committerKey)) { // Do not clear the last event's reference counts because it may be on transferring lastEvent = null; // Submit self to avoid that the lastEvent has been retried "max times" times and has @@ -241,9 +239,7 @@ public void discardEventsOfPipe( // clear the lastExceptionEvent. It's safe to potentially clear it twice because we have the // "nonnull" detection. if (lastExceptionEvent instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) lastExceptionEvent).getPipeName()) - && creationTimeToDrop == ((EnrichedEvent) lastExceptionEvent).getCreationTime() - && regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) { + && isEventFromPipe((EnrichedEvent) lastExceptionEvent, committerKey)) { clearReferenceCountAndReleaseLastExceptionEvent(); } } @@ -252,11 +248,18 @@ public void discardEventsOfPipe( } if (outputPipeSink instanceof PipeConnectorWithEventDiscard) { - ((PipeConnectorWithEventDiscard) outputPipeSink) - .discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + ((PipeConnectorWithEventDiscard) outputPipeSink).discardEventsOfPipe(committerKey); } } + private static boolean isEventFromPipe( + final EnrichedEvent event, final CommitterKey committerKey) { + return committerKey.getPipeName().equals(event.getPipeName()) + && committerKey.getCreationTime() == event.getCreationTime() + && committerKey.getRegionId() == event.getRegionId() + && (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey())); + } + //////////////////////////// APIs provided for metric framework //////////////////////////// public String getAttributeSortedString() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java index 85634277627c4..42b1ae9136676 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.agent.task.subtask.sink; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.agent.task.execution.PipeSinkSubtaskExecutor; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; @@ -87,19 +88,17 @@ public synchronized void register() { * Otherwise, the {@link PipeSinkSubtaskLifeCycle#runningTaskCount} might be inconsistent with the * {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} because of parallel connector scheduling. * - * @param pipeNameToDeregister pipe name - * @param regionId region id + * @param committerKey committer key of the pipe task to deregister * @return {@code true} if the {@link PipeSinkSubtask} is out of life cycle, indicating that the * {@link PipeSinkSubtask} should never be used again * @throws IllegalStateException if {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} <= 0 */ - public synchronized boolean deregister( - final String pipeNameToDeregister, final long creationTimeToDeregister, final int regionId) { + public synchronized boolean deregister(final CommitterKey committerKey) { if (registeredTaskCount <= 0) { throw new IllegalStateException(DataNodePipeMessages.REGISTEREDTASKCOUNT_0_1); } - subtask.discardEventsOfPipe(pipeNameToDeregister, creationTimeToDeregister, regionId); + subtask.discardEventsOfPipe(committerKey); try { if (registeredTaskCount > 1) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java index 367b92104062d..3ad99ca5c0634 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; @@ -211,7 +212,10 @@ public synchronized void deregister( // Shall not be empty final PipeSinkSubtaskExecutor executor = lifeCycles.get(0).executor; - lifeCycles.removeIf(o -> o.deregister(pipeName, creationTime, regionId)); + final CommitterKey committerKey = + PipeEventCommitManager.getInstance().getCommitterKey(pipeName, creationTime, regionId); + + lifeCycles.removeIf(o -> o.deregister(committerKey)); if (lifeCycles.isEmpty()) { attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java index 8bf69e6e6b01a..aede0e994d9a9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; @@ -157,11 +158,13 @@ public synchronized void close() { */ public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + public synchronized void discardEventsOfPipe(final CommitterKey committerKey) { events.removeIf( event -> { - if (pipeNameToDrop.equals(event.getPipeName()) - && creationTimeToDrop == event.getCreationTime() - && regionId == event.getRegionId()) { + if (isEventFromPipe(event, committerKey)) { event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); return true; } @@ -169,6 +172,14 @@ public synchronized void discardEventsOfPipe( }); } + private static boolean isEventFromPipe( + final EnrichedEvent event, final CommitterKey committerKey) { + return committerKey.getPipeName().equals(event.getPipeName()) + && committerKey.getCreationTime() == event.getCreationTime() + && committerKey.getRegionId() == event.getRegionId() + && (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey())); + } + public synchronized void decreaseEventsReferenceCount( final String holderMessage, final boolean shouldReport) { events.forEach(event -> event.decreaseReferenceCount(holderMessage, shouldReport)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java index 3bec537614c60..b3a8884a14649 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; @@ -201,10 +202,12 @@ public boolean isEmpty() { public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { - defaultBatch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); - endPointToBatch - .values() - .forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId)); + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + public synchronized void discardEventsOfPipe(final CommitterKey committerKey) { + defaultBatch.discardEventsOfPipe(committerKey); + endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(committerKey)); } public int size() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java index 649ef35c4ce0c..ea83524988b4d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.airgap; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter; import org.apache.iotdb.commons.utils.RetryUtils; @@ -613,8 +614,13 @@ protected byte[] compressIfNeeded(final byte[] reqInBytes) throws IOException { @Override public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + @Override + public synchronized void discardEventsOfPipe(final CommitterKey committerKey) { if (Objects.nonNull(tabletBatchBuilder)) { - tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + tabletBatchBuilder.discardEventsOfPipe(committerKey); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index 9adbcf6cf16d1..32a2c191048c7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -23,8 +23,8 @@ import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.config.PipeConfig; -import org.apache.iotdb.commons.pipe.datastructure.Triple; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink; @@ -130,9 +130,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { private final Map pendingHandlers = new ConcurrentHashMap<>(); - // Pipe name, creation time, region id - private final Set> droppedPipeTaskKeys = - ConcurrentHashMap.newKeySet(); + private final Set droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); private boolean enableSendTsFileLimit; private volatile boolean isConnectionException; @@ -749,16 +747,20 @@ public boolean isEnableSendTsFileLimit() { @Override public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { - droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId)); + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + @Override + public synchronized void discardEventsOfPipe(final CommitterKey committerKey) { + droppedPipeTaskKeys.add(committerKey); if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) { - tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + tabletBatchBuilder.discardEventsOfPipe(committerKey); } retryEventQueue.removeIf( event -> { if (event instanceof EnrichedEvent - && isDroppedPipe( - (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) { + && isDroppedPipe((EnrichedEvent) event, committerKey)) { ((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); retryEventQueueEventCounter.decreaseEventCount(event); return true; @@ -769,8 +771,7 @@ && isDroppedPipe( retryTsFileQueue.removeIf( event -> { if (event instanceof EnrichedEvent - && isDroppedPipe( - (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) { + && isDroppedPipe((EnrichedEvent) event, committerKey)) { ((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); retryEventQueueEventCounter.decreaseEventCount(event); return true; @@ -872,18 +873,14 @@ public void setTransferTsFileCounter(AtomicInteger transferTsFileCounter) { } private boolean isDroppedPipe(final EnrichedEvent event) { - return droppedPipeTaskKeys.contains( - new Triple<>(event.getPipeName(), event.getCreationTime(), event.getRegionId())); - } - - private static boolean isDroppedPipe( - final EnrichedEvent event, - final String pipeNameToDrop, - final long creationTimeToDrop, - final int regionId) { - return pipeNameToDrop.equals(event.getPipeName()) - && creationTimeToDrop == event.getCreationTime() - && regionId == event.getRegionId(); + return droppedPipeTaskKeys.stream().anyMatch(key -> isDroppedPipe(event, key)); + } + + private static boolean isDroppedPipe(final EnrichedEvent event, final CommitterKey committerKey) { + return committerKey.getPipeName().equals(event.getPipeName()) + && committerKey.getCreationTime() == event.getCreationTime() + && committerKey.getRegionId() == event.getRegionId() + && (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey())); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java index 5e6297d843851..d9e25f5e09f29 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java @@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient; import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter; @@ -604,8 +605,13 @@ public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOEx @Override public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + @Override + public synchronized void discardEventsOfPipe(final CommitterKey committerKey) { if (Objects.nonNull(tabletBatchBuilder)) { - tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + tabletBatchBuilder.discardEventsOfPipe(committerKey); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java index bbb4cb9a3a826..baddf4727d688 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java @@ -21,7 +21,7 @@ import org.apache.iotdb.commons.external.collections4.BidiMap; import org.apache.iotdb.commons.external.collections4.bidimap.DualTreeBidiMap; -import org.apache.iotdb.commons.pipe.datastructure.Triple; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; @@ -60,9 +60,7 @@ public class WebSocketConnectorServer extends WebSocketServer { private final ConcurrentHashMap> eventsWaitingForAck = new ConcurrentHashMap<>(); - // Pipe name, creation time, region id - private final Set> droppedPipeTaskKeys = - ConcurrentHashMap.newKeySet(); + private final Set droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); private final BidiMap router = new DualTreeBidiMap(null, Comparator.comparing(Object::hashCode)) {}; @@ -118,33 +116,33 @@ public synchronized void unregister(WebSocketSink connector) { .forEach((eventId, eventWrapper) -> discardEvent(eventWrapper.event)); } - droppedPipeTaskKeys.removeIf(key -> key.getFirst().equals(pipeName)); + droppedPipeTaskKeys.removeIf(key -> key.getPipeName().equals(pipeName)); } public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { - droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId)); + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + public synchronized void discardEventsOfPipe(final CommitterKey committerKey) { + droppedPipeTaskKeys.add(committerKey); final PriorityBlockingQueue eventTransferQueue = - eventsWaitingForTransfer.get(pipeNameToDrop); + eventsWaitingForTransfer.get(committerKey.getPipeName()); if (eventTransferQueue != null) { eventTransferQueue.removeIf( - eventWrapper -> - discardIfMatches(eventWrapper.event, pipeNameToDrop, creationTimeToDrop, regionId)); + eventWrapper -> discardIfMatches(eventWrapper.event, committerKey)); synchronized (eventTransferQueue) { eventTransferQueue.notifyAll(); } } final ConcurrentHashMap eventId2EventMap = - eventsWaitingForAck.get(pipeNameToDrop); + eventsWaitingForAck.get(committerKey.getPipeName()); if (eventId2EventMap != null) { eventId2EventMap .entrySet() - .removeIf( - entry -> - discardIfMatches( - entry.getValue().event, pipeNameToDrop, creationTimeToDrop, regionId)); + .removeIf(entry -> discardIfMatches(entry.getValue().event, committerKey)); } } @@ -506,19 +504,13 @@ public EventWaitingForAck(WebSocketSink connector, Event event) { } } - private boolean discardIfMatches( - final Event event, - final String pipeNameToDrop, - final long creationTimeToDrop, - final int regionId) { + private boolean discardIfMatches(final Event event, final CommitterKey committerKey) { if (!(event instanceof EnrichedEvent)) { return false; } final EnrichedEvent enrichedEvent = (EnrichedEvent) event; - if (!pipeNameToDrop.equals(enrichedEvent.getPipeName()) - || creationTimeToDrop != enrichedEvent.getCreationTime() - || regionId != enrichedEvent.getRegionId()) { + if (!isEventFromPipe(enrichedEvent, committerKey)) { return false; } @@ -528,11 +520,16 @@ private boolean discardIfMatches( private boolean isDroppedPipe(final Event event) { return event instanceof EnrichedEvent - && droppedPipeTaskKeys.contains( - new Triple<>( - ((EnrichedEvent) event).getPipeName(), - ((EnrichedEvent) event).getCreationTime(), - ((EnrichedEvent) event).getRegionId())); + && droppedPipeTaskKeys.stream() + .anyMatch(key -> isEventFromPipe((EnrichedEvent) event, key)); + } + + private static boolean isEventFromPipe( + final EnrichedEvent event, final CommitterKey committerKey) { + return committerKey.getPipeName().equals(event.getPipeName()) + && committerKey.getCreationTime() == event.getCreationTime() + && committerKey.getRegionId() == event.getRegionId() + && (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey())); } private boolean isQueueAvailable( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java index 3c487ff135680..06eab035b4e15 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.websocket; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; @@ -177,6 +178,13 @@ public void discardEventsOfPipe( } } + @Override + public void discardEventsOfPipe(final CommitterKey committerKey) { + if (server != null) { + server.discardEventsOfPipe(committerKey); + } + } + public void commit(EnrichedEvent enrichedEvent) { Optional.ofNullable(enrichedEvent) .ifPresent(event -> event.decreaseReferenceCount(WebSocketSink.class.getName(), true)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java index 41026a180481b..ca04c1a6d5fe3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.subscription.task.subtask; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.db.i18n.DataNodeMiscMessages; import org.apache.iotdb.db.pipe.agent.task.execution.PipeSinkSubtaskExecutor; import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtask; @@ -64,8 +65,7 @@ public synchronized void register() { } @Override - public synchronized boolean deregister( - final String pipeNameToDeregister, final long creationTimeToDeregister, final int regionId) { + public synchronized boolean deregister(final CommitterKey committerKey) { if (registeredTaskCount <= 0) { throw new IllegalStateException(DataNodeMiscMessages.REGISTERED_TASK_COUNT_LE_ZERO); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java index 3a1ef0d74ad2f..a41b8a095588f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; @@ -170,7 +171,11 @@ public synchronized void deregister( final PipeSinkSubtaskLifeCycle lifeCycle = attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString); - if (lifeCycle.deregister(pipeName, creationTime, regionId)) { + + final CommitterKey committerKey = + PipeEventCommitManager.getInstance().getCommitterKey(pipeName, creationTime, regionId); + + if (lifeCycle.deregister(committerKey)) { attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java index c56e8143ef590..c430e3f6b066b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java @@ -20,8 +20,8 @@ package org.apache.iotdb.commons.pipe.agent.task.connection; import org.apache.iotdb.commons.i18n.PipeMessages; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.config.PipeConfig; -import org.apache.iotdb.commons.pipe.datastructure.Triple; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.metric.PipeEventCounter; import org.apache.iotdb.pipe.api.event.Event; @@ -48,9 +48,7 @@ public abstract class BlockingPendingQueue { protected final AtomicBoolean isClosed = new AtomicBoolean(false); - // Pipe name, creation time, region id - protected final Set> droppedPipeTaskKeys = - ConcurrentHashMap.newKeySet(); + protected final Set droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); protected BlockingPendingQueue( final BlockingQueue pendingQueue, final PipeEventCounter eventCounter) { @@ -139,12 +137,15 @@ public void discardAllEvents() { public void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { - droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId)); + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + public void discardEventsOfPipe(final CommitterKey committerKey) { + droppedPipeTaskKeys.add(committerKey); pendingQueue.removeIf( event -> { if (event instanceof EnrichedEvent - && isEventFromPipe( - ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, regionId)) { + && isEventFromPipe((EnrichedEvent) event, committerKey)) { if (((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName())) { eventCounter.decreaseEventCount(event); } @@ -192,16 +193,30 @@ protected static boolean isEventFromPipe( && regionId == event.getRegionId(); } + protected static boolean isEventFromPipe( + final EnrichedEvent event, final CommitterKey committerKey) { + return committerKey.getPipeName().equals(event.getPipeName()) + && committerKey.getCreationTime() == event.getCreationTime() + && committerKey.getRegionId() == event.getRegionId() + && (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey())); + } + protected boolean isEventFromDroppedPipe(final E event) { return event instanceof EnrichedEvent && ((EnrichedEvent) event).getPipeName() != null - && isPipeDropped( - ((EnrichedEvent) event).getPipeName(), - ((EnrichedEvent) event).getCreationTime(), - ((EnrichedEvent) event).getRegionId()); + && isEventFromDroppedPipe((EnrichedEvent) event); + } + + public boolean isEventFromDroppedPipe(final EnrichedEvent event) { + return droppedPipeTaskKeys.stream().anyMatch(key -> isEventFromPipe(event, key)); } public boolean isPipeDropped(final String pipeName, final long creationTime, final int regionId) { - return droppedPipeTaskKeys.contains(new Triple<>(pipeName, creationTime, regionId)); + return droppedPipeTaskKeys.stream() + .anyMatch( + key -> + key.getPipeName().equals(pipeName) + && key.getCreationTime() == creationTime + && key.getRegionId() == regionId); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java index f2c3a73e18c41..26e7ea305d595 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java @@ -168,6 +168,11 @@ private boolean commitSingleId( return true; } + public CommitterKey getCommitterKey( + final String pipeName, final long creationTime, final int regionId) { + return generateCommitterKey(pipeName, creationTime, regionId); + } + private CommitterKey generateCommitterKey( final String pipeName, final long creationTime, final int regionId) { return taskAgent.getCommitterKey( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java index ab4dbcf90750f..4ffc0c25ed244 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java @@ -19,7 +19,14 @@ package org.apache.iotdb.commons.pipe.sink.protocol; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; + public interface PipeConnectorWithEventDiscard { void discardEventsOfPipe(String pipeName, long creationTime, int regionId); + + default void discardEventsOfPipe(final CommitterKey committerKey) { + discardEventsOfPipe( + committerKey.getPipeName(), committerKey.getCreationTime(), committerKey.getRegionId()); + } }