Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ private void collectEvent(final Event event) {
enrichedEvent.setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes());

if (enrichedEvent.getPipeName() != null
&& pendingQueue.isPipeDropped(enrichedEvent.getPipeName(), creationTime, regionId)) {
&& pendingQueue.isEventFromDroppedPipe(enrichedEvent)) {
enrichedEvent.clearReferenceCount(PipeEventCollector.class.getName());
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,12 +356,16 @@ public void discardAllEvents() {
@Override
public void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1));
}

@Override
public void discardEventsOfPipe(final CommitterKey committerKey) {
super.discardEventsOfPipe(committerKey);
tsfileInsertEventDeque.removeIf(
event -> {
if (event instanceof EnrichedEvent
&& isEventFromPipe(
((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, regionId)) {
&& isEventFromPipe((EnrichedEvent) event, committerKey)) {
if (((EnrichedEvent) event)
.clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) {
eventCounter.decreaseEventCount(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
Expand Down Expand Up @@ -201,10 +202,9 @@ public void close() {
* When a pipe is dropped, the connector maybe reused and will not be closed. So we just discard
* its queued events in the output pipe connector.
*/
public void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
public void discardEventsOfPipe(final CommitterKey committerKey) {
// Try to remove the events as much as possible
inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
inputPendingQueue.discardEventsOfPipe(committerKey);

try {
increaseHighPriorityTaskCount();
Expand All @@ -217,9 +217,7 @@ public void discardEventsOfPipe(
// use a new thread to stop all the pipes, we will not encounter deadlock here. Or else we
// will.
if (lastEvent instanceof EnrichedEvent
&& pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())
&& creationTimeToDrop == ((EnrichedEvent) lastEvent).getCreationTime()
&& regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
&& isEventFromPipe((EnrichedEvent) lastEvent, committerKey)) {
// Do not clear the last event's reference counts because it may be on transferring
lastEvent = null;
// Submit self to avoid that the lastEvent has been retried "max times" times and has
Expand All @@ -241,9 +239,7 @@ public void discardEventsOfPipe(
// clear the lastExceptionEvent. It's safe to potentially clear it twice because we have the
// "nonnull" detection.
if (lastExceptionEvent instanceof EnrichedEvent
&& pipeNameToDrop.equals(((EnrichedEvent) lastExceptionEvent).getPipeName())
&& creationTimeToDrop == ((EnrichedEvent) lastExceptionEvent).getCreationTime()
&& regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) {
&& isEventFromPipe((EnrichedEvent) lastExceptionEvent, committerKey)) {
clearReferenceCountAndReleaseLastExceptionEvent();
}
}
Expand All @@ -252,11 +248,18 @@ public void discardEventsOfPipe(
}

if (outputPipeSink instanceof PipeConnectorWithEventDiscard) {
((PipeConnectorWithEventDiscard) outputPipeSink)
.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
((PipeConnectorWithEventDiscard) outputPipeSink).discardEventsOfPipe(committerKey);
}
}

private static boolean isEventFromPipe(
final EnrichedEvent event, final CommitterKey committerKey) {
return committerKey.getPipeName().equals(event.getPipeName())
&& committerKey.getCreationTime() == event.getCreationTime()
&& committerKey.getRegionId() == event.getRegionId()
&& (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey()));
}

//////////////////////////// APIs provided for metric framework ////////////////////////////

public String getAttributeSortedString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.agent.task.subtask.sink;

import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.agent.task.execution.PipeSinkSubtaskExecutor;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
Expand Down Expand Up @@ -87,19 +88,17 @@ public synchronized void register() {
* Otherwise, the {@link PipeSinkSubtaskLifeCycle#runningTaskCount} might be inconsistent with the
* {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} because of parallel connector scheduling.
*
* @param pipeNameToDeregister pipe name
* @param regionId region id
* @param committerKey committer key of the pipe task to deregister
* @return {@code true} if the {@link PipeSinkSubtask} is out of life cycle, indicating that the
* {@link PipeSinkSubtask} should never be used again
* @throws IllegalStateException if {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} <= 0
*/
public synchronized boolean deregister(
final String pipeNameToDeregister, final long creationTimeToDeregister, final int regionId) {
public synchronized boolean deregister(final CommitterKey committerKey) {
if (registeredTaskCount <= 0) {
throw new IllegalStateException(DataNodePipeMessages.REGISTEREDTASKCOUNT_0_1);
}

subtask.discardEventsOfPipe(pipeNameToDeregister, creationTimeToDeregister, regionId);
subtask.discardEventsOfPipe(committerKey);

try {
if (registeredTaskCount > 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
Expand Down Expand Up @@ -211,7 +212,10 @@ public synchronized void deregister(
// Shall not be empty
final PipeSinkSubtaskExecutor executor = lifeCycles.get(0).executor;

lifeCycles.removeIf(o -> o.deregister(pipeName, creationTime, regionId));
final CommitterKey committerKey =
PipeEventCommitManager.getInstance().getCommitterKey(pipeName, creationTime, regionId);

lifeCycles.removeIf(o -> o.deregister(committerKey));

if (lifeCycles.isEmpty()) {
attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;

import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
Expand Down Expand Up @@ -157,18 +158,28 @@ public synchronized void close() {
*/
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1));
}

public synchronized void discardEventsOfPipe(final CommitterKey committerKey) {
events.removeIf(
event -> {
if (pipeNameToDrop.equals(event.getPipeName())
&& creationTimeToDrop == event.getCreationTime()
&& regionId == event.getRegionId()) {
if (isEventFromPipe(event, committerKey)) {
event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
return true;
}
return false;
});
}

private static boolean isEventFromPipe(
final EnrichedEvent event, final CommitterKey committerKey) {
return committerKey.getPipeName().equals(event.getPipeName())
&& committerKey.getCreationTime() == event.getCreationTime()
&& committerKey.getRegionId() == event.getRegionId()
&& (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey()));
}

public synchronized void decreaseEventsReferenceCount(
final String holderMessage, final boolean shouldReport) {
events.forEach(event -> event.decreaseReferenceCount(holderMessage, shouldReport));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
Expand Down Expand Up @@ -201,10 +202,12 @@ public boolean isEmpty() {

public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
defaultBatch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
endPointToBatch
.values()
.forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId));
discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1));
}

public synchronized void discardEventsOfPipe(final CommitterKey committerKey) {
defaultBatch.discardEventsOfPipe(committerKey);
endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(committerKey));
}

public int size() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.sink.protocol.airgap;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
import org.apache.iotdb.commons.utils.RetryUtils;
Expand Down Expand Up @@ -613,8 +614,13 @@ protected byte[] compressIfNeeded(final byte[] reqInBytes) throws IOException {
@Override
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1));
}

@Override
public synchronized void discardEventsOfPipe(final CommitterKey committerKey) {
if (Objects.nonNull(tabletBatchBuilder)) {
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
tabletBatchBuilder.discardEventsOfPipe(committerKey);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.iotdb.commons.audit.UserEntity;
import org.apache.iotdb.commons.client.ThriftClient;
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.Triple;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
Expand Down Expand Up @@ -130,9 +130,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
private final Map<PipeTransferTrackableHandler, PipeTransferTrackableHandler> pendingHandlers =
new ConcurrentHashMap<>();

// Pipe name, creation time, region id
private final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
ConcurrentHashMap.newKeySet();
private final Set<CommitterKey> droppedPipeTaskKeys = ConcurrentHashMap.newKeySet();

private boolean enableSendTsFileLimit;
private volatile boolean isConnectionException;
Expand Down Expand Up @@ -749,16 +747,20 @@ public boolean isEnableSendTsFileLimit() {
@Override
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId));
discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1));
}

@Override
public synchronized void discardEventsOfPipe(final CommitterKey committerKey) {
droppedPipeTaskKeys.add(committerKey);

if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) {
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
tabletBatchBuilder.discardEventsOfPipe(committerKey);
}
retryEventQueue.removeIf(
event -> {
if (event instanceof EnrichedEvent
&& isDroppedPipe(
(EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) {
&& isDroppedPipe((EnrichedEvent) event, committerKey)) {
((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
retryEventQueueEventCounter.decreaseEventCount(event);
return true;
Expand All @@ -769,8 +771,7 @@ && isDroppedPipe(
retryTsFileQueue.removeIf(
event -> {
if (event instanceof EnrichedEvent
&& isDroppedPipe(
(EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) {
&& isDroppedPipe((EnrichedEvent) event, committerKey)) {
((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
retryEventQueueEventCounter.decreaseEventCount(event);
return true;
Expand Down Expand Up @@ -872,18 +873,14 @@ public void setTransferTsFileCounter(AtomicInteger transferTsFileCounter) {
}

private boolean isDroppedPipe(final EnrichedEvent event) {
return droppedPipeTaskKeys.contains(
new Triple<>(event.getPipeName(), event.getCreationTime(), event.getRegionId()));
}

private static boolean isDroppedPipe(
final EnrichedEvent event,
final String pipeNameToDrop,
final long creationTimeToDrop,
final int regionId) {
return pipeNameToDrop.equals(event.getPipeName())
&& creationTimeToDrop == event.getCreationTime()
&& regionId == event.getRegionId();
return droppedPipeTaskKeys.stream().anyMatch(key -> isDroppedPipe(event, key));
}

private static boolean isDroppedPipe(final EnrichedEvent event, final CommitterKey committerKey) {
return committerKey.getPipeName().equals(event.getPipeName())
&& committerKey.getCreationTime() == event.getCreationTime()
&& committerKey.getRegionId() == event.getRegionId()
&& (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient;
import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
Expand Down Expand Up @@ -604,8 +605,13 @@ public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOEx
@Override
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1));
}

@Override
public synchronized void discardEventsOfPipe(final CommitterKey committerKey) {
if (Objects.nonNull(tabletBatchBuilder)) {
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
tabletBatchBuilder.discardEventsOfPipe(committerKey);
}
}

Expand Down
Loading
Loading