Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: disable ttl job when recover/flashback table/database/cluster #40268

Merged
merged 8 commits into from
Jan 3, 2023
38 changes: 34 additions & 4 deletions ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const (
totalLockedRegionsOffset
startTSOffset
commitTSOffset
ttlJobEnableOffSet
)

func closePDSchedule() error {
Expand Down Expand Up @@ -124,6 +125,18 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack
return gcutil.ValidateSnapshotWithGCSafePoint(flashBackTS, gcSafePoint)
}

func getTiDBTTLJobEnable(sess sessionctx.Context) (string, error) {
val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBTTLJobEnable)
if err != nil {
return "", errors.Trace(err)
}
return val, nil
}

func setTiDBTTLJobEnable(ctx context.Context, sess sessionctx.Context, value string) error {
return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(ctx, variable.TiDBTTLJobEnable, value)
}

func setTiDBEnableAutoAnalyze(ctx context.Context, sess sessionctx.Context, value string) error {
return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(ctx, variable.TiDBEnableAutoAnalyze, value)
}
Expand Down Expand Up @@ -176,6 +189,9 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta
if err = setTiDBSuperReadOnly(d.ctx, sess, variable.On); err != nil {
return err
}
if err = setTiDBTTLJobEnable(d.ctx, sess, variable.Off); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we have turned off the TTL_ENABLE for every table, is it still necessary to turn off this variable?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for flashback cluster,we did not turn off any table's TTL_ENABLE and just set tidb_ttl_job_enable='OFF'

return err
}

nowSchemaVersion, err := t.GetSchemaVersion()
if err != nil {
Expand Down Expand Up @@ -553,9 +569,9 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve

var flashbackTS, lockedRegions, startTS, commitTS uint64
var pdScheduleValue map[string]interface{}
var autoAnalyzeValue, readOnlyValue string
var autoAnalyzeValue, readOnlyValue, ttlJobEnableValue string
var gcEnabledValue bool
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS); err != nil {
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS, &ttlJobEnableValue); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -595,6 +611,12 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
return ver, errors.Trace(err)
}
job.Args[readOnlyOffset] = &readOnlyValue
ttlJobEnableValue, err = getTiDBTTLJobEnable(sess)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
job.Args[ttlJobEnableOffSet] = &ttlJobEnableValue
job.SchemaState = model.StateDeleteOnly
return ver, nil
// Stage 2, check flashbackTS, close GC and PD schedule.
Expand Down Expand Up @@ -694,10 +716,10 @@ func finishFlashbackCluster(w *worker, job *model.Job) error {

var flashbackTS, lockedRegions, startTS, commitTS uint64
var pdScheduleValue map[string]interface{}
var autoAnalyzeValue, readOnlyValue string
var autoAnalyzeValue, readOnlyValue, ttlJobEnableValue string
var gcEnabled bool

if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabled, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS); err != nil {
Copy link
Contributor

@Defined2014 Defined2014 Jan 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should store session variable in model.StateNone state, like L587. But one question here, what will happened when ttlEnabled during flashback?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabled, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS, &ttlJobEnableValue); err != nil {
return errors.Trace(err)
}
sess, err := w.sessPool.get()
Expand All @@ -718,6 +740,14 @@ func finishFlashbackCluster(w *worker, job *model.Job) error {
if err = setTiDBSuperReadOnly(w.ctx, sess, readOnlyValue); err != nil {
return err
}

if job.IsCancelled() {
// only restore `tidb_ttl_job_enable` when flashback failed
if err = setTiDBTTLJobEnable(w.ctx, sess, ttlJobEnableValue); err != nil {
return err
}
}

return setTiDBEnableAutoAnalyze(w.ctx, sess, autoAnalyzeValue)
})
if err != nil {
Expand Down
24 changes: 22 additions & 2 deletions ddl/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,16 @@ func TestGlobalVariablesOnFlashback(t *testing.T) {
rs, err = tk.Exec("show variables like 'tidb_super_read_only'")
assert.NoError(t, err)
assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On)
rs, err = tk.Exec("show variables like 'tidb_ttl_job_enable'")
assert.NoError(t, err)
assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off)
}
}
dom.DDL().SetHook(hook)
// first try with `tidb_gc_enable` = on and `tidb_super_read_only` = off
// first try with `tidb_gc_enable` = on and `tidb_super_read_only` = off and `tidb_ttl_job_enable` = on
tk.MustExec("set global tidb_gc_enable = on")
tk.MustExec("set global tidb_super_read_only = off")
tk.MustExec("set global tidb_ttl_job_enable = on")

tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)))

Expand All @@ -224,10 +228,14 @@ func TestGlobalVariablesOnFlashback(t *testing.T) {
rs, err = tk.Exec("show variables like 'tidb_gc_enable'")
require.NoError(t, err)
require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On)
rs, err = tk.Exec("show variables like 'tidb_ttl_job_enable'")
require.NoError(t, err)
require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off)

// second try with `tidb_gc_enable` = off and `tidb_super_read_only` = on
// second try with `tidb_gc_enable` = off and `tidb_super_read_only` = on and `tidb_ttl_job_enable` = off
tk.MustExec("set global tidb_gc_enable = off")
tk.MustExec("set global tidb_super_read_only = on")
tk.MustExec("set global tidb_ttl_job_enable = off")

ts, err = tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
require.NoError(t, err)
Expand All @@ -238,6 +246,9 @@ func TestGlobalVariablesOnFlashback(t *testing.T) {
rs, err = tk.Exec("show variables like 'tidb_gc_enable'")
require.NoError(t, err)
require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off)
rs, err = tk.Exec("show variables like 'tidb_ttl_job_enable'")
assert.NoError(t, err)
assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off)

dom.DDL().SetHook(originHook)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
Expand Down Expand Up @@ -268,9 +279,14 @@ func TestCancelFlashbackCluster(t *testing.T) {
return job.SchemaState == model.StateDeleteOnly
})
dom.DDL().SetHook(hook)
tk.MustExec("set global tidb_ttl_job_enable = on")
tk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)), errno.ErrCancelledDDLJob)
hook.MustCancelDone(t)

rs, err := tk.Exec("show variables like 'tidb_ttl_job_enable'")
assert.NoError(t, err)
assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On)

// Try canceled on StateWriteReorganization, cancel failed
hook = newCancelJobHook(t, store, dom, func(job *model.Job) bool {
return job.SchemaState == model.StateWriteReorganization
Expand All @@ -279,6 +295,10 @@ func TestCancelFlashbackCluster(t *testing.T) {
tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)))
hook.MustCancelFailed(t)

rs, err = tk.Exec("show variables like 'tidb_ttl_job_enable'")
assert.NoError(t, err)
assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off)

dom.DDL().SetHook(originHook)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
Expand Down
3 changes: 2 additions & 1 deletion ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2750,7 +2750,8 @@ func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error
variable.Off, /* tidb_super_read_only */
0, /* totalRegions */
0, /* startTS */
0 /* commitTS */},
0, /* commitTS */
variable.On /* tidb_ttl_job_enable */},
}
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
Expand Down
4 changes: 4 additions & 0 deletions ddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,10 @@ func (w *worker) onRecoverSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver i
return ver, errors.Trace(err)
}
for _, recoverInfo := range recoverSchemaInfo.RecoverTabsInfo {
if recoverInfo.TableInfo.TTLInfo != nil {
// force disable TTL job schedule for recovered table
recoverInfo.TableInfo.TTLInfo.Enable = false
}
ver, err = w.recoverTable(t, job, recoverInfo)
if err != nil {
return ver, errors.Trace(err)
Expand Down
64 changes: 64 additions & 0 deletions ddl/serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,70 @@ func TestCancelAddIndexPanic(t *testing.T) {
require.Truef(t, strings.HasPrefix(errMsg, "[ddl:8214]Cancelled DDL job"), "%v", errMsg)
}

func TestRecoverTableWithTTL(t *testing.T) {
store, _ := createMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("create database if not exists test_recover")
tk.MustExec("use test_recover")
defer func(originGC bool) {
if originGC {
util.EmulatorGCEnable()
} else {
util.EmulatorGCDisable()
}
}(util.IsEmulatorGCEnable())

// disable emulator GC.
// Otherwise emulator GC will delete table record as soon as possible after execute drop table ddl.
util.EmulatorGCDisable()
gcTimeFormat := "20060102-15:04:05 -0700 MST"
safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '')
ON DUPLICATE KEY
UPDATE variable_value = '%[1]s'`
tk.MustExec(fmt.Sprintf(safePointSQL, time.Now().Add(-time.Hour).Format(gcTimeFormat)))
getDDLJobID := func(table, tp string) int64 {
rs, err := tk.Exec("admin show ddl jobs")
require.NoError(t, err)
rows, err := session.GetRows4Test(context.Background(), tk.Session(), rs)
require.NoError(t, err)
for _, row := range rows {
if row.GetString(2) == table && row.GetString(3) == tp {
return row.GetInt64(0)
}
}
require.FailNowf(t, "can't find %s table of %s", tp, table)
return -1
}

// recover table
tk.MustExec("create table t_recover1 (t timestamp) TTL=`t`+INTERVAL 1 DAY")
tk.MustExec("drop table t_recover1")
tk.MustExec("recover table t_recover1")
tk.MustQuery("show create table t_recover1").Check(testkit.Rows("t_recover1 CREATE TABLE `t_recover1` (\n `t` timestamp NULL DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![ttl] TTL=`t` + INTERVAL 1 DAY */ /*T![ttl] TTL_ENABLE='OFF' */"))

// recover table with job id
tk.MustExec("create table t_recover2 (t timestamp) TTL=`t`+INTERVAL 1 DAY")
tk.MustExec("drop table t_recover2")
jobID := getDDLJobID("t_recover2", "drop table")
tk.MustExec(fmt.Sprintf("recover table BY JOB %d", jobID))
tk.MustQuery("show create table t_recover2").Check(testkit.Rows("t_recover2 CREATE TABLE `t_recover2` (\n `t` timestamp NULL DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![ttl] TTL=`t` + INTERVAL 1 DAY */ /*T![ttl] TTL_ENABLE='OFF' */"))

// flashback table
tk.MustExec("create table t_recover3 (t timestamp) TTL=`t`+INTERVAL 1 DAY")
tk.MustExec("drop table t_recover3")
tk.MustExec("flashback table t_recover3")
tk.MustQuery("show create table t_recover3").Check(testkit.Rows("t_recover3 CREATE TABLE `t_recover3` (\n `t` timestamp NULL DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![ttl] TTL=`t` + INTERVAL 1 DAY */ /*T![ttl] TTL_ENABLE='OFF' */"))

// flashback database
tk.MustExec("create database if not exists test_recover2")
tk.MustExec("create table test_recover2.t1 (t timestamp) TTL=`t`+INTERVAL 1 DAY")
tk.MustExec("create table test_recover2.t2 (t timestamp) TTL=`t`+INTERVAL 1 DAY")
tk.MustExec("drop database test_recover2")
tk.MustExec("flashback database test_recover2")
tk.MustQuery("show create table test_recover2.t1").Check(testkit.Rows("t1 CREATE TABLE `t1` (\n `t` timestamp NULL DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![ttl] TTL=`t` + INTERVAL 1 DAY */ /*T![ttl] TTL_ENABLE='OFF' */"))
tk.MustQuery("show create table test_recover2.t2").Check(testkit.Rows("t2 CREATE TABLE `t2` (\n `t` timestamp NULL DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![ttl] TTL=`t` + INTERVAL 1 DAY */ /*T![ttl] TTL_ENABLE='OFF' */"))
}

func TestRecoverTableByJobID(t *testing.T) {
store, _ := createMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down
5 changes: 5 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,11 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in

schemaID := recoverInfo.SchemaID
tblInfo := recoverInfo.TableInfo
if tblInfo.TTLInfo != nil {
// force disable TTL job schedule for recovered table
tblInfo.TTLInfo.Enable = false
}

// check GC and safe point
gcEnable, err := checkGCEnable(w)
if err != nil {
Expand Down