Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
fc69ae5
[feat](ivm) Add mtmv increment refresh framework
yujun777 Mar 17, 2026
276905d
[feature](mtmv) add nereids ivm rewrite gate and tests
yujun777 Mar 17, 2026
8608660
[feature](ivm) add refresh manager entry point
yujun777 Mar 17, 2026
aad748f
[ivm] make plan analysis an analysis artifact
yujun777 Mar 17, 2026
f1762e2
Add thin IVM stream metadata precheck
yujun777 Mar 17, 2026
0a98c8a
[ivm] fix IVMRefreshResult null contract, add toString and fallback l…
yujun777 Mar 17, 2026
aebc1b5
[ivm] replace DeltaPlanBundle description string with LogicalPlan and…
yujun777 Mar 18, 2026
c8cd645
[ivm] replace ivmPlanAnalysis with ivmDeltaBundles in CascadesContext
yujun777 Mar 18, 2026
0c21ef2
[ivm] replace plan analyzer/dispatcher with Nereids delta rules
yujun777 Mar 18, 2026
4224caf
[ivm] refactor IVM Nereids rules: normalize + delta rule skeletons
yujun777 Mar 18, 2026
2b8b5b6
[ivm] introduce IvmAnalyzeMode and split IVM session variables
yujun777 Mar 18, 2026
013aff3
[ivm] implement IvmNormalizeMtmvPlan row-id injection and IvmContext
yujun777 Mar 18, 2026
e6e9f7e
[ivm] wire row-id column into CreateMTMVInfo and add UTs
yujun777 Mar 18, 2026
3b2ba5c
[ivm] make IVM MV UNIQUE_KEYS+MOW and add key-type UTs
yujun777 Mar 18, 2026
a659c5e
Rename IVM delta bundle to command bundle
yujun777 Mar 18, 2026
5742631
[ivm] remove mv_ prefix from IVM row-id column name
yujun777 Mar 18, 2026
4f4d2e6
[ivm] move delta calculation outside Nereids and add skeleton IvmDelt…
yujun777 Mar 18, 2026
9d01066
[ivm] implement scan & project-scan delta rewriting in IvmDeltaRewriter
yujun777 Mar 18, 2026
0b613d7
[ivm] convert IVMDeltaExecutor and IVMCapabilityChecker from interfac…
yujun777 Mar 18, 2026
2bc5f00
[ivm] integrate IVM refresh into MTMVTask run flow
yujun777 Mar 18, 2026
ebe6a51
[ivm] extract MTMVPlanUtil.executeCommand() and add audit log to IVMD…
yujun777 Mar 19, 2026
886cd97
fix ivm refresh insert table command fail
yujun777 Mar 19, 2026
27ee2f1
[improvement](fe) Centralize IVM hidden column names
yujun777 Mar 19, 2026
53a35cb
[fix](fe) Remove unused MTMV imports
yujun777 Mar 19, 2026
160eb4d
[fix](fe) Preserve IVM row id in incremental mtmv refresh
yujun777 Mar 22, 2026
aa6d777
[fix](fe) Refresh root fragment output exprs
yujun777 Mar 22, 2026
00273d0
[fix](fe) Disable DML MV rewrite during MTMV refresh
yujun777 Mar 22, 2026
a14eee0
[fix](fe) Normalize incremental MTMV sink outputs
yujun777 Mar 24, 2026
a125346
[fix](fe) Fix redundant guard and checkstyle in incremental MTMV sink…
yujun777 Mar 24, 2026
cbd4468
[refactor](fe) Rename IVM normalize MTMV rule
yujun777 Mar 26, 2026
f663ffd
[test](fe) Update FE unit test expectations
yujun777 Mar 26, 2026
04f2cb9
[fix](fe) Reuse aggregate output alias slot for expression group keys
yujun777 Mar 26, 2026
0f881b0
[fix](fe) Check translated output slot refs
yujun777 Mar 26, 2026
e5fb20d
[fix](fe) Restore view alias rewrite guard for simpleColumnDefinitions
yujun777 Mar 26, 2026
77ed6d8
[fix](fe) Preserve deferred TopN projected outputs
yujun777 Mar 27, 2026
80519f3
[fix](fe) Limit MTMV alias rewrite to incremental refresh
yujun777 Mar 27, 2026
896e457
[fix](fe) Create MTMV context before StatementContext in refresh
yujun777 Mar 27, 2026
4907e61
[feature](fe) Add IVM aggregate normalization support
yujun777 Mar 31, 2026
8390c7d
[refactor](fe) Rename IVM-prefixed classes to Ivm in mtmv/ivm package
yujun777 Mar 31, 2026
eb207bc
[refactor](fe) Extract MTMV test cases from CreateTableCommandTest in…
yujun777 Mar 31, 2026
d5b926d
[refactor](fe) Split testConvertToPartitionTableInfo into four separa…
yujun777 Mar 31, 2026
7f2632e
[improvement](fe) Propagate IvmNormalizeResult through delta rewrite …
yujun777 Apr 2, 2026
3179006
[refactor](fe) Extract IvmDeltaStrategy interface and AbstractDeltaSt…
yujun777 Apr 2, 2026
bec9fc1
[test](fe) Add aggregate IMMV creation test cases in CreateMTMVComman…
yujun777 Apr 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,7 @@ refreshSchedule
;

refreshMethod
: COMPLETE | AUTO
: COMPLETE | AUTO | INCREMENTAL
;

mvPartition
Expand Down
7 changes: 7 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,15 @@ public class Column implements GsonPostProcessable {
public static final String HIDDEN_COLUMN_PREFIX = "__DORIS_";
// all shadow indexes should have this prefix in name
public static final String SHADOW_NAME_PREFIX = "__doris_shadow_";
public static final String IVM_HIDDEN_COLUMN_PREFIX = "__DORIS_IVM_";
// NOTE: you should name hidden column start with '__DORIS_' !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
public static final String DELETE_SIGN = "__DORIS_DELETE_SIGN__";
public static final String WHERE_SIGN = "__DORIS_WHERE_SIGN__";
public static final String SEQUENCE_COL = "__DORIS_SEQUENCE_COL__";
public static final String ROWID_COL = "__DORIS_ROWID_COL__";
public static final String GLOBAL_ROWID_COL = "__DORIS_GLOBAL_ROWID_COL__";
public static final String IVM_ROW_ID_COL = "__DORIS_IVM_ROW_ID_COL__";
public static final String IVM_AGG_COUNT_COL = "__DORIS_IVM_AGG_COUNT_COL__";
public static final String ROW_STORE_COL = "__DORIS_ROW_STORE_COL__";
public static final String VERSION_COL = "__DORIS_VERSION_COL__";
public static final String SKIP_BITMAP_COL = "__DORIS_SKIP_BITMAP_COL__";
Expand Down Expand Up @@ -217,6 +220,10 @@ public Column(String name, Type type, boolean isKey, AggregateType aggregateType
false, null, null, Sets.newHashSet(), null);
}

public static boolean isIvmHiddenColumn(String columnName) {
return StringUtils.startsWith(columnName, IVM_HIDDEN_COLUMN_PREFIX);
}

public Column(String name, Type type, boolean isKey, AggregateType aggregateType, boolean isAllowNull,
String defaultValue, String comment, boolean visible, int colUniqueId) {
this(name, type, isKey, aggregateType, isAllowNull, -1, defaultValue, comment, visible, null, colUniqueId, null,
Expand Down
25 changes: 25 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVStatus;
import org.apache.doris.mtmv.ivm.IvmInfo;
import org.apache.doris.mtmv.ivm.IvmUtil;
import org.apache.doris.nereids.rules.analysis.SessionVarGuardRewriter;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
Expand All @@ -56,6 +59,7 @@
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
Expand Down Expand Up @@ -87,6 +91,8 @@ public class MTMV extends OlapTable {
private MTMVPartitionInfo mvPartitionInfo;
@SerializedName("rs")
private MTMVRefreshSnapshot refreshSnapshot;
@SerializedName("ii")
private IvmInfo ivmInfo;
// Should update after every fresh, not persist
// Cache with SessionVarGuardExpr: used when query session variables differ from MV creation variables
private MTMVCache cacheWithGuard;
Expand Down Expand Up @@ -120,6 +126,7 @@ public MTMV() {
this.mvPartitionInfo = params.mvPartitionInfo;
this.relation = params.relation;
this.refreshSnapshot = new MTMVRefreshSnapshot();
this.ivmInfo = new IvmInfo();
this.envInfo = new EnvInfo(-1L, -1L);
this.sessionVariables = params.sessionVariables;
mvRwLock = new ReentrantReadWriteLock(true);
Expand Down Expand Up @@ -437,6 +444,21 @@ public MTMVRefreshSnapshot getRefreshSnapshot() {
return refreshSnapshot;
}

public IvmInfo getIvmInfo() {
return ivmInfo;
}

public List<String> getInsertedColumnNames() {
List<Column> columns = getBaseSchema(true);
List<String> columnNames = Lists.newArrayListWithExpectedSize(columns.size());
for (Column column : columns) {
if (column.isVisible() || IvmUtil.isIvmHiddenColumn(column.getName())) {
columnNames.add(column.getName());
}
}
return columnNames;
}

public long getSchemaChangeVersion() {
readMvLock();
try {
Expand Down Expand Up @@ -609,6 +631,9 @@ private void compatibleInternal(CatalogMgr catalogMgr) throws Exception {
@Override
public void gsonPostProcess() throws IOException {
super.gsonPostProcess();
if (ivmInfo == null) {
ivmInfo = new IvmInfo();
}
Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots = refreshSnapshot.getPartitionSnapshots();
compatiblePctSnapshot(partitionSnapshots);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,16 @@
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mtmv.ivm.IvmRefreshManager;
import org.apache.doris.mtmv.ivm.IvmRefreshResult;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand;
import org.apache.doris.qe.AuditLogHelper;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -244,6 +242,21 @@ public void run() throws JobException {
if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) {
return;
}
// Attempt IVM refresh for incremental MVs and fall back when the plan is unsupported.
if (mtmv.getRefreshInfo().getRefreshMethod() == RefreshMethod.INCREMENTAL) {
IvmRefreshManager ivmRefreshManager = new IvmRefreshManager();
IvmRefreshResult ivmResult = ivmRefreshManager.doRefresh(mtmv);
if (ivmResult.isSuccess()) {
LOG.info("IVM incremental refresh succeeded for mv={}, taskId={}",
mtmv.getName(), getTaskId());
return;
}
LOG.warn("IVM refresh fell back for mv={}, reason={}, detail={}, taskId={}. "
+ "Continuing with partition-based refresh.",
mtmv.getName(), ivmResult.getFallbackReason(),
ivmResult.getDetailMessage(), getTaskId());
// TODO: it may cause too many full refresh, need limit full refresh here
}
Map<TableIf, String> tableWithPartKey = getIncrementalTableMap();
this.completedPartitions = Lists.newCopyOnWriteArrayList();
int refreshPartitionNum = mtmv.getRefreshPartitionNum();
Expand Down Expand Up @@ -321,36 +334,23 @@ private void executeWithRetry(Set<String> execPartitionNames, Map<TableIf, Strin
private void exec(Set<String> refreshPartitionNames,
Map<TableIf, String> tableWithPartKey)
throws Exception {
ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv, MTMVPlanUtil.DISABLE_RULES_WHEN_RUN_MTMV_TASK);
// Create MTMV context first so that new StatementContext() captures the
// correct thread-local ConnectContext (with MTMV disabled rules, etc.).
ConnectContext mtmvCtx = MTMVPlanUtil.createMTMVContext(mtmv, MTMVPlanUtil.DISABLE_RULES_WHEN_RUN_MTMV_TASK);
StatementContext statementContext = new StatementContext();
for (Entry<MvccTableInfo, MvccSnapshot> entry : snapshots.entrySet()) {
statementContext.setSnapshot(entry.getKey(), entry.getValue());
}
ctx.setStatementContext(statementContext);
TUniqueId queryId = generateQueryId();
lastQueryId = DebugUtil.printId(queryId);
// if SELF_MANAGE mv, only have default partition, will not have partitionItem, so we give empty set
UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand
.from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE
? refreshPartitionNames : Sets.newHashSet(), tableWithPartKey, statementContext);
try {
executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext()));
ctx.setExecutor(executor);
ctx.setQueryId(queryId);
ctx.getState().setNereids(true);
command.run(ctx, executor);
if (getStatus() == TaskStatus.CANCELED) {
// Throwing an exception to interrupt subsequent partition update tasks
throw new JobException("task is CANCELED");
}
if (ctx.getState().getStateType() != MysqlStateType.OK) {
throw new JobException(ctx.getState().getErrorMessage());
}
} finally {
if (executor != null) {
AuditLogHelper.logAuditLog(ctx, getDummyStmt(refreshPartitionNames),
executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog(), true);
}
boolean enableIvmNormalMTMVPlan = mtmv.getRefreshInfo().getRefreshMethod() == RefreshMethod.INCREMENTAL;
executor = MTMVPlanUtil.executeCommand(mtmvCtx, command, statementContext,
getDummyStmt(refreshPartitionNames), enableIvmNormalMTMVPlan);
lastQueryId = DebugUtil.printId(executor.getContext().queryId());
if (getStatus() == TaskStatus.CANCELED) {
throw new JobException("task is CANCELED");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.doris.mtmv;

import org.apache.doris.mtmv.ivm.IvmNormalizeResult;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;

import java.util.List;
Expand All @@ -25,6 +27,8 @@ public class MTMVAnalyzeQueryInfo {
private MTMVRelation relation;
private MTMVPartitionInfo mvPartitionInfo;
private List<ColumnDefinition> columnDefinitions;
// set when IVM normalization is enabled; carries normalizedPlan + aggMeta
private IvmNormalizeResult ivmNormalizeResult;

public MTMVAnalyzeQueryInfo(List<ColumnDefinition> columnDefinitions, MTMVPartitionInfo mvPartitionInfo,
MTMVRelation relation) {
Expand All @@ -44,4 +48,17 @@ public MTMVPartitionInfo getMvPartitionInfo() {
public MTMVRelation getRelation() {
return relation;
}

public IvmNormalizeResult getIvmNormalizeResult() {
return ivmNormalizeResult;
}

public void setIvmNormalizeResult(IvmNormalizeResult ivmNormalizeResult) {
this.ivmNormalizeResult = ivmNormalizeResult;
}

/** Convenience accessor — returns the normalized plan, or null if IVM is not active. */
public Plan getIvmNormalizedPlan() {
return ivmNormalizeResult != null ? ivmNormalizeResult.getNormalizedPlan() : null;
}
}
Loading
Loading