diff --git a/br/pkg/lightning/importer/import.go b/br/pkg/lightning/importer/import.go index c8db15977a50a..2e5240474586a 100644 --- a/br/pkg/lightning/importer/import.go +++ b/br/pkg/lightning/importer/import.go @@ -107,7 +107,8 @@ const ( pd_cfgs VARCHAR(2048) NOT NULL DEFAULT '', status VARCHAR(32) NOT NULL, state TINYINT(1) NOT NULL DEFAULT 0 COMMENT '0: normal, 1: exited before finish', - source_bytes BIGINT(20) UNSIGNED NOT NULL DEFAULT 0, + tikv_source_bytes BIGINT(20) UNSIGNED NOT NULL DEFAULT 0, + tiflash_source_bytes BIGINT(20) UNSIGNED NOT NULL DEFAULT 0, tikv_avail BIGINT(20) UNSIGNED NOT NULL DEFAULT 0, tiflash_avail BIGINT(20) UNSIGNED NOT NULL DEFAULT 0, PRIMARY KEY (task_id) diff --git a/br/pkg/lightning/importer/meta_manager.go b/br/pkg/lightning/importer/meta_manager.go index 88aa2f8c5f7c1..777f9b40b2de8 100644 --- a/br/pkg/lightning/importer/meta_manager.go +++ b/br/pkg/lightning/importer/meta_manager.go @@ -597,7 +597,7 @@ func (m *dbTaskMetaMgr) InitTask(ctx context.Context, tikvSourceSize, tiflashSou Logger: log.FromContext(ctx), } // avoid override existing metadata if the meta is already inserted. - stmt := fmt.Sprintf(`INSERT INTO %s (task_id, status, tikv_source_bytes, tiflash_source_bytes) values (?, ?, ?) ON DUPLICATE KEY UPDATE state = ?`, m.tableName) + stmt := fmt.Sprintf(`INSERT INTO %s (task_id, status, tikv_source_bytes, tiflash_source_bytes) values (?, ?, ?, ?) ON DUPLICATE KEY UPDATE state = ?`, m.tableName) err := exec.Exec(ctx, "init task meta", stmt, m.taskID, taskMetaStatusInitial.String(), tikvSourceSize, tiflashSourceSize, taskStateNormal) return errors.Trace(err) } @@ -687,7 +687,7 @@ func (m *dbTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(t } for _, task := range newTasks { // nolint:gosec - query := fmt.Sprintf("REPLACE INTO %s (task_id, pd_cfgs, status, state, tikv_source_bytes, tiflash_source_bytes, tikv_avail, tiflash_avail) VALUES(?, ?, ?, ?, ?, ?)", m.tableName) + query := fmt.Sprintf("REPLACE INTO %s (task_id, pd_cfgs, status, state, tikv_source_bytes, tiflash_source_bytes, tikv_avail, tiflash_avail) VALUES(?, ?, ?, ?, ?, ?, ?, ?)", m.tableName) if _, err = tx.ExecContext(ctx, query, task.taskID, task.pdCfgs, task.status.String(), task.state, task.tikvSourceBytes, task.tiflashSourceBytes, task.tikvAvail, task.tiflashAvail); err != nil { return errors.Trace(err) } diff --git a/br/pkg/lightning/importer/meta_manager_test.go b/br/pkg/lightning/importer/meta_manager_test.go index ef50ee3a8f86b..631ecc9b09741 100644 --- a/br/pkg/lightning/importer/meta_manager_test.go +++ b/br/pkg/lightning/importer/meta_manager_test.go @@ -389,10 +389,10 @@ func TestCheckTasksExclusively(t *testing.T) { AddRow("3", "", taskMetaStatusInitial.String(), "0", "0", "0", "0", "0"). AddRow("4", "", taskMetaStatusInitial.String(), "0", "0", "0", "0", "0")) - s.mockDB.ExpectExec("\\QREPLACE INTO `test`.`t1` (task_id, pd_cfgs, status, state, tikv_source_bytes, tiflash_source_bytes, tikv_avail, tiflash_avail) VALUES(?, ?, ?, ?, ?, ?)\\E"). + s.mockDB.ExpectExec("\\QREPLACE INTO `test`.`t1` (task_id, pd_cfgs, status, state, tikv_source_bytes, tiflash_source_bytes, tikv_avail, tiflash_avail) VALUES(?, ?, ?, ?, ?, ?, ?, ?)\\E"). WithArgs(int64(2), "", taskMetaStatusInitial.String(), int(0), uint64(2048), uint64(2048), uint64(0), uint64(0)). WillReturnResult(sqlmock.NewResult(0, 1)) - s.mockDB.ExpectExec("\\QREPLACE INTO `test`.`t1` (task_id, pd_cfgs, status, state, tikv_source_bytes, tiflash_source_bytes, tikv_avail, tiflash_avail) VALUES(?, ?, ?, ?, ?, ?)\\E"). + s.mockDB.ExpectExec("\\QREPLACE INTO `test`.`t1` (task_id, pd_cfgs, status, state, tikv_source_bytes, tiflash_source_bytes, tikv_avail, tiflash_avail) VALUES(?, ?, ?, ?, ?, ?, ?, ?)\\E"). WithArgs(int64(3), "", taskMetaStatusInitial.String(), int(0), uint64(3072), uint64(3072), uint64(0), uint64(0)). WillReturnResult(sqlmock.NewResult(0, 1)) s.mockDB.ExpectCommit()