diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java index c4c1e8aede97f..faad86239909f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java @@ -43,7 +43,10 @@ import java.util.Collections; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -57,11 +60,14 @@ public class CQManager { private final ReadWriteLock lock; + private final ConcurrentMap locallyScheduledCQs; + private ScheduledExecutorService executor; public CQManager(ConfigManager configManager) { this.configManager = configManager; this.lock = new ReentrantReadWriteLock(); + this.locallyScheduledCQs = new ConcurrentHashMap<>(); this.executor = IoTDBThreadPoolFactory.newScheduledThreadPool( CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName()); @@ -79,7 +85,11 @@ public TSStatus createCQ(TCreateCQReq req) { public TSStatus dropCQ(TDropCQReq req) { try { - return configManager.getConsensusManager().write(new DropCQPlan(req.cqId)); + TSStatus status = configManager.getConsensusManager().write(new DropCQPlan(req.cqId)); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + locallyScheduledCQs.remove(req.cqId); + } + return status; } catch (ConsensusException e) { LOGGER.warn(ManagerMessages.UNEXPECTED_ERROR_HAPPENED_WHILE_DROPPING_CQ, req.cqId, e); // consensus layer related errors @@ -132,6 +142,7 @@ public void startCQScheduler() { executor = IoTDBThreadPoolFactory.newScheduledThreadPool( CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName()); + locallyScheduledCQs.clear(); // 3. get all CQs List allCQs = null; @@ -155,8 +166,16 @@ public void startCQScheduler() { if (allCQs != null) { for (CQInfo.CQEntry entry : allCQs) { if (entry.getState() == CQState.ACTIVE) { + if (!markCQLocallyScheduled(entry.getCqId(), entry.getMd5())) { + continue; + } CQScheduleTask cqScheduleTask = new CQScheduleTask(entry, executor, configManager); - cqScheduleTask.submitSelf(); + try { + cqScheduleTask.submitSelf(); + } catch (RuntimeException e) { + unmarkCQLocallyScheduled(entry.getCqId(), entry.getMd5()); + throw e; + } } } } @@ -176,6 +195,7 @@ public void stopCQScheduler() { try { previous = executor; executor = null; + locallyScheduledCQs.clear(); } finally { lock.writeLock().unlock(); } @@ -183,4 +203,22 @@ public void stopCQScheduler() { previous.shutdown(); } } + + public boolean markCQLocallyScheduled(String cqId, String md5) { + AtomicBoolean shouldSchedule = new AtomicBoolean(false); + locallyScheduledCQs.compute( + cqId, + (ignored, previousMd5) -> { + if (!md5.equals(previousMd5)) { + shouldSchedule.set(true); + return md5; + } + return previousMd5; + }); + return shouldSchedule.get(); + } + + public void unmarkCQLocallyScheduled(String cqId, String md5) { + locallyScheduledCQs.remove(cqId, md5); + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java index ac964d23ca311..d93170461b2b7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java @@ -22,11 +22,15 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; +import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan; +import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp; import org.apache.iotdb.confignode.i18n.ProcedureMessages; +import org.apache.iotdb.confignode.manager.cq.CQManager; import org.apache.iotdb.confignode.manager.cq.CQScheduleTask; +import org.apache.iotdb.confignode.persistence.cq.CQInfo; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure; @@ -45,6 +49,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Objects; +import java.util.Optional; +import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import static org.apache.iotdb.confignode.procedure.state.cq.CreateCQState.INACTIVE; @@ -75,7 +81,7 @@ public CreateCQProcedure(ScheduledExecutorService executor) { public CreateCQProcedure(TCreateCQReq req, ScheduledExecutorService executor) { super(); this.req = req; - this.md5 = DigestUtils.md2Hex(req.cqId); + this.md5 = generateCQToken(req.cqId); this.executor = executor; this.firstExecutionTime = CQScheduleTask.getFirstExecutionTime(req.boundaryTime, req.everyInterval); @@ -91,12 +97,15 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, CreateCQState state) addCQ(env); return Flow.HAS_MORE_STATE; case INACTIVE: - CQScheduleTask cqScheduleTask = - new CQScheduleTask(req, firstExecutionTime, md5, executor, env.getConfigManager()); - cqScheduleTask.submitSelf(); + submitScheduleTask( + env, + new CQScheduleTask(req, firstExecutionTime, md5, executor, env.getConfigManager())); setNextState(SCHEDULED); break; case SCHEDULED: + if (isStateDeserialized()) { + recoverScheduledTask(env); + } activeCQ(env); return Flow.NO_MORE_STATE; default: @@ -168,6 +177,42 @@ private void activeCQ(ConfigNodeProcedureEnv env) { } } + void recoverScheduledTask(ConfigNodeProcedureEnv env) throws ConsensusException { + Optional cqEntry = getCurrentCQEntry(env); + if (!cqEntry.isPresent()) { + LOGGER.info( + "Skip recovering the schedule task of CQ {} because its metadata is unavailable.", + req.cqId); + return; + } + submitScheduleTask(env, new CQScheduleTask(cqEntry.get(), executor, env.getConfigManager())); + } + + Optional getCurrentCQEntry(ConfigNodeProcedureEnv env) throws ConsensusException { + ShowCQResp response = + (ShowCQResp) env.getConfigManager().getConsensusManager().read(new ShowCQPlan()); + return response.getCqList().stream() + .filter(entry -> req.cqId.equals(entry.getCqId()) && md5.equals(entry.getMd5())) + .findFirst(); + } + + private static String generateCQToken(String cqId) { + return DigestUtils.md2Hex(cqId + "-" + UUID.randomUUID()); + } + + private void submitScheduleTask(ConfigNodeProcedureEnv env, CQScheduleTask cqScheduleTask) { + CQManager cqManager = env.getConfigManager().getCQManager(); + if (!cqManager.markCQLocallyScheduled(req.cqId, md5)) { + return; + } + try { + cqScheduleTask.submitSelf(); + } catch (RuntimeException e) { + cqManager.unmarkCQLocallyScheduled(req.cqId, md5); + throw e; + } + } + @Override protected void rollbackState(ConfigNodeProcedureEnv env, CreateCQState state) throws IOException, InterruptedException, ProcedureException { @@ -272,4 +317,12 @@ public int hashCode() { md5, firstExecutionTime); } + + public String getCqId() { + return req == null ? null : req.getCqId(); + } + + public String getMd5() { + return md5; + } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java index 4b409d6cf0cd2..a66e7e6be26ef 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java @@ -19,8 +19,11 @@ package org.apache.iotdb.confignode.persistence; import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; +import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan; +import org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan; import org.apache.iotdb.confignode.persistence.cq.CQInfo; import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; import org.apache.tsfile.external.commons.io.FileUtils; @@ -99,4 +102,36 @@ public void testSnapshot() throws TException, IOException { Assert.assertEquals(cqInfo, actualCQInfo); } + + @Test + public void testOldCallbackCannotTouchRecreatedCQ() throws Exception { + long executionTime = System.currentTimeMillis(); + TCreateCQReq req = + new TCreateCQReq( + "testCq3", + 1000, + 0, + 1000, + 0, + (byte) 0, + "select s1 into root.backup.d3.s1 from root.sg.d3", + "create cq testCq3 BEGIN select s1 into root.backup.d3.s1 from root.sg.d3 END", + "Asia", + "root"); + + cqInfo.addCQ(new AddCQPlan(req, "oldMd5", executionTime)); + cqInfo.dropCQ(new DropCQPlan("testCq3")); + cqInfo.addCQ(new AddCQPlan(req, "newMd5", executionTime)); + + Assert.assertEquals( + TSStatusCode.NO_SUCH_CQ.getStatusCode(), + cqInfo.updateCQLastExecutionTime( + new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, "oldMd5")) + .code); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + cqInfo.updateCQLastExecutionTime( + new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, "newMd5")) + .code); + } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java index d0e92b3281666..4e088f578ad4a 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java @@ -36,10 +36,36 @@ import java.util.concurrent.ScheduledExecutorService; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.fail; public class CreateCQProcedureTest { + @Test + public void tokenShouldBeUniqueForSameCQId() { + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + try { + TCreateCQReq req = + new TCreateCQReq( + "testCq1", + 1000, + 0, + 1000, + 0, + (byte) 0, + "select s1 into root.backup.d1(s1) from root.sg.d1", + "create cq testCq1 BEGIN select s1 into root.backup.d1(s1) from root.sg.d1 END", + "Asia", + "root"); + CreateCQProcedure createCQProcedure1 = new CreateCQProcedure(req, executor); + CreateCQProcedure createCQProcedure2 = new CreateCQProcedure(req, executor); + + assertNotEquals(createCQProcedure1.getMd5(), createCQProcedure2.getMd5()); + } finally { + executor.shutdown(); + } + } + @Test public void serializeDeserializeTest() { diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java new file mode 100644 index 0000000000000..e5bcf50fea915 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.cq; + +import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; +import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; +import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; +import org.apache.iotdb.confignode.manager.cq.CQManager; +import org.apache.iotdb.confignode.persistence.cq.CQInfo; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; + +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class CreateCQProcedureRecoveryTest { + + private TCreateCQReq newCreateCQReq() { + return new TCreateCQReq( + "testCq1", + 1000, + 0, + 1000, + 0, + (byte) 0, + "select s1 into root.backup.d1.s1 from root.sg.d1", + "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END", + "Asia", + "root"); + } + + @SuppressWarnings("unchecked") + @Test + public void recoverScheduledTaskShouldResubmitFromLatestMetadata() throws Exception { + ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); + Mockito.when( + executor.schedule( + Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class))) + .thenReturn(Mockito.mock(ScheduledFuture.class)); + + ConfigManager configManager = Mockito.mock(ConfigManager.class); + ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class); + CQManager cqManager = Mockito.mock(CQManager.class); + ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class); + Mockito.when(env.getConfigManager()).thenReturn(configManager); + Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); + Mockito.when(configManager.getCQManager()).thenReturn(cqManager); + Mockito.when(cqManager.markCQLocallyScheduled(Mockito.anyString(), Mockito.anyString())) + .thenReturn(true); + + TCreateCQReq req = newCreateCQReq(); + CreateCQProcedure procedure = new CreateCQProcedure(req, executor); + + CQInfo cqInfo = new CQInfo(); + cqInfo.addCQ(new AddCQPlan(req, procedure.getMd5(), System.currentTimeMillis() + 10_000)); + Mockito.when(consensusManager.read(Mockito.any(ShowCQPlan.class))).thenReturn(cqInfo.showCQ()); + + procedure.recoverScheduledTask(env); + + Mockito.verify(executor) + .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class)); + } + + @SuppressWarnings("unchecked") + @Test + public void recoverScheduledTaskShouldSkipDuplicatedLocalSchedule() throws Exception { + ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); + ConfigManager configManager = Mockito.mock(ConfigManager.class); + ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class); + CQManager cqManager = Mockito.mock(CQManager.class); + ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class); + Mockito.when(env.getConfigManager()).thenReturn(configManager); + Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); + Mockito.when(configManager.getCQManager()).thenReturn(cqManager); + Mockito.when(cqManager.markCQLocallyScheduled(Mockito.anyString(), Mockito.anyString())) + .thenReturn(false); + + TCreateCQReq req = newCreateCQReq(); + CreateCQProcedure procedure = new CreateCQProcedure(req, executor); + + CQInfo cqInfo = new CQInfo(); + cqInfo.addCQ(new AddCQPlan(req, procedure.getMd5(), System.currentTimeMillis() + 10_000)); + Mockito.when(consensusManager.read(Mockito.any(ShowCQPlan.class))).thenReturn(cqInfo.showCQ()); + + procedure.recoverScheduledTask(env); + + Mockito.verify(executor, Mockito.never()) + .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class)); + } +}