Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand All @@ -57,11 +60,14 @@

private final ReadWriteLock lock;

private final ConcurrentMap<String, String> locallyScheduledCQs;

private ScheduledExecutorService executor;

public CQManager(ConfigManager configManager) {
this.configManager = configManager;
this.lock = new ReentrantReadWriteLock();
this.locallyScheduledCQs = new ConcurrentHashMap<>();
this.executor =
IoTDBThreadPoolFactory.newScheduledThreadPool(
CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName());
Expand All @@ -79,7 +85,11 @@

public TSStatus dropCQ(TDropCQReq req) {
try {
return configManager.getConsensusManager().write(new DropCQPlan(req.cqId));
TSStatus status = configManager.getConsensusManager().write(new DropCQPlan(req.cqId));
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
locallyScheduledCQs.remove(req.cqId);
}
return status;
} catch (ConsensusException e) {
LOGGER.warn(ManagerMessages.UNEXPECTED_ERROR_HAPPENED_WHILE_DROPPING_CQ, req.cqId, e);
// consensus layer related errors
Expand Down Expand Up @@ -113,7 +123,7 @@
return res;
}

public void startCQScheduler() {

Check failure on line 126 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 22 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ5InLAGIQnTQqJg_n2S&open=AZ5InLAGIQnTQqJg_n2S&pullRequest=17734
lock.writeLock().lock();
try {
// 1. shutdown previous cq schedule thread pool
Expand All @@ -132,6 +142,7 @@
executor =
IoTDBThreadPoolFactory.newScheduledThreadPool(
CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName());
locallyScheduledCQs.clear();

// 3. get all CQs
List<CQInfo.CQEntry> allCQs = null;
Expand All @@ -155,8 +166,16 @@
if (allCQs != null) {
for (CQInfo.CQEntry entry : allCQs) {
if (entry.getState() == CQState.ACTIVE) {
if (!markCQLocallyScheduled(entry.getCqId(), entry.getMd5())) {
continue;
}
CQScheduleTask cqScheduleTask = new CQScheduleTask(entry, executor, configManager);
cqScheduleTask.submitSelf();
try {
cqScheduleTask.submitSelf();
} catch (RuntimeException e) {
unmarkCQLocallyScheduled(entry.getCqId(), entry.getMd5());
throw e;
}
}
}
}
Expand All @@ -176,11 +195,30 @@
try {
previous = executor;
executor = null;
locallyScheduledCQs.clear();
} finally {
lock.writeLock().unlock();
}
if (previous != null) {
previous.shutdown();
}
}

public boolean markCQLocallyScheduled(String cqId, String md5) {

Check warning on line 207 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Abbreviation in name 'markCQLocallyScheduled' must contain no more than '2' consecutive capital letters.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ5InLAGIQnTQqJg_n2T&open=AZ5InLAGIQnTQqJg_n2T&pullRequest=17734
AtomicBoolean shouldSchedule = new AtomicBoolean(false);
locallyScheduledCQs.compute(
cqId,
(ignored, previousMd5) -> {
if (!md5.equals(previousMd5)) {
shouldSchedule.set(true);
return md5;
}
return previousMd5;
});
return shouldSchedule.get();
}

public void unmarkCQLocallyScheduled(String cqId, String md5) {

Check warning on line 221 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Abbreviation in name 'unmarkCQLocallyScheduled' must contain no more than '2' consecutive capital letters.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ5InLAGIQnTQqJg_n2U&open=AZ5InLAGIQnTQqJg_n2U&pullRequest=17734
locallyScheduledCQs.remove(cqId, md5);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp;
import org.apache.iotdb.confignode.i18n.ProcedureMessages;
import org.apache.iotdb.confignode.manager.cq.CQManager;
import org.apache.iotdb.confignode.manager.cq.CQScheduleTask;
import org.apache.iotdb.confignode.persistence.cq.CQInfo;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
Expand All @@ -45,6 +49,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;

import static org.apache.iotdb.confignode.procedure.state.cq.CreateCQState.INACTIVE;
Expand Down Expand Up @@ -75,7 +81,7 @@
public CreateCQProcedure(TCreateCQReq req, ScheduledExecutorService executor) {
super();
this.req = req;
this.md5 = DigestUtils.md2Hex(req.cqId);
this.md5 = generateCQToken(req.cqId);
this.executor = executor;
this.firstExecutionTime =
CQScheduleTask.getFirstExecutionTime(req.boundaryTime, req.everyInterval);
Expand All @@ -91,12 +97,15 @@
addCQ(env);
return Flow.HAS_MORE_STATE;
case INACTIVE:
CQScheduleTask cqScheduleTask =
new CQScheduleTask(req, firstExecutionTime, md5, executor, env.getConfigManager());
cqScheduleTask.submitSelf();
submitScheduleTask(
env,
new CQScheduleTask(req, firstExecutionTime, md5, executor, env.getConfigManager()));
setNextState(SCHEDULED);
break;
case SCHEDULED:
if (isStateDeserialized()) {
recoverScheduledTask(env);
}
activeCQ(env);
return Flow.NO_MORE_STATE;
default:
Expand Down Expand Up @@ -168,6 +177,42 @@
}
}

void recoverScheduledTask(ConfigNodeProcedureEnv env) throws ConsensusException {
Optional<CQInfo.CQEntry> cqEntry = getCurrentCQEntry(env);
if (!cqEntry.isPresent()) {
LOGGER.info(
"Skip recovering the schedule task of CQ {} because its metadata is unavailable.",
req.cqId);
return;
}
submitScheduleTask(env, new CQScheduleTask(cqEntry.get(), executor, env.getConfigManager()));
}

Optional<CQInfo.CQEntry> getCurrentCQEntry(ConfigNodeProcedureEnv env) throws ConsensusException {

Check warning on line 191 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Abbreviation in name 'getCurrentCQEntry' must contain no more than '2' consecutive capital letters.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ5InK_lIQnTQqJg_n2Q&open=AZ5InK_lIQnTQqJg_n2Q&pullRequest=17734
ShowCQResp response =
(ShowCQResp) env.getConfigManager().getConsensusManager().read(new ShowCQPlan());
return response.getCqList().stream()
.filter(entry -> req.cqId.equals(entry.getCqId()) && md5.equals(entry.getMd5()))
.findFirst();
}

private static String generateCQToken(String cqId) {

Check warning on line 199 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Abbreviation in name 'generateCQToken' must contain no more than '2' consecutive capital letters.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ5InK_lIQnTQqJg_n2R&open=AZ5InK_lIQnTQqJg_n2R&pullRequest=17734
return DigestUtils.md2Hex(cqId + "-" + UUID.randomUUID());
}

private void submitScheduleTask(ConfigNodeProcedureEnv env, CQScheduleTask cqScheduleTask) {
CQManager cqManager = env.getConfigManager().getCQManager();
if (!cqManager.markCQLocallyScheduled(req.cqId, md5)) {
return;
}
try {
cqScheduleTask.submitSelf();
} catch (RuntimeException e) {
cqManager.unmarkCQLocallyScheduled(req.cqId, md5);
throw e;
}
}

@Override
protected void rollbackState(ConfigNodeProcedureEnv env, CreateCQState state)
throws IOException, InterruptedException, ProcedureException {
Expand Down Expand Up @@ -272,4 +317,12 @@
md5,
firstExecutionTime);
}

public String getCqId() {
return req == null ? null : req.getCqId();
}

public String getMd5() {
return md5;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
package org.apache.iotdb.confignode.persistence;

import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan;
import org.apache.iotdb.confignode.persistence.cq.CQInfo;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.thrift.TException;
import org.apache.tsfile.external.commons.io.FileUtils;
Expand Down Expand Up @@ -99,4 +102,36 @@ public void testSnapshot() throws TException, IOException {

Assert.assertEquals(cqInfo, actualCQInfo);
}

@Test
public void testOldCallbackCannotTouchRecreatedCQ() throws Exception {
long executionTime = System.currentTimeMillis();
TCreateCQReq req =
new TCreateCQReq(
"testCq3",
1000,
0,
1000,
0,
(byte) 0,
"select s1 into root.backup.d3.s1 from root.sg.d3",
"create cq testCq3 BEGIN select s1 into root.backup.d3.s1 from root.sg.d3 END",
"Asia",
"root");

cqInfo.addCQ(new AddCQPlan(req, "oldMd5", executionTime));
cqInfo.dropCQ(new DropCQPlan("testCq3"));
cqInfo.addCQ(new AddCQPlan(req, "newMd5", executionTime));

Assert.assertEquals(
TSStatusCode.NO_SUCH_CQ.getStatusCode(),
cqInfo.updateCQLastExecutionTime(
new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, "oldMd5"))
.code);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
cqInfo.updateCQLastExecutionTime(
new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, "newMd5"))
.code);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,36 @@
import java.util.concurrent.ScheduledExecutorService;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.fail;

public class CreateCQProcedureTest {

@Test
public void tokenShouldBeUniqueForSameCQId() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
try {
TCreateCQReq req =
new TCreateCQReq(
"testCq1",
1000,
0,
1000,
0,
(byte) 0,
"select s1 into root.backup.d1(s1) from root.sg.d1",
"create cq testCq1 BEGIN select s1 into root.backup.d1(s1) from root.sg.d1 END",
"Asia",
"root");
CreateCQProcedure createCQProcedure1 = new CreateCQProcedure(req, executor);
CreateCQProcedure createCQProcedure2 = new CreateCQProcedure(req, executor);

assertNotEquals(createCQProcedure1.getMd5(), createCQProcedure2.getMd5());
} finally {
executor.shutdown();
}
}

@Test
public void serializeDeserializeTest() {

Expand Down
Loading
Loading