Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl/ingest: set minCommitTS when detect remote duplicate keys #55588

Merged
merged 8 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pkg/ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type litBackendCtx struct {
updateInterval time.Duration
checkpointMgr *CheckpointManager
etcdClient *clientv3.Client
initTS uint64

// unregisterMu prevents concurrent calls of `FinishAndUnregisterEngines`.
// For details, see https://github.com/pingcap/tidb/issues/53843.
Expand Down Expand Up @@ -149,9 +150,10 @@ func (bc *litBackendCtx) CollectRemoteDuplicateRows(indexID int64, tbl table.Tab
func (bc *litBackendCtx) collectRemoteDuplicateRows(indexID int64, tbl table.Table) error {
dupeController := bc.backend.GetDupeController(bc.cfg.WorkerConcurrency, nil)
hasDupe, err := dupeController.CollectRemoteDuplicateRows(bc.ctx, tbl, tbl.Meta().Name.L, &encode.SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
SysVars: bc.sysVars,
IndexID: indexID,
SQLMode: mysql.ModeStrictAllTables,
SysVars: bc.sysVars,
IndexID: indexID,
MinCommitTS: bc.initTS,
}, lightning.ErrorOnDup)
return bc.handleErrorAfterCollectRemoteDuplicateRows(err, indexID, tbl, hasDupe)
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/ddl/ingest/backend_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
Expand Down Expand Up @@ -153,7 +154,13 @@ func (m *litBackendCtxMgr) Register(
return nil, err
}

bcCtx := newBackendContext(ctx, jobID, bd, cfg, defaultImportantVariables, m.memRoot, m.diskRoot, etcdClient)
physical, logical, err := bd.GetTS(ctx)
if err != nil {
return nil, err
}
curTS := oracle.ComposeTS(physical, logical)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved

bcCtx := newBackendContext(ctx, jobID, bd, cfg, defaultImportantVariables, m.memRoot, m.diskRoot, etcdClient, curTS)
m.backends.m[jobID] = bcCtx
m.memRoot.Consume(structSizeBackendCtx)
m.backends.mu.Unlock()
Expand Down Expand Up @@ -205,6 +212,7 @@ func newBackendContext(
memRoot MemRoot,
diskRoot DiskRoot,
etcdClient *clientv3.Client,
initTS uint64,
) *litBackendCtx {
bCtx := &litBackendCtx{
engines: make(map[int64]*engineInfo, 10),
Expand All @@ -217,6 +225,7 @@ func newBackendContext(
sysVars: vars,
updateInterval: checkpointUpdateInterval,
etcdClient: etcdClient,
initTS: initTS,
}
bCtx.timeOfLastFlush.Store(time.Now())
return bCtx
Expand Down
4 changes: 3 additions & 1 deletion pkg/lightning/backend/encode/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ type SessionOptions struct {
SysVars map[string]string
// a seed used for tableKvEncoder's auto random bits value
AutoRandomSeed int64
// IndexID is used by the DuplicateManager. Only the key range with the specified index ID is scanned.
// IndexID is used by the dupeDetector. Only the key range with the specified index ID is scanned.
IndexID int64
// MinCommitTS is used by the dupeDetector. Only records whose commit TS is larger than MinCommitTS are considered.
MinCommitTS uint64
}

// Rows represents a collection of encoded rows.
Expand Down
15 changes: 10 additions & 5 deletions pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ func getDupDetectClient(
importClientFactory ImportClientFactory,
resourceGroupName string,
taskType string,
minCommitTS uint64,
) (import_sstpb.ImportSST_DuplicateDetectClient, error) {
leader := region.Leader
if leader == nil {
Expand All @@ -330,9 +331,10 @@ func getDupDetectClient(
RequestSource: kvutil.BuildRequestSource(true, tidbkv.InternalTxnLightning, taskType),
}
req := &import_sstpb.DuplicateDetectRequest{
Context: reqCtx,
StartKey: keyRange.StartKey,
EndKey: keyRange.EndKey,
Context: reqCtx,
StartKey: keyRange.StartKey,
EndKey: keyRange.EndKey,
MinCommitTs: minCommitTS,
}
cli, err := importClient.DuplicateDetect(ctx, req)
if err != nil {
Expand All @@ -349,9 +351,10 @@ func NewRemoteDupKVStream(
importClientFactory ImportClientFactory,
resourceGroupName string,
taskType string,
minCommitTS uint64,
) (*RemoteDupKVStream, error) {
subCtx, cancel := context.WithCancel(ctx)
cli, err := getDupDetectClient(subCtx, region, keyRange, importClientFactory, resourceGroupName, taskType)
cli, err := getDupDetectClient(subCtx, region, keyRange, importClientFactory, resourceGroupName, taskType, minCommitTS)
if err != nil {
cancel()
return nil, errors.Trace(err)
Expand Down Expand Up @@ -422,6 +425,7 @@ type dupeDetector struct {
indexID int64
resourceGroupName string
taskType string
minCommitTS uint64
}

// NewDupeDetector creates a new dupeDetector.
Expand Down Expand Up @@ -456,6 +460,7 @@ func NewDupeDetector(
indexID: sessOpts.IndexID,
resourceGroupName: resourceGroupName,
taskType: taskType,
minCommitTS: sessOpts.MinCommitTS,
}, nil
}

Expand Down Expand Up @@ -937,7 +942,7 @@ func (m *dupeDetector) processRemoteDupTaskOnce(
logutil.Key("dupDetectEndKey", kr.EndKey),
)
err := func() error {
stream, err := NewRemoteDupKVStream(ctx, region, kr, importClientFactory, m.resourceGroupName, m.taskType)
stream, err := NewRemoteDupKVStream(ctx, region, kr, importClientFactory, m.resourceGroupName, m.taskType, m.minCommitTS)
if err != nil {
return errors.Annotatef(err, "failed to create remote duplicate kv stream")
}
Expand Down
24 changes: 24 additions & 0 deletions tests/realtikvtest/addindextest1/disttask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
Expand Down Expand Up @@ -286,3 +287,26 @@ func TestAddUKErrorMessage(t *testing.T) {
err := tk.ExecToErr("alter table t add unique index uk(b);")
require.ErrorContains(t, err, "Duplicate entry '1' for key 't.uk'")
}

// TestAddUniqueIndexAfterFlashBack adds a unique index after FLASHBACK CLUSTER
// has rewound the cluster to an earlier TSO. The duplicate-key scan performed
// by the fast-reorg (ingest) path must not report records written before the
// backend context was created (presumably the point of MinCommitTS in this PR
// — the flashback rewrites old versions with fresh commit timestamps; verify
// against the ingest backend change). `admin check table` asserts index/data
// consistency at the end.
func TestAddUniqueIndexAfterFlashBack(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
// Enable the fast-reorg (lightning ingest) add-index path under test.
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)

// Extend GC life time so the TS captured below stays within the GC safe
// point and FLASHBACK CLUSTER can rewind to it; restore the default on exit.
tk.MustExec("set @@global.tidb_gc_life_time = '100m';")
defer tk.MustExec("set @@global.tidb_gc_life_time = default;")
tk.MustExec("create table t (a int, b int);")
tk.MustExec("insert into t values (1, 1);")
// NOTE(review): sleep so the captured TS is strictly after the insert's
// commit — TODO confirm this margin is required.
time.Sleep(1 * time.Second)
// Capture the current TSO inside a transaction, then discard the txn; the
// TS is only needed as the flashback target.
tk.MustExec("begin;")
currentTime := tk.MustQuery("select @@tidb_current_ts;").Rows()[0][0].(string)
tk.MustExec("rollback;")
time.Sleep(1 * time.Second) // Fixme: flashback cluster uses a wall-clock timestamp instead of a TSO to check whether there is an ongoing DDL job.
tk.MustExec("alter table t add index idx(b);")
// Rewind the cluster; this reverts the non-unique index added above and
// leaves the name `idx` free for reuse below.
tk.MustExec(fmt.Sprintf("flashback cluster to tso %s", currentTime))
tk.MustExec("alter table t add unique index idx(b);")
tk.MustExec("admin check table t;")
}
1 change: 1 addition & 0 deletions tests/realtikvtest/addindextest3/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
Expand Down