Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 28 additions & 11 deletions pkg/frontend/pitr.go
Original file line number Diff line number Diff line change
Expand Up @@ -1430,12 +1430,16 @@ func restoreToDatabaseOrTableWithPitr(
return
}

// Skip master check for TABLE/DATABASE level restore:
// 1. User explicitly specifies which table(s) to restore
// 2. If the table is referenced by other tables restored earlier, master check would incorrectly block
if err = reCreateTableWithPitr(ctx,
sid,
bh,
pitrName,
ts,
tblInfo); err != nil {
tblInfo,
true); err != nil {
return
}
}
Expand All @@ -1456,15 +1460,21 @@ func reCreateTableWithPitr(
bh BackgroundExec,
pitrName string,
ts int64,
tblInfo *tableInfo) (err error) {
tblInfo *tableInfo,
skipMasterCheck bool) (err error) {
getLogger(sid).Info(fmt.Sprintf("[%s] start to restore table: '%v' at timestamp %d", pitrName, tblInfo.tblName, ts))

var isMasterTable bool
isMasterTable, err = checkTableIsMaster(ctx, sid, bh, pitrName, tblInfo.dbName, tblInfo.tblName)
if isMasterTable {
// skip restore the table which is master table
getLogger(sid).Info(fmt.Sprintf("[%s] skip restore master table: %v.%v", pitrName, tblInfo.dbName, tblInfo.tblName))
return
if !skipMasterCheck {
var isMasterTable bool
isMasterTable, err = checkTableIsMaster(ctx, sid, bh, pitrName, tblInfo.dbName, tblInfo.tblName)
if err != nil {
return
}
if isMasterTable {
// skip restore the table which is master table
getLogger(sid).Info(fmt.Sprintf("[%s] skip restore master table: %v.%v", pitrName, tblInfo.dbName, tblInfo.tblName))
return
}
}

if err = bh.Exec(ctx, fmt.Sprintf("use `%s`", tblInfo.dbName)); err != nil {
Expand All @@ -1488,11 +1498,17 @@ func reCreateTableWithPitr(
}
}

// Use a context with IgnoreForeignKey=true for clone operations.
// This skips FK validation during restore, as FK tables are already restored
// in topological order by restoreTablesWithFkByPitr(). Without this, Resolve() can
// fail with ExpectedEOB when the database was drop+recreated in the same transaction.
cloneCtx := context.WithValue(ctx, defines.IgnoreForeignKey{}, true)

// insert data
insertIntoSql := fmt.Sprintf(restoreTableDataByTsFmt, tblInfo.dbName, tblInfo.tblName, tblInfo.dbName, tblInfo.tblName, ts)
beginTime := time.Now()
getLogger(sid).Info(fmt.Sprintf("[%s] start to insert select table: '%v', insert sql: %s", pitrName, tblInfo.tblName, insertIntoSql))
if err = bh.Exec(ctx, insertIntoSql); err != nil {
if err = bh.Exec(cloneCtx, insertIntoSql); err != nil {
return
}
getLogger(sid).Info(fmt.Sprintf("[%s] insert select table: %v, cost: %v", pitrName, tblInfo.tblName, time.Since(beginTime)))
Expand Down Expand Up @@ -1696,7 +1712,7 @@ func restoreTablesWithFkByPitr(
// e.g. t1.pk <- t2.fk, we only want to restore t2, fkTableMap[t1.key] is nil, ignore t1
if tblInfo := fkTableMap[key]; tblInfo != nil {
getLogger(sid).Info(fmt.Sprintf("[%s] start to restore table with fk: %v, restore timestamp: %d", pitrName, tblInfo.tblName, ts))
if err = reCreateTableWithPitr(ctx, sid, bh, pitrName, ts, tblInfo); err != nil {
if err = reCreateTableWithPitr(ctx, sid, bh, pitrName, ts, tblInfo, true); err != nil {
return
}
}
Expand Down Expand Up @@ -1827,7 +1843,8 @@ func restoreSystemDatabaseWithPitr(
return
}

if err = reCreateTableWithPitr(ctx, sid, bh, pitrName, ts, tblInfo); err != nil {
// Skip master check for system tables - they are restored as part of the system database
if err = reCreateTableWithPitr(ctx, sid, bh, pitrName, ts, tblInfo, true); err != nil {
return
}
}
Expand Down
247 changes: 246 additions & 1 deletion pkg/frontend/pitr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ package frontend
import (
"context"
"fmt"
"github.com/stretchr/testify/require"
"strings"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/prashantv/gostub"
"github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/matrixorigin/matrixone/pkg/config"
"github.com/matrixorigin/matrixone/pkg/container/types"
Expand Down Expand Up @@ -3445,3 +3446,247 @@ func Test_getPitrLengthAndUnit(t *testing.T) {
_, _, _, err = getPitrLengthAndUnit(ctx, bh, "table", "", "", "tbl")
assert.Error(t, err)
}

// ──────────────────────────────────────────────────────────────────────────────
// Tests for reCreateTableWithPitr skipMasterCheck and restoreTablesWithFkByPitr
// ──────────────────────────────────────────────────────────────────────────────

func Test_reCreateTableWithPitr_SkipMasterCheck(t *testing.T) {
convey.Convey("reCreateTableWithPitr with skipMasterCheck=true should always restore table", t, func() {
ctx, _, cleanup := setupTestCtx(t)
defer cleanup()

bh := &backgroundExecTestWithHistory{}
bh.init()

// Register a result for checkTableIsMaster that would return true.
// With skipMasterCheck=true, this should be ignored.
masterCheckSql := fmt.Sprintf(checkTableIsMasterFormat, "testdb", "parent_tbl")
bh.sql2result[masterCheckSql] = newMrsForCheckMaster([][]interface{}{{"testdb", "child_tbl"}})

tblInfo := &tableInfo{
dbName: "testdb",
tblName: "parent_tbl",
}

err := reCreateTableWithPitr(ctx, "", bh, "pitr01", 100, tblInfo, true)
convey.So(err, convey.ShouldBeNil)

// Verify that the master check SQL was NOT executed
convey.So(bh.hasExecuted(masterCheckSql), convey.ShouldBeFalse)

// Verify that use/drop/clone SQLs WERE executed
convey.So(bh.hasExecuted("use `testdb`"), convey.ShouldBeTrue)
convey.So(bh.hasExecuted("drop table if exists `parent_tbl`"), convey.ShouldBeTrue)

// The clone SQL should have been executed
cloneExecuted := false
for _, sql := range bh.executedSqls {
if strings.Contains(sql, "clone") && strings.Contains(sql, "parent_tbl") {
cloneExecuted = true
break
}
}
convey.So(cloneExecuted, convey.ShouldBeTrue)
})
}

func Test_reCreateTableWithPitr_MasterTableSkipped(t *testing.T) {
convey.Convey("reCreateTableWithPitr with skipMasterCheck=false should skip master tables", t, func() {
ctx, _, cleanup := setupTestCtx(t)
defer cleanup()

bh := &backgroundExecTestWithHistory{}
bh.init()

// Register a result that makes checkTableIsMaster return true
masterCheckSql := fmt.Sprintf(checkTableIsMasterFormat, "testdb", "parent_tbl")
bh.sql2result[masterCheckSql] = newMrsForCheckMaster([][]interface{}{{"testdb", "child_tbl"}})

tblInfo := &tableInfo{
dbName: "testdb",
tblName: "parent_tbl",
}

err := reCreateTableWithPitr(ctx, "", bh, "pitr01", 100, tblInfo, false)
convey.So(err, convey.ShouldBeNil)

// Verify master check WAS executed
convey.So(bh.hasExecuted(masterCheckSql), convey.ShouldBeTrue)

// Verify that use/drop/clone were NOT executed (table was skipped)
convey.So(bh.hasExecuted("use `testdb`"), convey.ShouldBeFalse)
convey.So(bh.hasExecuted("drop table if exists `parent_tbl`"), convey.ShouldBeFalse)
})
}

func Test_reCreateTableWithPitr_NonMasterProceeds(t *testing.T) {
convey.Convey("reCreateTableWithPitr with skipMasterCheck=false should proceed for non-master tables", t, func() {
ctx, _, cleanup := setupTestCtx(t)
defer cleanup()

bh := &backgroundExecTestWithHistory{}
bh.init()

// Register an empty result → checkTableIsMaster returns false
masterCheckSql := fmt.Sprintf(checkTableIsMasterFormat, "testdb", "leaf_tbl")
bh.sql2result[masterCheckSql] = newMrsForCheckMaster([][]interface{}{})

tblInfo := &tableInfo{
dbName: "testdb",
tblName: "leaf_tbl",
}

err := reCreateTableWithPitr(ctx, "", bh, "pitr01", 100, tblInfo, false)
convey.So(err, convey.ShouldBeNil)

// Master check was executed
convey.So(bh.hasExecuted(masterCheckSql), convey.ShouldBeTrue)

// And the restore proceeded
convey.So(bh.hasExecuted("use `testdb`"), convey.ShouldBeTrue)
convey.So(bh.hasExecuted("drop table if exists `leaf_tbl`"), convey.ShouldBeTrue)
})
}

func Test_reCreateTableWithPitr_MasterCheckError(t *testing.T) {
convey.Convey("reCreateTableWithPitr propagates checkTableIsMaster errors", t, func() {
ctx, _, cleanup := setupTestCtx(t)
defer cleanup()

bh := &backgroundExecTestWithHistory{}
bh.init()

// Do NOT register a result for the master check SQL.
// getResultSet will fail when it encounters nil.
tblInfo := &tableInfo{
dbName: "testdb",
tblName: "some_tbl",
}

err := reCreateTableWithPitr(ctx, "", bh, "pitr01", 100, tblInfo, false)
convey.So(err, convey.ShouldNotBeNil)
convey.So(err.Error(), convey.ShouldContainSubstring, "not the type of result set")

// The restore should NOT have proceeded
convey.So(bh.hasExecuted("use `testdb`"), convey.ShouldBeFalse)
})
}

func Test_restoreTablesWithFkByPitr_AlwaysRestoresParentTables(t *testing.T) {
convey.Convey("restoreTablesWithFkByPitr should always restore parent tables (skipMasterCheck=true)", t, func() {
ctx, _, cleanup := setupTestCtx(t)
defer cleanup()

bh := &backgroundExecTestWithHistory{}
bh.init()

// Setup: pri01 is a parent table that checkTableIsMaster would report as master.
// In the old code, this would cause pri01 to be skipped, breaking child FK validation.
masterCheckPri := fmt.Sprintf(checkTableIsMasterFormat, "acc_test02", "pri01")
bh.sql2result[masterCheckPri] = newMrsForCheckMaster([][]interface{}{{"acc_test02", "aff01"}})

masterCheckAff := fmt.Sprintf(checkTableIsMasterFormat, "acc_test02", "aff01")
bh.sql2result[masterCheckAff] = newMrsForCheckMaster([][]interface{}{})

// Topo-sorted: parent first, then child
sortedFkTbls := []string{
genKey("acc_test02", "pri01"),
genKey("acc_test02", "aff01"),
}

fkTableMap := map[string]*tableInfo{
genKey("acc_test02", "pri01"): {dbName: "acc_test02", tblName: "pri01"},
genKey("acc_test02", "aff01"): {dbName: "acc_test02", tblName: "aff01"},
}

err := restoreTablesWithFkByPitr(ctx, "", bh, "pitr01", 100, sortedFkTbls, fkTableMap)
convey.So(err, convey.ShouldBeNil)

// Both tables should have been restored
convey.So(bh.hasExecuted("use `acc_test02`"), convey.ShouldBeTrue)
convey.So(bh.hasExecuted("drop table if exists `pri01`"), convey.ShouldBeTrue)
convey.So(bh.hasExecuted("drop table if exists `aff01`"), convey.ShouldBeTrue)

// Master check should NOT have been executed (skipMasterCheck=true inside restoreTablesWithFkByPitr)
convey.So(bh.hasExecuted(masterCheckPri), convey.ShouldBeFalse)
convey.So(bh.hasExecuted(masterCheckAff), convey.ShouldBeFalse)
})
}

func Test_restoreTablesWithFkByPitr_SkipsNilEntries(t *testing.T) {
convey.Convey("restoreTablesWithFkByPitr skips tables not in fkTableMap", t, func() {
ctx, _, cleanup := setupTestCtx(t)
defer cleanup()

bh := &backgroundExecTestWithHistory{}
bh.init()

// t1.pk <- t2.fk, but we only restore t2. t1 is nil in fkTableMap.
sortedFkTbls := []string{
genKey("db1", "t1"),
genKey("db1", "t2"),
}

fkTableMap := map[string]*tableInfo{
genKey("db1", "t2"): {dbName: "db1", tblName: "t2"},
// t1 not in map → should be skipped
}

err := restoreTablesWithFkByPitr(ctx, "", bh, "pitr01", 100, sortedFkTbls, fkTableMap)
convey.So(err, convey.ShouldBeNil)

// t2 should be restored, t1 should not
convey.So(bh.hasExecuted("drop table if exists `t2`"), convey.ShouldBeTrue)
convey.So(bh.hasExecuted("drop table if exists `t1`"), convey.ShouldBeFalse)
})
}

func Test_restoreTablesWithFkByPitr_EmptyList(t *testing.T) {
convey.Convey("restoreTablesWithFkByPitr with empty list succeeds", t, func() {
ctx, _, cleanup := setupTestCtx(t)
defer cleanup()

bh := &backgroundExecTestWithHistory{}
bh.init()

err := restoreTablesWithFkByPitr(ctx, "", bh, "pitr01", 100, nil, nil)
convey.So(err, convey.ShouldBeNil)
convey.So(len(bh.executedSqls), convey.ShouldEqual, 0)
})
}

func Test_restoreTablesWithFkByPitr_MultipleParentChain(t *testing.T) {
convey.Convey("restoreTablesWithFkByPitr handles multi-level FK chain", t, func() {
ctx, _, cleanup := setupTestCtx(t)
defer cleanup()

bh := &backgroundExecTestWithHistory{}
bh.init()

// grandparent -> parent -> child: all should be restored without master check
sortedFkTbls := []string{
genKey("db1", "grandparent"),
genKey("db1", "parent"),
genKey("db1", "child"),
}

fkTableMap := map[string]*tableInfo{
genKey("db1", "grandparent"): {dbName: "db1", tblName: "grandparent"},
genKey("db1", "parent"): {dbName: "db1", tblName: "parent"},
genKey("db1", "child"): {dbName: "db1", tblName: "child"},
}

err := restoreTablesWithFkByPitr(ctx, "", bh, "pitr01", 100, sortedFkTbls, fkTableMap)
convey.So(err, convey.ShouldBeNil)

convey.So(bh.hasExecuted("drop table if exists `grandparent`"), convey.ShouldBeTrue)
convey.So(bh.hasExecuted("drop table if exists `parent`"), convey.ShouldBeTrue)
convey.So(bh.hasExecuted("drop table if exists `child`"), convey.ShouldBeTrue)

// No master check queries should have been executed
for _, sql := range bh.executedSqls {
convey.So(strings.Contains(sql, "mo_foreign_keys"), convey.ShouldBeFalse)
}
})
}
Loading
Loading