From 09c0a97ae920935d7c2e1456c4e2113673b22ac1 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 21 May 2026 11:17:04 +0800 Subject: [PATCH 1/3] fix --- .../confignode/manager/cq/CQManager.java | 42 ++++++- .../procedure/impl/cq/CreateCQProcedure.java | 62 +++++++++- .../confignode/persistence/CQInfoTest.java | 37 ++++++ .../procedure/impl/CreateCQProcedureTest.java | 26 ++++ .../cq/CreateCQProcedureRecoveryTest.java | 112 ++++++++++++++++++ 5 files changed, 273 insertions(+), 6 deletions(-) create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java index c4c1e8aede97f..faad86239909f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java @@ -43,7 +43,10 @@ import java.util.Collections; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -57,11 +60,14 @@ public class CQManager { private final ReadWriteLock lock; + private final ConcurrentMap locallyScheduledCQs; + private ScheduledExecutorService executor; public CQManager(ConfigManager configManager) { this.configManager = configManager; this.lock = new ReentrantReadWriteLock(); + this.locallyScheduledCQs = new ConcurrentHashMap<>(); this.executor = IoTDBThreadPoolFactory.newScheduledThreadPool( CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName()); @@ -79,7 +85,11 @@ public TSStatus createCQ(TCreateCQReq req) { public TSStatus dropCQ(TDropCQReq req) { try { - return configManager.getConsensusManager().write(new DropCQPlan(req.cqId)); + TSStatus status = configManager.getConsensusManager().write(new DropCQPlan(req.cqId)); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + locallyScheduledCQs.remove(req.cqId); + } + return status; } catch (ConsensusException e) { LOGGER.warn(ManagerMessages.UNEXPECTED_ERROR_HAPPENED_WHILE_DROPPING_CQ, req.cqId, e); // consensus layer related errors @@ -132,6 +142,7 @@ public void startCQScheduler() { executor = IoTDBThreadPoolFactory.newScheduledThreadPool( CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName()); + locallyScheduledCQs.clear(); // 3. get all CQs List allCQs = null; @@ -155,8 +166,16 @@ public void startCQScheduler() { if (allCQs != null) { for (CQInfo.CQEntry entry : allCQs) { if (entry.getState() == CQState.ACTIVE) { + if (!markCQLocallyScheduled(entry.getCqId(), entry.getMd5())) { + continue; + } CQScheduleTask cqScheduleTask = new CQScheduleTask(entry, executor, configManager); - cqScheduleTask.submitSelf(); + try { + cqScheduleTask.submitSelf(); + } catch (RuntimeException e) { + unmarkCQLocallyScheduled(entry.getCqId(), entry.getMd5()); + throw e; + } } } } @@ -176,6 +195,7 @@ public void stopCQScheduler() { try { previous = executor; executor = null; + locallyScheduledCQs.clear(); } finally { lock.writeLock().unlock(); } @@ -183,4 +203,22 @@ public void stopCQScheduler() { previous.shutdown(); } } + + public boolean markCQLocallyScheduled(String cqId, String md5) { + AtomicBoolean shouldSchedule = new AtomicBoolean(false); + locallyScheduledCQs.compute( + cqId, + (ignored, previousMd5) -> { + if (!md5.equals(previousMd5)) { + shouldSchedule.set(true); + return md5; + } + return previousMd5; + }); + return shouldSchedule.get(); + } + + public void unmarkCQLocallyScheduled(String cqId, String md5) { + locallyScheduledCQs.remove(cqId, md5); + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java index ac964d23ca311..18b9d99ee18c1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java @@ -22,11 +22,15 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; +import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan; +import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp; import org.apache.iotdb.confignode.i18n.ProcedureMessages; +import org.apache.iotdb.confignode.manager.cq.CQManager; import org.apache.iotdb.confignode.manager.cq.CQScheduleTask; +import org.apache.iotdb.confignode.persistence.cq.CQInfo; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure; @@ -45,6 +49,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Objects; +import java.util.Optional; +import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import static org.apache.iotdb.confignode.procedure.state.cq.CreateCQState.INACTIVE; @@ -75,7 +81,7 @@ public CreateCQProcedure(ScheduledExecutorService executor) { public CreateCQProcedure(TCreateCQReq req, ScheduledExecutorService executor) { super(); this.req = req; - this.md5 = DigestUtils.md2Hex(req.cqId); + this.md5 = generateCQToken(req.cqId); this.executor = executor; this.firstExecutionTime = CQScheduleTask.getFirstExecutionTime(req.boundaryTime, req.everyInterval); @@ -91,12 +97,15 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, CreateCQState state) addCQ(env); return Flow.HAS_MORE_STATE; case INACTIVE: - CQScheduleTask cqScheduleTask = - new CQScheduleTask(req, firstExecutionTime, md5, executor, env.getConfigManager()); - cqScheduleTask.submitSelf(); + submitScheduleTask( + env, + new CQScheduleTask(req, firstExecutionTime, md5, executor, env.getConfigManager())); setNextState(SCHEDULED); break; case SCHEDULED: + if (isStateDeserialized()) { + recoverScheduledTask(env); + } activeCQ(env); return Flow.NO_MORE_STATE; default: @@ -168,6 +177,43 @@ private void activeCQ(ConfigNodeProcedureEnv env) { } } + void recoverScheduledTask(ConfigNodeProcedureEnv env) throws ConsensusException { + Optional cqEntry = getCurrentCQEntry(env); + if (!cqEntry.isPresent()) { + LOGGER.info( + "Skip recovering the schedule task of CQ {} because its metadata is unavailable.", + req.cqId); + return; + } + submitScheduleTask(env, new CQScheduleTask(cqEntry.get(), executor, env.getConfigManager())); + } + + Optional getCurrentCQEntry(ConfigNodeProcedureEnv env) + throws ConsensusException { + ShowCQResp response = + (ShowCQResp) env.getConfigManager().getConsensusManager().read(new ShowCQPlan()); + return response.getCqList().stream() + .filter(entry -> req.cqId.equals(entry.getCqId()) && md5.equals(entry.getMd5())) + .findFirst(); + } + + private static String generateCQToken(String cqId) { + return DigestUtils.md2Hex(cqId + "-" + UUID.randomUUID()); + } + + private void submitScheduleTask(ConfigNodeProcedureEnv env, CQScheduleTask cqScheduleTask) { + CQManager cqManager = env.getConfigManager().getCQManager(); + if (!cqManager.markCQLocallyScheduled(req.cqId, md5)) { + return; + } + try { + cqScheduleTask.submitSelf(); + } catch (RuntimeException e) { + cqManager.unmarkCQLocallyScheduled(req.cqId, md5); + throw e; + } + } + @Override protected void rollbackState(ConfigNodeProcedureEnv env, CreateCQState state) throws IOException, InterruptedException, ProcedureException { @@ -272,4 +318,12 @@ public int hashCode() { md5, firstExecutionTime); } + + public String getCqId() { + return req == null ? null : req.getCqId(); + } + + public String getMd5() { + return md5; + } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java index 4b409d6cf0cd2..0978d0bba9bc0 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java @@ -19,8 +19,11 @@ package org.apache.iotdb.confignode.persistence; import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; +import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan; +import org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan; import org.apache.iotdb.confignode.persistence.cq.CQInfo; import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; import org.apache.tsfile.external.commons.io.FileUtils; @@ -99,4 +102,38 @@ public void testSnapshot() throws TException, IOException { Assert.assertEquals(cqInfo, actualCQInfo); } + + @Test + public void testOldCallbackCannotTouchRecreatedCQ() throws Exception { + long executionTime = System.currentTimeMillis(); + TCreateCQReq req = + new TCreateCQReq( + "testCq3", + 1000, + 0, + 1000, + 0, + (byte) 0, + "select s1 into root.backup.d3.s1 from root.sg.d3", + "create cq testCq3 BEGIN select s1 into root.backup.d3.s1 from root.sg.d3 END", + "Asia", + "root"); + + cqInfo.addCQ(new AddCQPlan(req, "oldMd5", executionTime)); + cqInfo.dropCQ(new DropCQPlan("testCq3")); + cqInfo.addCQ(new AddCQPlan(req, "newMd5", executionTime)); + + Assert.assertEquals( + TSStatusCode.NO_SUCH_CQ.getStatusCode(), + cqInfo + .updateCQLastExecutionTime( + new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, "oldMd5")) + .code); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + cqInfo + .updateCQLastExecutionTime( + new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, "newMd5")) + .code); + } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java index d0e92b3281666..4e088f578ad4a 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java @@ -36,10 +36,36 @@ import java.util.concurrent.ScheduledExecutorService; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.fail; public class CreateCQProcedureTest { + @Test + public void tokenShouldBeUniqueForSameCQId() { + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + try { + TCreateCQReq req = + new TCreateCQReq( + "testCq1", + 1000, + 0, + 1000, + 0, + (byte) 0, + "select s1 into root.backup.d1(s1) from root.sg.d1", + "create cq testCq1 BEGIN select s1 into root.backup.d1(s1) from root.sg.d1 END", + "Asia", + "root"); + CreateCQProcedure createCQProcedure1 = new CreateCQProcedure(req, executor); + CreateCQProcedure createCQProcedure2 = new CreateCQProcedure(req, executor); + + assertNotEquals(createCQProcedure1.getMd5(), createCQProcedure2.getMd5()); + } finally { + executor.shutdown(); + } + } + @Test public void serializeDeserializeTest() { diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java new file mode 100644 index 0000000000000..ca68f6f3a892e --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.cq; + +import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; +import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; +import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.confignode.manager.cq.CQManager; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; +import org.apache.iotdb.confignode.persistence.cq.CQInfo; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; + +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class CreateCQProcedureRecoveryTest { + + private TCreateCQReq newCreateCQReq() { + return new TCreateCQReq( + "testCq1", + 1000, + 0, + 1000, + 0, + (byte) 0, + "select s1 into root.backup.d1.s1 from root.sg.d1", + "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END", + "Asia", + "root"); + } + + @SuppressWarnings("unchecked") + @Test + public void recoverScheduledTaskShouldResubmitFromLatestMetadata() throws Exception { + ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); + Mockito.when( + executor.schedule( + Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class))) + .thenReturn(Mockito.mock(ScheduledFuture.class)); + + ConfigManager configManager = Mockito.mock(ConfigManager.class); + ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class); + CQManager cqManager = Mockito.mock(CQManager.class); + ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class); + Mockito.when(env.getConfigManager()).thenReturn(configManager); + Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); + Mockito.when(configManager.getCQManager()).thenReturn(cqManager); + Mockito.when(cqManager.markCQLocallyScheduled(Mockito.anyString(), Mockito.anyString())) + .thenReturn(true); + + TCreateCQReq req = newCreateCQReq(); + CreateCQProcedure procedure = new CreateCQProcedure(req, executor); + + CQInfo cqInfo = new CQInfo(); + cqInfo.addCQ(new AddCQPlan(req, procedure.getMd5(), System.currentTimeMillis() + 10_000)); + Mockito.when(consensusManager.read(Mockito.any(ShowCQPlan.class))).thenReturn(cqInfo.showCQ()); + + procedure.recoverScheduledTask(env); + + Mockito.verify(executor) + .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class)); + } + + @SuppressWarnings("unchecked") + @Test + public void recoverScheduledTaskShouldSkipDuplicatedLocalSchedule() throws Exception { + ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); + ConfigManager configManager = Mockito.mock(ConfigManager.class); + ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class); + CQManager cqManager = Mockito.mock(CQManager.class); + ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class); + Mockito.when(env.getConfigManager()).thenReturn(configManager); + Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); + Mockito.when(configManager.getCQManager()).thenReturn(cqManager); + Mockito.when(cqManager.markCQLocallyScheduled(Mockito.anyString(), Mockito.anyString())) + .thenReturn(false); + + TCreateCQReq req = newCreateCQReq(); + CreateCQProcedure procedure = new CreateCQProcedure(req, executor); + + CQInfo cqInfo = new CQInfo(); + cqInfo.addCQ(new AddCQPlan(req, procedure.getMd5(), System.currentTimeMillis() + 10_000)); + Mockito.when(consensusManager.read(Mockito.any(ShowCQPlan.class))).thenReturn(cqInfo.showCQ()); + + procedure.recoverScheduledTask(env); + + Mockito.verify(executor, Mockito.never()) + .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class)); + } +} From 6f7a3373b3bcf2e21907574e03eba89cc841a6ef Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 21 May 2026 11:30:00 +0800 Subject: [PATCH 2/3] sp --- .../confignode/procedure/impl/cq/CreateCQProcedure.java | 3 +-- .../org/apache/iotdb/confignode/persistence/CQInfoTest.java | 6 ++---- .../procedure/impl/cq/CreateCQProcedureRecoveryTest.java | 4 ++-- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java index 18b9d99ee18c1..d93170461b2b7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java @@ -188,8 +188,7 @@ void recoverScheduledTask(ConfigNodeProcedureEnv env) throws ConsensusException submitScheduleTask(env, new CQScheduleTask(cqEntry.get(), executor, env.getConfigManager())); } - Optional getCurrentCQEntry(ConfigNodeProcedureEnv env) - throws ConsensusException { + Optional getCurrentCQEntry(ConfigNodeProcedureEnv env) throws ConsensusException { ShowCQResp response = (ShowCQResp) env.getConfigManager().getConsensusManager().read(new ShowCQPlan()); return response.getCqList().stream() diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java index 0978d0bba9bc0..a66e7e6be26ef 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java @@ -125,14 +125,12 @@ public void testOldCallbackCannotTouchRecreatedCQ() throws Exception { Assert.assertEquals( TSStatusCode.NO_SUCH_CQ.getStatusCode(), - cqInfo - .updateCQLastExecutionTime( + cqInfo.updateCQLastExecutionTime( new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, "oldMd5")) .code); Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), - cqInfo - .updateCQLastExecutionTime( + cqInfo.updateCQLastExecutionTime( new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, "newMd5")) .code); } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java index ca68f6f3a892e..e5bcf50fea915 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java @@ -19,11 +19,11 @@ package org.apache.iotdb.confignode.procedure.impl.cq; -import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; +import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; import org.apache.iotdb.confignode.manager.ConfigManager; -import org.apache.iotdb.confignode.manager.cq.CQManager; import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; +import org.apache.iotdb.confignode.manager.cq.CQManager; import org.apache.iotdb.confignode.persistence.cq.CQInfo; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; From f09c3b01d5a32e4750fcdd1fbf3c04e3c2f6cf1b Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 1 Jun 2026 16:25:58 +0800 Subject: [PATCH 3/3] Fix CQ local schedule cancellation --- .../confignode/i18n/ConfigNodeMessages.java | 4 +- .../confignode/i18n/ConfigNodeMessages.java | 4 +- .../consensus/request/read/cq/ShowCQPlan.java | 13 +++ .../request/write/cq/ActiveCQPlan.java | 20 ++-- .../consensus/request/write/cq/AddCQPlan.java | 20 ++-- .../request/write/cq/DropCQPlan.java | 20 ++-- .../write/cq/UpdateCQLastExecTimePlan.java | 23 ++-- .../confignode/manager/cq/CQManager.java | 98 ++++++++++++---- .../confignode/manager/cq/CQScheduleTask.java | 58 ++++++++-- .../confignode/persistence/cq/CQInfo.java | 76 ++++++++----- .../executor/ConfigPlanExecutor.java | 3 +- .../procedure/impl/cq/CreateCQProcedure.java | 39 +++---- .../request/ConfigPhysicalPlanSerDeTest.java | 8 +- .../iotdb/confignode/cq/CQManagerTest.java | 107 ++++++++++++++++++ .../confignode/persistence/CQInfoTest.java | 37 +++++- .../procedure/impl/CreateCQProcedureTest.java | 2 +- .../cq/CreateCQProcedureRecoveryTest.java | 13 ++- 17 files changed, 407 insertions(+), 138 deletions(-) create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java diff --git a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java index 3a662932b61bd..2c18544ea6d5e 100644 --- a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java +++ b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java @@ -130,8 +130,8 @@ public final class ConfigNodeMessages { public static final String DOES_NOT_EXIST = "%s does not exist"; public static final String DROPPING_TAG_OR_TIME_COLUMN_IS_NOT_SUPPORTED = "Dropping tag or time column is not supported."; - public static final String DROP_CQ_FAILED_BECAUSE_ITS_MD5_DOESN_T_MATCH = - "Drop CQ {} failed, because its MD5 doesn't match."; + public static final String DROP_CQ_FAILED_BECAUSE_ITS_TOKEN_DOESN_T_MATCH = + "Drop CQ {} failed, because its token doesn't match."; public static final String DROP_CQ_FAILED_BECAUSE_IT_DOESN_T_EXIST = "Drop CQ {} failed, because it doesn't exist."; public static final String DROP_CQ_SUCCESSFULLY = "Drop CQ {} successfully."; diff --git a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java index b08332bb35c7e..f7b889f25743e 100644 --- a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java +++ b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java @@ -126,8 +126,8 @@ public final class ConfigNodeMessages { "Deserialization error for write plan, request: {}, bytebuffer: {}"; public static final String DOES_NOT_EXIST = "%s does not exist"; public static final String DROPPING_TAG_OR_TIME_COLUMN_IS_NOT_SUPPORTED = "不支持删除标签列或时间列。"; - public static final String DROP_CQ_FAILED_BECAUSE_ITS_MD5_DOESN_T_MATCH = - "Drop CQ {} failed, because its MD5 doesn't match."; + public static final String DROP_CQ_FAILED_BECAUSE_ITS_TOKEN_DOESN_T_MATCH = + "Drop CQ {} failed, because its token doesn't match."; public static final String DROP_CQ_FAILED_BECAUSE_IT_DOESN_T_EXIST = "Drop CQ {} failed, because it doesn't exist."; public static final String DROP_CQ_SUCCESSFULLY = "Drop CQ {} successfully."; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java index 5217849deb488..c28838d556b48 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java @@ -21,11 +21,24 @@ import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan; +import java.util.Optional; + import static org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType.SHOW_CQ; public class ShowCQPlan extends ConfigPhysicalReadPlan { + private final String cqId; + public ShowCQPlan() { + this(null); + } + + public ShowCQPlan(String cqId) { super(SHOW_CQ); + this.cqId = cqId; + } + + public Optional getCqId() { + return Optional.ofNullable(cqId); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java index 263aeb9f0d064..3faa1c2d62f2e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java @@ -35,39 +35,39 @@ public class ActiveCQPlan extends ConfigPhysicalPlan { private String cqId; - private String md5; + private String cqToken; public ActiveCQPlan() { super(ACTIVE_CQ); } - public ActiveCQPlan(String cqId, String md5) { + public ActiveCQPlan(String cqId, String cqToken) { super(ACTIVE_CQ); Validate.notNull(cqId); - Validate.notNull(md5); + Validate.notNull(cqToken); this.cqId = cqId; - this.md5 = md5; + this.cqToken = cqToken; } public String getCqId() { return cqId; } - public String getMd5() { - return md5; + public String getCqToken() { + return cqToken; } @Override protected void serializeImpl(DataOutputStream stream) throws IOException { stream.writeShort(getType().getPlanType()); ReadWriteIOUtils.write(cqId, stream); - ReadWriteIOUtils.write(md5, stream); + ReadWriteIOUtils.write(cqToken, stream); } @Override protected void deserializeImpl(ByteBuffer buffer) throws IOException { cqId = ReadWriteIOUtils.readString(buffer); - md5 = ReadWriteIOUtils.readString(buffer); + cqToken = ReadWriteIOUtils.readString(buffer); } @Override @@ -82,11 +82,11 @@ public boolean equals(Object o) { return false; } ActiveCQPlan that = (ActiveCQPlan) o; - return cqId.equals(that.cqId) && md5.equals(that.md5); + return cqId.equals(that.cqId) && cqToken.equals(that.cqToken); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), cqId, md5); + return Objects.hash(super.hashCode(), cqId, cqToken); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java index 62f994688b30d..471516c38e563 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java @@ -37,7 +37,7 @@ public class AddCQPlan extends ConfigPhysicalPlan { private TCreateCQReq req; - private String md5; + private String cqToken; private long firstExecutionTime; @@ -45,12 +45,12 @@ public AddCQPlan() { super(ADD_CQ); } - public AddCQPlan(TCreateCQReq req, String md5, long firstExecutionTime) { + public AddCQPlan(TCreateCQReq req, String cqToken, long firstExecutionTime) { super(ADD_CQ); Validate.notNull(req); - Validate.notNull(md5); + Validate.notNull(cqToken); this.req = req; - this.md5 = md5; + this.cqToken = cqToken; this.firstExecutionTime = firstExecutionTime; } @@ -58,8 +58,8 @@ public TCreateCQReq getReq() { return req; } - public String getMd5() { - return md5; + public String getCqToken() { + return cqToken; } public long getFirstExecutionTime() { @@ -70,14 +70,14 @@ public long getFirstExecutionTime() { protected void serializeImpl(DataOutputStream stream) throws IOException { stream.writeShort(getType().getPlanType()); ThriftCommonsSerDeUtils.serializeTCreateCQReq(req, stream); - ReadWriteIOUtils.write(md5, stream); + ReadWriteIOUtils.write(cqToken, stream); ReadWriteIOUtils.write(firstExecutionTime, stream); } @Override protected void deserializeImpl(ByteBuffer buffer) throws IOException { req = ThriftCommonsSerDeUtils.deserializeTCreateCQReq(buffer); - md5 = ReadWriteIOUtils.readString(buffer); + cqToken = ReadWriteIOUtils.readString(buffer); firstExecutionTime = ReadWriteIOUtils.readLong(buffer); } @@ -95,11 +95,11 @@ public boolean equals(Object o) { AddCQPlan addCQPlan = (AddCQPlan) o; return firstExecutionTime == addCQPlan.firstExecutionTime && Objects.equals(req, addCQPlan.req) - && Objects.equals(md5, addCQPlan.md5); + && Objects.equals(cqToken, addCQPlan.cqToken); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), req, md5, firstExecutionTime); + return Objects.hash(super.hashCode(), req, cqToken, firstExecutionTime); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java index 108241b233d50..69c29bff63402 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java @@ -37,7 +37,7 @@ public class DropCQPlan extends ConfigPhysicalPlan { private String cqId; // may be null in user call of drop CQ - private String md5; + private String cqToken; public DropCQPlan() { super(DROP_CQ); @@ -49,33 +49,33 @@ public DropCQPlan(String cqId) { this.cqId = cqId; } - public DropCQPlan(String cqId, String md5) { + public DropCQPlan(String cqId, String cqToken) { super(DROP_CQ); Validate.notNull(cqId); - Validate.notNull(md5); + Validate.notNull(cqToken); this.cqId = cqId; - this.md5 = md5; + this.cqToken = cqToken; } public String getCqId() { return cqId; } - public Optional getMd5() { - return Optional.ofNullable(md5); + public Optional getCqToken() { + return Optional.ofNullable(cqToken); } @Override protected void serializeImpl(DataOutputStream stream) throws IOException { stream.writeShort(getType().getPlanType()); ReadWriteIOUtils.write(cqId, stream); - ReadWriteIOUtils.write(md5, stream); + ReadWriteIOUtils.write(cqToken, stream); } @Override protected void deserializeImpl(ByteBuffer buffer) throws IOException { cqId = ReadWriteIOUtils.readString(buffer); - md5 = ReadWriteIOUtils.readString(buffer); + cqToken = ReadWriteIOUtils.readString(buffer); } @Override @@ -90,11 +90,11 @@ public boolean equals(Object o) { return false; } DropCQPlan that = (DropCQPlan) o; - return cqId.equals(that.cqId) && Objects.equals(md5, that.md5); + return cqId.equals(that.cqId) && Objects.equals(cqToken, that.cqToken); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), cqId, md5); + return Objects.hash(super.hashCode(), cqId, cqToken); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java index 861a7d4f51b20..a487ae648e3fa 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java @@ -37,20 +37,19 @@ public class UpdateCQLastExecTimePlan extends ConfigPhysicalPlan { private long executionTime; - // may be null in user call of drop CQ - private String md5; + private String cqToken; public UpdateCQLastExecTimePlan() { super(UPDATE_CQ_LAST_EXEC_TIME); } - public UpdateCQLastExecTimePlan(String cqId, long executionTime, String md5) { + public UpdateCQLastExecTimePlan(String cqId, long executionTime, String cqToken) { super(UPDATE_CQ_LAST_EXEC_TIME); Validate.notNull(cqId); - Validate.notNull(md5); + Validate.notNull(cqToken); this.cqId = cqId; this.executionTime = executionTime; - this.md5 = md5; + this.cqToken = cqToken; } public String getCqId() { @@ -61,8 +60,8 @@ public long getExecutionTime() { return executionTime; } - public String getMd5() { - return md5; + public String getCqToken() { + return cqToken; } @Override @@ -70,14 +69,14 @@ protected void serializeImpl(DataOutputStream stream) throws IOException { stream.writeShort(getType().getPlanType()); ReadWriteIOUtils.write(cqId, stream); ReadWriteIOUtils.write(executionTime, stream); - ReadWriteIOUtils.write(md5, stream); + ReadWriteIOUtils.write(cqToken, stream); } @Override protected void deserializeImpl(ByteBuffer buffer) throws IOException { cqId = ReadWriteIOUtils.readString(buffer); executionTime = ReadWriteIOUtils.readLong(buffer); - md5 = ReadWriteIOUtils.readString(buffer); + cqToken = ReadWriteIOUtils.readString(buffer); } @Override @@ -92,11 +91,13 @@ public boolean equals(Object o) { return false; } UpdateCQLastExecTimePlan that = (UpdateCQLastExecTimePlan) o; - return executionTime == that.executionTime && cqId.equals(that.cqId) && md5.equals(that.md5); + return executionTime == that.executionTime + && cqId.equals(that.cqId) + && cqToken.equals(that.cqToken); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), cqId, executionTime, md5); + return Objects.hash(super.hashCode(), cqId, executionTime, cqToken); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java index faad86239909f..29837bc8aa29f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java @@ -60,7 +60,8 @@ public class CQManager { private final ReadWriteLock lock; - private final ConcurrentMap locallyScheduledCQs; + // Key: CQ id. Value: the local task and the metadata token it owns. + private final ConcurrentMap locallyScheduledCQs; private ScheduledExecutorService executor; @@ -84,10 +85,11 @@ public TSStatus createCQ(TCreateCQReq req) { } public TSStatus dropCQ(TDropCQReq req) { + lock.readLock().lock(); try { TSStatus status = configManager.getConsensusManager().write(new DropCQPlan(req.cqId)); if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - locallyScheduledCQs.remove(req.cqId); + cancelLocallyScheduledCQ(req.cqId); } return status; } catch (ConsensusException e) { @@ -96,6 +98,8 @@ public TSStatus dropCQ(TDropCQReq req) { TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); res.setMessage(e.getMessage()); return res; + } finally { + lock.readLock().unlock(); } } @@ -128,6 +132,7 @@ public void startCQScheduler() { try { // 1. shutdown previous cq schedule thread pool try { + cancelAllLocallyScheduledCQs(); if (executor != null) { executor.shutdown(); } @@ -142,7 +147,6 @@ public void startCQScheduler() { executor = IoTDBThreadPoolFactory.newScheduledThreadPool( CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName()); - locallyScheduledCQs.clear(); // 3. get all CQs List allCQs = null; @@ -166,14 +170,14 @@ public void startCQScheduler() { if (allCQs != null) { for (CQInfo.CQEntry entry : allCQs) { if (entry.getState() == CQState.ACTIVE) { - if (!markCQLocallyScheduled(entry.getCqId(), entry.getMd5())) { + CQScheduleTask cqScheduleTask = new CQScheduleTask(entry, executor, configManager); + if (!markCQLocallyScheduled(entry.getCqId(), entry.getCqToken(), cqScheduleTask)) { continue; } - CQScheduleTask cqScheduleTask = new CQScheduleTask(entry, executor, configManager); try { cqScheduleTask.submitSelf(); } catch (RuntimeException e) { - unmarkCQLocallyScheduled(entry.getCqId(), entry.getMd5()); + unmarkCQLocallyScheduled(entry.getCqId(), entry.getCqToken()); throw e; } } @@ -195,7 +199,7 @@ public void stopCQScheduler() { try { previous = executor; executor = null; - locallyScheduledCQs.clear(); + cancelAllLocallyScheduledCQs(); } finally { lock.writeLock().unlock(); } @@ -204,21 +208,77 @@ public void stopCQScheduler() { } } - public boolean markCQLocallyScheduled(String cqId, String md5) { + public boolean markCQLocallyScheduled(String cqId, String cqToken, CQScheduleTask task) { AtomicBoolean shouldSchedule = new AtomicBoolean(false); - locallyScheduledCQs.compute( - cqId, - (ignored, previousMd5) -> { - if (!md5.equals(previousMd5)) { + LocallyScheduledCQ schedule = new LocallyScheduledCQ(cqToken, task); + lock.readLock().lock(); + try { + locallyScheduledCQs.compute( + cqId, + (ignored, previousSchedule) -> { + if (previousSchedule != null && previousSchedule.hasToken(cqToken)) { + return previousSchedule; + } + if (previousSchedule != null) { + previousSchedule.cancel(); + } shouldSchedule.set(true); - return md5; - } - return previousMd5; - }); - return shouldSchedule.get(); + return schedule; + }); + if (!shouldSchedule.get()) { + task.cancel(); + } + return shouldSchedule.get(); + } finally { + lock.readLock().unlock(); + } + } + + public void unmarkCQLocallyScheduled(String cqId, String cqToken) { + lock.readLock().lock(); + try { + locallyScheduledCQs.computeIfPresent( + cqId, + (ignored, schedule) -> { + if (schedule.hasToken(cqToken)) { + schedule.cancel(); + return null; + } + return schedule; + }); + } finally { + lock.readLock().unlock(); + } } - public void unmarkCQLocallyScheduled(String cqId, String md5) { - locallyScheduledCQs.remove(cqId, md5); + private void cancelLocallyScheduledCQ(String cqId) { + LocallyScheduledCQ schedule = locallyScheduledCQs.remove(cqId); + if (schedule != null) { + schedule.cancel(); + } + } + + private void cancelAllLocallyScheduledCQs() { + locallyScheduledCQs.values().forEach(LocallyScheduledCQ::cancel); + locallyScheduledCQs.clear(); + } + + private static class LocallyScheduledCQ { + + private final String cqToken; + private final CQScheduleTask task; + + private LocallyScheduledCQ(String cqToken, CQScheduleTask task) { + this.cqToken = cqToken; + this.task = task; + } + + private boolean hasToken(String cqToken) { + return this.cqToken.equals(cqToken); + } + + private void cancel() { + task.cancel(); + } } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java index c58f5ade9bcd0..6b73ffca95fd5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java @@ -40,7 +40,10 @@ import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; public class CQScheduleTask implements Runnable { @@ -70,7 +73,7 @@ public class CQScheduleTask implements Runnable { private final long endTimeOffset; private final TimeoutPolicy timeoutPolicy; private final String queryBody; - private final String md5; + private final String cqToken; private final String zoneId; @@ -82,12 +85,15 @@ public class CQScheduleTask implements Runnable { private final long retryWaitTimeInMS; + private final AtomicBoolean cancelled; + private final AtomicReference> scheduledFuture; + private long executionTime; public CQScheduleTask( TCreateCQReq req, long firstExecutionTime, - String md5, + String cqToken, ScheduledExecutorService executor, ConfigManager configManager) { this( @@ -97,7 +103,7 @@ public CQScheduleTask( req.endTimeOffset, TimeoutPolicy.deserialize(req.timeoutPolicy), req.queryBody, - md5, + cqToken, req.zoneId, req.username, executor, @@ -114,7 +120,7 @@ public CQScheduleTask( entry.getEndTimeOffset(), entry.getTimeoutPolicy(), entry.getQueryBody(), - entry.getMd5(), + entry.getCqToken(), entry.getZoneId(), entry.getUsername(), executor, @@ -130,7 +136,7 @@ public CQScheduleTask( long endTimeOffset, TimeoutPolicy timeoutPolicy, String queryBody, - String md5, + String cqToken, String zoneId, String username, ScheduledExecutorService executor, @@ -142,12 +148,14 @@ public CQScheduleTask( this.endTimeOffset = endTimeOffset; this.timeoutPolicy = timeoutPolicy; this.queryBody = queryBody; - this.md5 = md5; + this.cqToken = cqToken; this.zoneId = zoneId; this.username = username; this.executor = executor; this.configManager = configManager; this.retryWaitTimeInMS = Math.min(DEFAULT_RETRY_WAIT_TIME_IN_MS, everyInterval / FACTOR); + this.cancelled = new AtomicBoolean(false); + this.scheduledFuture = new AtomicReference<>(); this.executionTime = executionTime; } @@ -166,6 +174,9 @@ public static long getFirstExecutionTime(long boundaryTime, long everyInterval, @Override public void run() { + if (cancelled.get()) { + return; + } long startTime = executionTime - startTimeOffset; long endTime = executionTime - endTimeOffset; @@ -178,6 +189,9 @@ public void run() { submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS); } } else { + if (cancelled.get()) { + return; + } LOGGER.info( ManagerMessages.STARTEXECUTECQ_EXECUTE_CQ_ON_DATANODE_TIME_RANGE_IS_CURRENT_TIME, cqId, @@ -207,12 +221,32 @@ public void submitSelf() { } private void submitSelf(long delay, TimeUnit unit) { - executor.schedule(this, delay, unit); + if (cancelled.get()) { + return; + } + ScheduledFuture newFuture = executor.schedule(this, delay, unit); + ScheduledFuture previousFuture = scheduledFuture.getAndSet(newFuture); + if (previousFuture != null) { + previousFuture.cancel(false); + } + if (cancelled.get() && scheduledFuture.compareAndSet(newFuture, null)) { + newFuture.cancel(false); + } + } + + public void cancel() { + cancelled.set(true); + ScheduledFuture currentFuture = scheduledFuture.getAndSet(null); + if (currentFuture != null) { + currentFuture.cancel(false); + } } private boolean needSubmit() { // current node is still leader and thread pool is not shut down. - return configManager.getConsensusManager().isLeader() && !executor.isShutdown(); + return !cancelled.get() + && configManager.getConsensusManager().isLeader() + && !executor.isShutdown(); } private class AsyncExecuteCQCallback implements AsyncMethodCallback { @@ -239,6 +273,9 @@ private void updateExecutionTime() { @Override public void onComplete(TSStatus response) { + if (cancelled.get()) { + return; + } if (response.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.info( @@ -252,7 +289,7 @@ public void onComplete(TSStatus response) { result = configManager .getConsensusManager() - .write(new UpdateCQLastExecTimePlan(cqId, executionTime, md5)); + .write(new UpdateCQLastExecTimePlan(cqId, executionTime, cqToken)); } catch (ConsensusException e) { result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); result.setMessage(e.getMessage()); @@ -291,6 +328,9 @@ public void onComplete(TSStatus response) { @Override public void onError(Exception exception) { + if (cancelled.get()) { + return; + } LOGGER.warn(ManagerMessages.EXECUTE_CQ_FAILED, cqId, exception); if (needSubmit()) { submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java index 9c99cfbb0e8bc..013e2415f9445 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.cq.CQState; import org.apache.iotdb.commons.cq.TimeoutPolicy; import org.apache.iotdb.commons.snapshot.SnapshotProcessor; +import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan; @@ -45,7 +46,9 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -62,7 +65,7 @@ public class CQInfo implements SnapshotProcessor { private static final String CQ_NOT_EXIST_FORMAT = "CQ %s doesn't exist."; - private static final String MD5_NOT_MATCH_FORMAT = "MD5 of CQ %s doesn't match"; + private static final String CQ_TOKEN_NOT_MATCH_FORMAT = "Token of CQ %s doesn't match"; private final Map cqMap; @@ -92,7 +95,7 @@ public TSStatus addCQ(AddCQPlan plan) { CQEntry cqEntry = new CQEntry( plan.getReq(), - plan.getMd5(), + plan.getCqToken(), plan.getFirstExecutionTime() - plan.getReq().everyInterval); cqMap.put(cqId, cqEntry); res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode(); @@ -106,13 +109,13 @@ public TSStatus addCQ(AddCQPlan plan) { /** * Drop the CQ whose ID is same as cqId in plan. * - * @return SUCCESS_STATUS if there is CQ whose ID and md5 is same as cqId in plan, + * @return SUCCESS_STATUS if there is CQ whose ID and token is same as cqId in plan, * otherwise NO_SUCH_CQ. */ public TSStatus dropCQ(DropCQPlan plan) { TSStatus res = new TSStatus(); String cqId = plan.getCqId(); - Optional md5 = plan.getMd5(); + Optional cqToken = plan.getCqToken(); lock.writeLock().lock(); try { CQEntry cqEntry = cqMap.get(cqId); @@ -120,10 +123,10 @@ public TSStatus dropCQ(DropCQPlan plan) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId); LOGGER.warn(ConfigNodeMessages.DROP_CQ_FAILED_BECAUSE_IT_DOESN_T_EXIST, cqId); - } else if ((md5.isPresent() && !md5.get().equals(cqEntry.md5))) { + } else if ((cqToken.isPresent() && !cqToken.get().equals(cqEntry.cqToken))) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); - res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId); - LOGGER.warn(ConfigNodeMessages.DROP_CQ_FAILED_BECAUSE_ITS_MD5_DOESN_T_MATCH, cqId); + res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId); + LOGGER.warn(ConfigNodeMessages.DROP_CQ_FAILED_BECAUSE_ITS_TOKEN_DOESN_T_MATCH, cqId); } else { cqMap.remove(cqId); res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode(); @@ -136,11 +139,24 @@ public TSStatus dropCQ(DropCQPlan plan) { } public ShowCQResp showCQ() { + return showCQ(new ShowCQPlan()); + } + + public ShowCQResp showCQ(ShowCQPlan plan) { lock.readLock().lock(); try { - return new ShowCQResp( - new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), - cqMap.values().stream().map(CQEntry::new).collect(Collectors.toList())); + Optional cqId = plan.getCqId(); + List cqList; + if (cqId.isPresent()) { + CQEntry cqEntry = cqMap.get(cqId.get()); + cqList = + cqEntry == null + ? Collections.emptyList() + : Collections.singletonList(new CQEntry(cqEntry)); + } else { + cqList = cqMap.values().stream().map(CQEntry::new).collect(Collectors.toList()); + } + return new ShowCQResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), cqList); } finally { lock.readLock().unlock(); } @@ -154,16 +170,16 @@ public ShowCQResp showCQ() { public TSStatus activeCQ(ActiveCQPlan plan) { TSStatus res = new TSStatus(); String cqId = plan.getCqId(); - String md5 = plan.getMd5(); + String cqToken = plan.getCqToken(); lock.writeLock().lock(); try { CQEntry cqEntry = cqMap.get(cqId); if (cqEntry == null) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId); - } else if (!md5.equals(cqEntry.md5)) { + } else if (!cqToken.equals(cqEntry.cqToken)) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); - res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId); + res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId); } else if (cqEntry.state == CQState.ACTIVE) { res.code = TSStatusCode.CQ_ALREADY_ACTIVE.getStatusCode(); res.message = String.format("CQ %s has already been active", cqId); @@ -181,22 +197,22 @@ public TSStatus activeCQ(ActiveCQPlan plan) { * Update the last execution time of the corresponding CQ. * * @return SUCCESS_STATUS if successfully updated, or NO_SUCH_CQ if 1. the CQ doesn't exist; or 2. - * md5 is different. or CQ_UPDATE_LAST_EXEC_TIME_FAILED 3. original lastExecutionTime >= + * token is different. or CQ_UPDATE_LAST_EXEC_TIME_FAILED 3. original lastExecutionTime >= * current lastExecutionTime; */ public TSStatus updateCQLastExecutionTime(UpdateCQLastExecTimePlan plan) { TSStatus res = new TSStatus(); String cqId = plan.getCqId(); - String md5 = plan.getMd5(); + String cqToken = plan.getCqToken(); lock.writeLock().lock(); try { CQEntry cqEntry = cqMap.get(cqId); if (cqEntry == null) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId); - } else if (!md5.equals(cqEntry.md5)) { + } else if (!cqToken.equals(cqEntry.cqToken)) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); - res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId); + res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId); } else if (cqEntry.lastExecutionTime >= plan.getExecutionTime()) { res.code = TSStatusCode.CQ_UPDATE_LAST_EXEC_TIME_ERROR.getStatusCode(); res.message = @@ -300,7 +316,7 @@ public static class CQEntry { private final TimeoutPolicy timeoutPolicy; private final String queryBody; private final String sql; - private final String md5; + private final String cqToken; private final String zoneId; @@ -309,7 +325,7 @@ public static class CQEntry { private CQState state; private long lastExecutionTime; - private CQEntry(TCreateCQReq req, String md5, long lastExecutionTime) { + private CQEntry(TCreateCQReq req, String cqToken, long lastExecutionTime) { this( req.cqId, req.everyInterval, @@ -319,7 +335,7 @@ private CQEntry(TCreateCQReq req, String md5, long lastExecutionTime) { TimeoutPolicy.deserialize(req.timeoutPolicy), req.queryBody, req.sql, - md5, + cqToken, req.zoneId, req.username, CQState.INACTIVE, @@ -336,7 +352,7 @@ private CQEntry(CQEntry other) { other.timeoutPolicy, other.queryBody, other.sql, - other.md5, + other.cqToken, other.zoneId, other.username, other.state, @@ -353,7 +369,7 @@ private CQEntry( TimeoutPolicy timeoutPolicy, String queryBody, String sql, - String md5, + String cqToken, String zoneId, String username, CQState state, @@ -366,7 +382,7 @@ private CQEntry( this.timeoutPolicy = timeoutPolicy; this.queryBody = queryBody; this.sql = sql; - this.md5 = md5; + this.cqToken = cqToken; this.zoneId = zoneId; this.username = username; this.state = state; @@ -382,7 +398,7 @@ private void serialize(OutputStream stream) throws IOException { ReadWriteIOUtils.write(timeoutPolicy.getType(), stream); ReadWriteIOUtils.write(queryBody, stream); ReadWriteIOUtils.write(sql, stream); - ReadWriteIOUtils.write(md5, stream); + ReadWriteIOUtils.write(cqToken, stream); ReadWriteIOUtils.write(zoneId, stream); ReadWriteIOUtils.write(username, stream); ReadWriteIOUtils.write(state.getType(), stream); @@ -398,7 +414,7 @@ private static CQEntry deserialize(InputStream stream) throws IOException { TimeoutPolicy timeoutPolicy = TimeoutPolicy.deserialize(ReadWriteIOUtils.readByte(stream)); String queryBody = ReadWriteIOUtils.readString(stream); String sql = ReadWriteIOUtils.readString(stream); - String md5 = ReadWriteIOUtils.readString(stream); + String cqToken = ReadWriteIOUtils.readString(stream); String zoneId = ReadWriteIOUtils.readString(stream); String username = ReadWriteIOUtils.readString(stream); CQState state = CQState.deserialize(ReadWriteIOUtils.readByte(stream)); @@ -412,7 +428,7 @@ private static CQEntry deserialize(InputStream stream) throws IOException { timeoutPolicy, queryBody, sql, - md5, + cqToken, zoneId, username, state, @@ -451,8 +467,8 @@ public String getSql() { return sql; } - public String getMd5() { - return md5; + public String getCqToken() { + return cqToken; } public CQState getState() { @@ -489,7 +505,7 @@ public boolean equals(Object o) { && timeoutPolicy == cqEntry.timeoutPolicy && Objects.equals(queryBody, cqEntry.queryBody) && Objects.equals(sql, cqEntry.sql) - && Objects.equals(md5, cqEntry.md5) + && Objects.equals(cqToken, cqEntry.cqToken) && Objects.equals(zoneId, cqEntry.zoneId) && Objects.equals(username, cqEntry.username) && state == cqEntry.state; @@ -506,7 +522,7 @@ public int hashCode() { timeoutPolicy, queryBody, sql, - md5, + cqToken, zoneId, username, state, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index eb8d5e5538b39..96dcdea464837 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@ -30,6 +30,7 @@ import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan; import org.apache.iotdb.confignode.consensus.request.read.ainode.GetAINodeConfigurationPlan; +import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan; import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan; import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan; @@ -361,7 +362,7 @@ public DataSet executeQueryPlan(final ConfigPhysicalReadPlan req) case GetRegionGroupsByTime: return partitionInfo.getRegionGroupsByTime((GetRegionGroupsByTimePlan) req); case SHOW_CQ: - return cqInfo.showCQ(); + return cqInfo.showCQ((ShowCQPlan) req); case ShowExternalService: return externalServiceInfo.showService(((ShowExternalServicePlan) req).getDataNodeIds()); case GetFunctionTable: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java index d93170461b2b7..490f723d2e6b0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java @@ -40,7 +40,6 @@ import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.tsfile.external.commons.codec.digest.DigestUtils; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +65,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure { private TCreateCQReq req; - private String md5; + private String cqToken; private long firstExecutionTime; @@ -81,7 +80,7 @@ public CreateCQProcedure(ScheduledExecutorService executor) { public CreateCQProcedure(TCreateCQReq req, ScheduledExecutorService executor) { super(); this.req = req; - this.md5 = generateCQToken(req.cqId); + this.cqToken = generateCQToken(); this.executor = executor; this.firstExecutionTime = CQScheduleTask.getFirstExecutionTime(req.boundaryTime, req.everyInterval); @@ -99,7 +98,8 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, CreateCQState state) case INACTIVE: submitScheduleTask( env, - new CQScheduleTask(req, firstExecutionTime, md5, executor, env.getConfigManager())); + new CQScheduleTask( + req, firstExecutionTime, cqToken, executor, env.getConfigManager())); setNextState(SCHEDULED); break; case SCHEDULED: @@ -135,7 +135,7 @@ private void addCQ(ConfigNodeProcedureEnv env) { res = env.getConfigManager() .getConsensusManager() - .write(new AddCQPlan(req, md5, firstExecutionTime)); + .write(new AddCQPlan(req, cqToken, firstExecutionTime)); } catch (ConsensusException e) { LOGGER.warn(CONSENSUS_WRITE_ERROR, e); res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); @@ -156,7 +156,7 @@ private void addCQ(ConfigNodeProcedureEnv env) { private void activeCQ(ConfigNodeProcedureEnv env) { TSStatus res; try { - res = env.getConfigManager().getConsensusManager().write(new ActiveCQPlan(req.cqId, md5)); + res = env.getConfigManager().getConsensusManager().write(new ActiveCQPlan(req.cqId, cqToken)); } catch (ConsensusException e) { LOGGER.warn(CONSENSUS_WRITE_ERROR, e); res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); @@ -190,25 +190,25 @@ void recoverScheduledTask(ConfigNodeProcedureEnv env) throws ConsensusException Optional getCurrentCQEntry(ConfigNodeProcedureEnv env) throws ConsensusException { ShowCQResp response = - (ShowCQResp) env.getConfigManager().getConsensusManager().read(new ShowCQPlan()); + (ShowCQResp) env.getConfigManager().getConsensusManager().read(new ShowCQPlan(req.cqId)); return response.getCqList().stream() - .filter(entry -> req.cqId.equals(entry.getCqId()) && md5.equals(entry.getMd5())) + .filter(entry -> cqToken.equals(entry.getCqToken())) .findFirst(); } - private static String generateCQToken(String cqId) { - return DigestUtils.md2Hex(cqId + "-" + UUID.randomUUID()); + private static String generateCQToken() { + return UUID.randomUUID().toString(); } private void submitScheduleTask(ConfigNodeProcedureEnv env, CQScheduleTask cqScheduleTask) { CQManager cqManager = env.getConfigManager().getCQManager(); - if (!cqManager.markCQLocallyScheduled(req.cqId, md5)) { + if (!cqManager.markCQLocallyScheduled(req.cqId, cqToken, cqScheduleTask)) { return; } try { cqScheduleTask.submitSelf(); } catch (RuntimeException e) { - cqManager.unmarkCQLocallyScheduled(req.cqId, md5); + cqManager.unmarkCQLocallyScheduled(req.cqId, cqToken); throw e; } } @@ -225,7 +225,8 @@ protected void rollbackState(ConfigNodeProcedureEnv env, CreateCQState state) LOGGER.info(ProcedureMessages.START_INACTIVE_ROLLBACK_OF_CQ, req.cqId); TSStatus res; try { - res = env.getConfigManager().getConsensusManager().write(new DropCQPlan(req.cqId, md5)); + res = + env.getConfigManager().getConsensusManager().write(new DropCQPlan(req.cqId, cqToken)); } catch (ConsensusException e) { LOGGER.warn(CONSENSUS_WRITE_ERROR, e); res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); @@ -276,7 +277,7 @@ public void serialize(DataOutputStream stream) throws IOException { stream.writeShort(ProcedureType.CREATE_CQ_PROCEDURE.getTypeCode()); super.serialize(stream); ThriftCommonsSerDeUtils.serializeTCreateCQReq(req, stream); - ReadWriteIOUtils.write(md5, stream); + ReadWriteIOUtils.write(cqToken, stream); ReadWriteIOUtils.write(firstExecutionTime, stream); } @@ -284,7 +285,7 @@ public void serialize(DataOutputStream stream) throws IOException { public void deserialize(ByteBuffer byteBuffer) { super.deserialize(byteBuffer); this.req = ThriftCommonsSerDeUtils.deserializeTCreateCQReq(byteBuffer); - this.md5 = ReadWriteIOUtils.readString(byteBuffer); + this.cqToken = ReadWriteIOUtils.readString(byteBuffer); this.firstExecutionTime = ReadWriteIOUtils.readLong(byteBuffer); } @@ -303,7 +304,7 @@ && getCycles() == that.getCycles() && isGeneratedByPipe == that.isGeneratedByPipe && firstExecutionTime == that.firstExecutionTime && Objects.equals(req, that.req) - && Objects.equals(md5, that.md5); + && Objects.equals(cqToken, that.cqToken); } @Override @@ -314,7 +315,7 @@ public int hashCode() { getCycles(), isGeneratedByPipe, req, - md5, + cqToken, firstExecutionTime); } @@ -322,7 +323,7 @@ public String getCqId() { return req == null ? null : req.getCqId(); } - public String getMd5() { - return md5; + public String getCqToken() { + return cqToken; } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java index d6a3d7e3fd737..ea35be6c5d7e1 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java @@ -1674,7 +1674,7 @@ public void UpdateTriggerStateInTablePlanTest() throws IOException { @Test public void ActiveCQPlanTest() throws IOException { - ActiveCQPlan activeCQPlan0 = new ActiveCQPlan("testCq", "testCq_md5"); + ActiveCQPlan activeCQPlan0 = new ActiveCQPlan("testCq", "testCqToken"); ActiveCQPlan activeCQPlan1 = (ActiveCQPlan) ConfigPhysicalPlan.Factory.create(activeCQPlan0.serializeToByteBuffer()); @@ -1697,7 +1697,7 @@ public void AddCQPlanTest() throws IOException { "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END", "Asia", "root"), - "testCq1_md5", + "testCq1Token", executionTime); AddCQPlan addCQPlan1 = (AddCQPlan) ConfigPhysicalPlan.Factory.create(addCQPlan0.serializeToByteBuffer()); @@ -1712,7 +1712,7 @@ public void DropCQPlanTest() throws IOException { (DropCQPlan) ConfigPhysicalPlan.Factory.create(dropCQPlan0.serializeToByteBuffer()); Assert.assertEquals(dropCQPlan0, dropCQPlan1); - dropCQPlan0 = new DropCQPlan("testCq1", "testCq1_md5"); + dropCQPlan0 = new DropCQPlan("testCq1", "testCq1Token"); dropCQPlan1 = (DropCQPlan) ConfigPhysicalPlan.Factory.create(dropCQPlan0.serializeToByteBuffer()); Assert.assertEquals(dropCQPlan0, dropCQPlan1); @@ -1721,7 +1721,7 @@ public void DropCQPlanTest() throws IOException { @Test public void UpdateCQLastExecTimePlanTest() throws IOException { UpdateCQLastExecTimePlan updateCQLastExecTimePlan0 = - new UpdateCQLastExecTimePlan("testCq", System.currentTimeMillis(), "testCq_md5"); + new UpdateCQLastExecTimePlan("testCq", System.currentTimeMillis(), "testCqToken"); UpdateCQLastExecTimePlan updateCQLastExecTimePlan1 = (UpdateCQLastExecTimePlan) ConfigPhysicalPlan.Factory.create(updateCQLastExecTimePlan0.serializeToByteBuffer()); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java new file mode 100644 index 0000000000000..a0bc5a523ba70 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.confignode.cq; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.cq.TimeoutPolicy; +import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; +import org.apache.iotdb.confignode.manager.cq.CQManager; +import org.apache.iotdb.confignode.manager.cq.CQScheduleTask; +import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; + +public class CQManagerTest { + + @SuppressWarnings("unchecked") + @Test + public void dropCQShouldCancelLocallyScheduledTask() throws Exception { + ConfigManager configManager = Mockito.mock(ConfigManager.class); + ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class); + Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); + Mockito.when(consensusManager.write(Mockito.any())) + .thenReturn(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); + CQManager cqManager = new CQManager(configManager); + ScheduledFuture future = Mockito.mock(ScheduledFuture.class); + CQScheduleTask task = newScheduledTask(configManager, future, "token"); + + try { + assertTrue(cqManager.markCQLocallyScheduled("testCq", "token", task)); + task.submitSelf(); + cqManager.dropCQ(new TDropCQReq("testCq")); + + Mockito.verify(future).cancel(false); + } finally { + cqManager.stopCQScheduler(); + } + } + + @SuppressWarnings("unchecked") + @Test + public void newTokenShouldCancelPreviousLocallyScheduledTask() { + ConfigManager configManager = Mockito.mock(ConfigManager.class); + CQManager cqManager = new CQManager(configManager); + ScheduledFuture previousFuture = Mockito.mock(ScheduledFuture.class); + CQScheduleTask previousTask = newScheduledTask(configManager, previousFuture, "previousToken"); + ScheduledFuture currentFuture = Mockito.mock(ScheduledFuture.class); + CQScheduleTask currentTask = newScheduledTask(configManager, currentFuture, "currentToken"); + + try { + assertTrue(cqManager.markCQLocallyScheduled("testCq", "previousToken", previousTask)); + previousTask.submitSelf(); + assertTrue(cqManager.markCQLocallyScheduled("testCq", "currentToken", currentTask)); + + Mockito.verify(previousFuture).cancel(false); + } finally { + cqManager.stopCQScheduler(); + } + } + + @SuppressWarnings("unchecked") + private CQScheduleTask newScheduledTask( + ConfigManager configManager, ScheduledFuture scheduledFuture, String cqToken) { + ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); + Mockito.when( + executor.schedule( + Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class))) + .thenReturn((ScheduledFuture) scheduledFuture); + return new CQScheduleTask( + "testCq", + 1000, + 0, + 1000, + TimeoutPolicy.BLOCKED, + "select s1 into root.backup.d1.s1 from root.sg.d1", + cqToken, + "Asia", + "root", + executor, + configManager, + System.currentTimeMillis() + 10_000); + } +} diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java index a66e7e6be26ef..64bbd69c5b6a7 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java @@ -18,9 +18,11 @@ */ package org.apache.iotdb.confignode.persistence; +import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan; +import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp; import org.apache.iotdb.confignode.persistence.cq.CQInfo; import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; import org.apache.iotdb.rpc.TSStatusCode; @@ -73,7 +75,7 @@ public void testSnapshot() throws TException, IOException { "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END", "Asia", "root"), - "testCq1_md5", + "testCq1Token", executionTime); cqInfo.addCQ(addCQPlan); @@ -92,7 +94,7 @@ public void testSnapshot() throws TException, IOException { "create cq testCq2 BEGIN select s1 into root.backup.d2.s1 from root.sg.d2 END", "Asia", "root"), - "testCq2_md5", + "testCq2Token", executionTime); cqInfo.addCQ(addCQPlan); @@ -119,19 +121,42 @@ public void testOldCallbackCannotTouchRecreatedCQ() throws Exception { "Asia", "root"); - cqInfo.addCQ(new AddCQPlan(req, "oldMd5", executionTime)); + cqInfo.addCQ(new AddCQPlan(req, "oldToken", executionTime)); cqInfo.dropCQ(new DropCQPlan("testCq3")); - cqInfo.addCQ(new AddCQPlan(req, "newMd5", executionTime)); + cqInfo.addCQ(new AddCQPlan(req, "newToken", executionTime)); Assert.assertEquals( TSStatusCode.NO_SUCH_CQ.getStatusCode(), cqInfo.updateCQLastExecutionTime( - new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, "oldMd5")) + new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, "oldToken")) .code); Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), cqInfo.updateCQLastExecutionTime( - new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, "newMd5")) + new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, "newToken")) .code); } + + @Test + public void testShowCQCanFilterByCQId() throws Exception { + long executionTime = System.currentTimeMillis(); + TCreateCQReq req = + new TCreateCQReq( + "testCq4", + 1000, + 0, + 1000, + 0, + (byte) 0, + "select s1 into root.backup.d4.s1 from root.sg.d4", + "create cq testCq4 BEGIN select s1 into root.backup.d4.s1 from root.sg.d4 END", + "Asia", + "root"); + cqInfo.addCQ(new AddCQPlan(req, "testCq4Token", executionTime)); + + ShowCQResp showCQResp = cqInfo.showCQ(new ShowCQPlan("testCq4")); + + Assert.assertEquals(1, showCQResp.getCqList().size()); + Assert.assertEquals("testCq4", showCQResp.getCqList().get(0).getCqId()); + } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java index 4e088f578ad4a..3e7fd2052ad53 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java @@ -60,7 +60,7 @@ public void tokenShouldBeUniqueForSameCQId() { CreateCQProcedure createCQProcedure1 = new CreateCQProcedure(req, executor); CreateCQProcedure createCQProcedure2 = new CreateCQProcedure(req, executor); - assertNotEquals(createCQProcedure1.getMd5(), createCQProcedure2.getMd5()); + assertNotEquals(createCQProcedure1.getCqToken(), createCQProcedure2.getCqToken()); } finally { executor.shutdown(); } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java index e5bcf50fea915..a90e282494f0e 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java @@ -24,6 +24,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.cq.CQManager; +import org.apache.iotdb.confignode.manager.cq.CQScheduleTask; import org.apache.iotdb.confignode.persistence.cq.CQInfo; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; @@ -67,14 +68,16 @@ public void recoverScheduledTaskShouldResubmitFromLatestMetadata() throws Except Mockito.when(env.getConfigManager()).thenReturn(configManager); Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); Mockito.when(configManager.getCQManager()).thenReturn(cqManager); - Mockito.when(cqManager.markCQLocallyScheduled(Mockito.anyString(), Mockito.anyString())) + Mockito.when( + cqManager.markCQLocallyScheduled( + Mockito.anyString(), Mockito.anyString(), Mockito.any(CQScheduleTask.class))) .thenReturn(true); TCreateCQReq req = newCreateCQReq(); CreateCQProcedure procedure = new CreateCQProcedure(req, executor); CQInfo cqInfo = new CQInfo(); - cqInfo.addCQ(new AddCQPlan(req, procedure.getMd5(), System.currentTimeMillis() + 10_000)); + cqInfo.addCQ(new AddCQPlan(req, procedure.getCqToken(), System.currentTimeMillis() + 10_000)); Mockito.when(consensusManager.read(Mockito.any(ShowCQPlan.class))).thenReturn(cqInfo.showCQ()); procedure.recoverScheduledTask(env); @@ -94,14 +97,16 @@ public void recoverScheduledTaskShouldSkipDuplicatedLocalSchedule() throws Excep Mockito.when(env.getConfigManager()).thenReturn(configManager); Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); Mockito.when(configManager.getCQManager()).thenReturn(cqManager); - Mockito.when(cqManager.markCQLocallyScheduled(Mockito.anyString(), Mockito.anyString())) + Mockito.when( + cqManager.markCQLocallyScheduled( + Mockito.anyString(), Mockito.anyString(), Mockito.any(CQScheduleTask.class))) .thenReturn(false); TCreateCQReq req = newCreateCQReq(); CreateCQProcedure procedure = new CreateCQProcedure(req, executor); CQInfo cqInfo = new CQInfo(); - cqInfo.addCQ(new AddCQPlan(req, procedure.getMd5(), System.currentTimeMillis() + 10_000)); + cqInfo.addCQ(new AddCQPlan(req, procedure.getCqToken(), System.currentTimeMillis() + 10_000)); Mockito.when(consensusManager.read(Mockito.any(ShowCQPlan.class))).thenReturn(cqInfo.showCQ()); procedure.recoverScheduledTask(env);