-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Fixed TTL problems #17735
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Fixed TTL problems #17735
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,8 +22,10 @@ | |
| import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; | ||
| import org.apache.iotdb.common.rpc.thrift.TSStatus; | ||
| import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; | ||
| import org.apache.iotdb.commons.conf.IoTDBConstant; | ||
| import org.apache.iotdb.commons.exception.IoTDBException; | ||
| import org.apache.iotdb.commons.exception.MetadataException; | ||
| import org.apache.iotdb.commons.schema.ttl.TTLCache; | ||
| import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; | ||
| import org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager; | ||
| import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext; | ||
|
|
@@ -47,14 +49,21 @@ | |
| import java.io.DataOutputStream; | ||
| import java.io.IOException; | ||
| import java.nio.ByteBuffer; | ||
| import java.util.Arrays; | ||
| import java.util.Collections; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
|
|
||
| public class SetTTLProcedure extends StateMachineProcedure<ConfigNodeProcedureEnv, SetTTLState> { | ||
| private static final Logger LOGGER = LoggerFactory.getLogger(SetTTLProcedure.class); | ||
| // Distinguishes no previous TTL from TTLCache.NULL_TTL, the explicit unset marker for rollback. | ||
| private static final long TTL_NOT_EXIST = Long.MIN_VALUE; | ||
| private static final int ROLLBACK_STATE_BYTES = Byte.BYTES + Long.BYTES * 2; | ||
|
|
||
| private SetTTLPlan plan; | ||
| private long previousTTL = TTL_NOT_EXIST; | ||
|
Check warning on line 64 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
|
||
| private long previousDatabaseWildcardTTL = TTL_NOT_EXIST; | ||
|
Check warning on line 65 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
|
||
| private boolean previousTTLStateCaptured = false; | ||
|
Check warning on line 66 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
|
||
|
|
||
| public SetTTLProcedure(final boolean isGeneratedByPipe) { | ||
| super(isGeneratedByPipe); | ||
|
|
@@ -71,6 +80,10 @@ | |
| long startTime = System.currentTimeMillis(); | ||
| try { | ||
| switch (state) { | ||
| case CAPTURE_PREVIOUS_TTL: | ||
| capturePreviousTTLState(env); | ||
| setNextState(SetTTLState.SET_CONFIGNODE_TTL); | ||
| return Flow.HAS_MORE_STATE; | ||
| case SET_CONFIGNODE_TTL: | ||
| setConfigNodeTTL(env); | ||
| return Flow.HAS_MORE_STATE; | ||
|
|
@@ -86,18 +99,13 @@ | |
| } | ||
| } | ||
|
|
||
| private void setConfigNodeTTL(ConfigNodeProcedureEnv env) { | ||
| TSStatus res; | ||
| try { | ||
| res = | ||
| env.getConfigManager() | ||
| .getConsensusManager() | ||
| .write(isGeneratedByPipe ? new PipeEnrichedPlan(this.plan) : this.plan); | ||
| } catch (ConsensusException e) { | ||
| LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e); | ||
| res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); | ||
| res.setMessage(e.getMessage()); | ||
| void setConfigNodeTTL(final ConfigNodeProcedureEnv env) { | ||
|
Check warning on line 102 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
|
||
| if (!previousTTLStateCaptured) { | ||
| capturePreviousTTLState(env); | ||
| setNextState(SetTTLState.SET_CONFIGNODE_TTL); | ||
| return; | ||
| } | ||
| final TSStatus res = writeConfigNodePlan(env, plan); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (medium)
Suggestion: split capture into its own pre-state (e.g. |
||
| if (res.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { | ||
| LOGGER.info(ProcedureMessages.FAILED_TO_EXECUTE_PLAN_BECAUSE, plan, res.message); | ||
| setFailure(new ProcedureException(new IoTDBException(res))); | ||
|
|
@@ -106,35 +114,177 @@ | |
| } | ||
| } | ||
|
|
||
| private void updateDataNodeTTL(ConfigNodeProcedureEnv env) { | ||
| Map<Integer, TDataNodeLocation> dataNodeLocationMap = | ||
| void updateDataNodeTTL(final ConfigNodeProcedureEnv env) { | ||
|
Check warning on line 117 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
|
||
| final Map<Integer, TDataNodeLocation> dataNodeLocationMap = | ||
| env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations(); | ||
| DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler = | ||
| new DataNodeAsyncRequestContext<>( | ||
| CnToDnAsyncRequestType.SET_TTL, | ||
| new TSetTTLReq( | ||
| Collections.singletonList(String.join(".", plan.getPathPattern())), | ||
| plan.getTTL(), | ||
| plan.isDataBase()), | ||
| dataNodeLocationMap); | ||
| final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler = | ||
| sendTTLRequest( | ||
| dataNodeLocationMap, | ||
| buildSetTTLReq(plan.getPathPattern(), plan.getTTL(), plan.isDataBase())); | ||
| if (hasFailedDataNode(clientHandler)) { | ||
| LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE); | ||
| setFailure( | ||
| new ProcedureException( | ||
| new MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED))); | ||
| } | ||
| } | ||
|
|
||
| private void capturePreviousTTLState(final ConfigNodeProcedureEnv env) { | ||
|
Check warning on line 132 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
|
||
| if (previousTTLStateCaptured) { | ||
| return; | ||
| } | ||
| previousTTL = getTTLOrDefault(env, plan.getPathPattern()); | ||
| if (plan.isDataBase()) { | ||
| previousDatabaseWildcardTTL = | ||
| getTTLOrDefault(env, getDatabaseWildcardPathPattern(plan.getPathPattern())); | ||
| } | ||
| previousTTLStateCaptured = true; | ||
| } | ||
|
|
||
| TSStatus writeConfigNodePlan(final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) { | ||
|
Check warning on line 144 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
|
||
| try { | ||
| return env.getConfigManager() | ||
| .getConsensusManager() | ||
| .write(isGeneratedByPipe ? new PipeEnrichedPlan(setTTLPlan) : setTTLPlan); | ||
| } catch (ConsensusException e) { | ||
| LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e); | ||
| final TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); | ||
| res.setMessage(e.getMessage()); | ||
| return res; | ||
| } | ||
| } | ||
|
|
||
| DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> sendTTLRequest( | ||
|
Check warning on line 157 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
|
||
| final Map<Integer, TDataNodeLocation> dataNodeLocationMap, final TSetTTLReq req) { | ||
| final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler = | ||
| new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SET_TTL, req, dataNodeLocationMap); | ||
| CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler); | ||
| Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap(); | ||
| for (TSStatus status : statusMap.values()) { | ||
| // all dataNodes must clear the related schemaengine cache | ||
| return clientHandler; | ||
| } | ||
|
|
||
| private TSetTTLReq buildSetTTLReq( | ||
|
Check warning on line 165 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
|
||
| final String[] pathPattern, final long ttl, final boolean isDataBase) { | ||
| return new TSetTTLReq( | ||
| Collections.singletonList(String.join(".", pathPattern)), ttl, isDataBase); | ||
| } | ||
|
|
||
| private boolean hasFailedDataNode( | ||
| final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler) { | ||
| if (!clientHandler.getRequestIndices().isEmpty()) { | ||
| return true; | ||
| } | ||
| for (TSStatus status : clientHandler.getResponseMap().values()) { | ||
| if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { | ||
| LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE); | ||
| setFailure( | ||
| new ProcedureException( | ||
| new MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED))); | ||
| return; | ||
| return true; | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| private long getTTLOrDefault(final ConfigNodeProcedureEnv env, final String[] pathPattern) { | ||
|
Check warning on line 184 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
|
||
| final long ttl = env.getConfigManager().getTTLManager().getTTL(pathPattern); | ||
| return ttl == TTLCache.NULL_TTL ? TTL_NOT_EXIST : ttl; | ||
| } | ||
|
|
||
| private String[] getDatabaseWildcardPathPattern(final String[] pathPattern) { | ||
| final String[] pathNodes = Arrays.copyOf(pathPattern, pathPattern.length + 1); | ||
| pathNodes[pathNodes.length - 1] = IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD; | ||
| return pathNodes; | ||
| } | ||
|
|
||
| private void rollbackConfigNodeTTL(final ConfigNodeProcedureEnv env) throws ProcedureException { | ||
|
Check warning on line 195 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
|
||
| restoreTTLOnConfigNode(env, plan.getPathPattern(), previousTTL); | ||
| if (plan.isDataBase()) { | ||
| restoreTTLOnConfigNode( | ||
| env, getDatabaseWildcardPathPattern(plan.getPathPattern()), previousDatabaseWildcardTTL); | ||
| } | ||
| } | ||
|
|
||
| private void restoreTTLOnConfigNode( | ||
|
Check warning on line 203 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
|
||
| final ConfigNodeProcedureEnv env, final String[] pathPattern, final long ttl) | ||
| throws ProcedureException { | ||
| // TTL_NOT_EXIST means the original ttl was absent; NULL_TTL asks the executor to unset it. | ||
| final SetTTLPlan rollbackPlan = | ||
| new SetTTLPlan(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL : ttl); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
| // Database rollback restores the database path and db.** separately, so avoid auto-expansion. | ||
| rollbackPlan.setDataBase(false); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The explicit |
||
| final TSStatus status = writeConfigNodePlan(env, rollbackPlan); | ||
| if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { | ||
| throw new ProcedureException( | ||
| new MetadataException( | ||
| "Rollback ConfigNode ttl failed for " | ||
| + String.join(".", pathPattern) | ||
| + ": " | ||
| + status.getMessage())); | ||
| } | ||
| } | ||
|
|
||
| private void rollbackDataNodeTTL(final ConfigNodeProcedureEnv env) throws ProcedureException { | ||
|
Check warning on line 222 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
|
||
| final Map<Integer, TDataNodeLocation> dataNodeLocationMap = | ||
| env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations(); | ||
| restoreTTLOnDataNodes(dataNodeLocationMap, plan.getPathPattern(), previousTTL); | ||
| if (plan.isDataBase()) { | ||
| restoreTTLOnDataNodes( | ||
| dataNodeLocationMap, | ||
| getDatabaseWildcardPathPattern(plan.getPathPattern()), | ||
| previousDatabaseWildcardTTL); | ||
| } | ||
| } | ||
|
|
||
| private void restoreTTLOnDataNodes( | ||
|
Check warning on line 234 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
|
||
| final Map<Integer, TDataNodeLocation> dataNodeLocationMap, | ||
| final String[] pathPattern, | ||
| final long ttl) | ||
| throws ProcedureException { | ||
| if (dataNodeLocationMap.isEmpty()) { | ||
| return; | ||
| } | ||
| final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler = | ||
| sendTTLRequest( | ||
| dataNodeLocationMap, | ||
| buildSetTTLReq(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL : ttl, false)); | ||
| if (hasFailedDataNode(clientHandler)) { | ||
| throw new ProcedureException( | ||
| new MetadataException( | ||
| "Rollback dataNode ttl cache failed for " + String.join(".", pathPattern))); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Best-effort rollback: restore both sides, throw the earliest failure, and suppress later ones. | ||
| */ | ||
| @Override | ||
| protected void rollbackState( | ||
| ConfigNodeProcedureEnv configNodeProcedureEnv, SetTTLState setTTLState) | ||
| throws IOException, InterruptedException, ProcedureException {} | ||
| protected void rollbackState(final ConfigNodeProcedureEnv env, final SetTTLState setTTLState) | ||
|
Check warning on line 257 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
|
||
| throws IOException, InterruptedException, ProcedureException { | ||
| if (setTTLState != SetTTLState.UPDATE_DATANODE_CACHE || !previousTTLStateCaptured) { | ||
| return; | ||
| } | ||
| ProcedureException rollbackFailure = null; | ||
| try { | ||
| rollbackConfigNodeTTL(env); | ||
| } catch (ProcedureException e) { | ||
| LOGGER.error("Failed to rollback ConfigNode ttl state.", e); | ||
| rollbackFailure = e; | ||
| } | ||
| try { | ||
| rollbackDataNodeTTL(env); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Two suggestions on the rollback error-aggregation strategy:
|
||
| } catch (ProcedureException e) { | ||
| LOGGER.error("Failed to rollback DataNode ttl cache.", e); | ||
| if (rollbackFailure == null) { | ||
| rollbackFailure = e; | ||
| } else { | ||
| rollbackFailure.addSuppressed(e); | ||
| } | ||
| } | ||
| if (rollbackFailure != null) { | ||
| throw rollbackFailure; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| protected boolean isRollbackSupported(final SetTTLState state) { | ||
| return state == SetTTLState.UPDATE_DATANODE_CACHE; | ||
| } | ||
|
|
||
| @Override | ||
| protected SetTTLState getState(int stateId) { | ||
|
|
@@ -148,7 +298,7 @@ | |
|
|
||
| @Override | ||
| protected SetTTLState getInitialState() { | ||
| return SetTTLState.SET_CONFIGNODE_TTL; | ||
| return SetTTLState.CAPTURE_PREVIOUS_TTL; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -159,14 +309,25 @@ | |
| : ProcedureType.SET_TTL_PROCEDURE.getTypeCode()); | ||
| super.serialize(stream); | ||
| ReadWriteIOUtils.write(plan.serializeToByteBuffer(), stream); | ||
| stream.writeBoolean(previousTTLStateCaptured); | ||
| stream.writeLong(previousTTL); | ||
| stream.writeLong(previousDatabaseWildcardTTL); | ||
| } | ||
|
|
||
| @Override | ||
| public void deserialize(ByteBuffer byteBuffer) { | ||
| super.deserialize(byteBuffer); | ||
| try { | ||
| ReadWriteIOUtils.readInt(byteBuffer); | ||
| final int length = ReadWriteIOUtils.readInt(byteBuffer); | ||
| final int position = byteBuffer.position(); | ||
| this.plan = (SetTTLPlan) ConfigPhysicalPlan.Factory.create(byteBuffer); | ||
| // The serialized plan buffer may include padding; skip to the actual payload end. | ||
| byteBuffer.position(position + length); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This Reason: |
||
| if (byteBuffer.remaining() >= ROLLBACK_STATE_BYTES) { | ||
| this.previousTTLStateCaptured = byteBuffer.get() != 0; | ||
| this.previousTTL = byteBuffer.getLong(); | ||
| this.previousDatabaseWildcardTTL = byteBuffer.getLong(); | ||
| } | ||
| } catch (IOException e) { | ||
| LOGGER.error(ProcedureMessages.IO_ERROR_WHEN_DESERIALIZE_SETTTL_PLAN, e); | ||
| } | ||
|
|
@@ -180,12 +341,21 @@ | |
| if (o == null || getClass() != o.getClass()) { | ||
| return false; | ||
| } | ||
| return this.plan.equals(((SetTTLProcedure) o).plan) | ||
| && this.isGeneratedByPipe == (((SetTTLProcedure) o).isGeneratedByPipe); | ||
| final SetTTLProcedure that = (SetTTLProcedure) o; | ||
| return this.isGeneratedByPipe == that.isGeneratedByPipe | ||
| && this.previousTTL == that.previousTTL | ||
| && this.previousDatabaseWildcardTTL == that.previousDatabaseWildcardTTL | ||
| && this.previousTTLStateCaptured == that.previousTTLStateCaptured | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Including |
||
| && this.plan.equals(that.plan); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash(plan, isGeneratedByPipe); | ||
| return Objects.hash( | ||
| plan, | ||
| isGeneratedByPipe, | ||
| previousTTL, | ||
| previousDatabaseWildcardTTL, | ||
| previousTTLStateCaptured); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth a one-line comment explaining why this sentinel is
Long.MIN_VALUErather thanTTLCache.NULL_TTL(-1). They have different meanings on the rollback path:TTL_NOT_EXISTmeans "no TTL was set before this procedure", whileNULL_TTLis the explicit "unset" marker thatConfigPlanExecutorinterprets to route tounsetTTL. Conflating the two would corrupt rollback behavior.