From c403cd555d3abd93b15782306213213852bb2226 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 5 Sep 2024 23:28:18 +0800 Subject: [PATCH] ddl/ingest: set `minCommitTS` when detect remote duplicate keys (#55588) close pingcap/tidb#55587 --- pkg/ddl/backfilling.go | 2 +- pkg/ddl/backfilling_dist_executor.go | 1 + pkg/ddl/index.go | 3 ++- pkg/ddl/ingest/backend.go | 8 +++++--- pkg/ddl/ingest/backend_mgr.go | 6 +++++- pkg/ddl/ingest/mock.go | 2 +- pkg/lightning/backend/encode/encode.go | 4 +++- pkg/lightning/backend/local/duplicate.go | 15 ++++++++++----- .../realtikvtest/addindextest3/functional_test.go | 2 +- 9 files changed, 29 insertions(+), 14 deletions(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index 338878be768a2..83c9047bb0fe2 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -687,7 +687,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery() importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) bcCtx, err := ingest.LitBackCtxMgr.Register( - ctx, job.ID, hasUnique, dc.etcdCli, discovery, job.ReorgMeta.ResourceGroupName, importConc) + ctx, job.ID, hasUnique, dc.etcdCli, discovery, job.ReorgMeta.ResourceGroupName, importConc, job.RealStartTS) if err != nil { return errors.Trace(err) } diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go index bc37b650987c0..e2bbc86e18343 100644 --- a/pkg/ddl/backfilling_dist_executor.go +++ b/pkg/ddl/backfilling_dist_executor.go @@ -156,6 +156,7 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) { discovery, job.ReorgMeta.ResourceGroupName, job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())), + job.RealStartTS, ) } diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 34e7861e4fed1..b466905f3ea59 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -1961,7 +1961,8 @@ func 
checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo if indexInfo.Unique { ctx := tidblogutil.WithCategory(ctx, "ddl-ingest") if bc == nil { - bc, err = ingest.LitBackCtxMgr.Register(ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1) + bc, err = ingest.LitBackCtxMgr.Register( + ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1, reorgInfo.RealStartTS) if err != nil { return err } diff --git a/pkg/ddl/ingest/backend.go b/pkg/ddl/ingest/backend.go index 7158c3fdda072..83e7dc668c765 100644 --- a/pkg/ddl/ingest/backend.go +++ b/pkg/ddl/ingest/backend.go @@ -102,6 +102,7 @@ type litBackendCtx struct { updateInterval time.Duration checkpointMgr *CheckpointManager etcdClient *clientv3.Client + initTS uint64 // unregisterMu prevents concurrent calls of `FinishAndUnregisterEngines`. // For details, see https://github.com/pingcap/tidb/issues/53843. @@ -149,9 +150,10 @@ func (bc *litBackendCtx) CollectRemoteDuplicateRows(indexID int64, tbl table.Tab func (bc *litBackendCtx) collectRemoteDuplicateRows(indexID int64, tbl table.Table) error { dupeController := bc.backend.GetDupeController(bc.cfg.WorkerConcurrency, nil) hasDupe, err := dupeController.CollectRemoteDuplicateRows(bc.ctx, tbl, tbl.Meta().Name.L, &encode.SessionOptions{ - SQLMode: mysql.ModeStrictAllTables, - SysVars: bc.sysVars, - IndexID: indexID, + SQLMode: mysql.ModeStrictAllTables, + SysVars: bc.sysVars, + IndexID: indexID, + MinCommitTS: bc.initTS, }, lightning.ErrorOnDup) return bc.handleErrorAfterCollectRemoteDuplicateRows(err, indexID, tbl, hasDupe) } diff --git a/pkg/ddl/ingest/backend_mgr.go b/pkg/ddl/ingest/backend_mgr.go index 74300de29869f..81c29617ae0c3 100644 --- a/pkg/ddl/ingest/backend_mgr.go +++ b/pkg/ddl/ingest/backend_mgr.go @@ -51,6 +51,7 @@ type BackendCtxMgr interface { pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int, + initTS uint64, ) (BackendCtx, error) 
Unregister(jobID int64) // EncodeJobSortPath encodes the job ID to the local disk sort path. @@ -117,6 +118,7 @@ func (m *litBackendCtxMgr) Register( pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, concurrency int, + initTS uint64, ) (BackendCtx, error) { bc, exist := m.Load(jobID) if exist { @@ -153,7 +155,7 @@ func (m *litBackendCtxMgr) Register( return nil, err } - bcCtx := newBackendContext(ctx, jobID, bd, cfg, defaultImportantVariables, m.memRoot, m.diskRoot, etcdClient) + bcCtx := newBackendContext(ctx, jobID, bd, cfg, defaultImportantVariables, m.memRoot, m.diskRoot, etcdClient, initTS) m.backends.m[jobID] = bcCtx m.memRoot.Consume(structSizeBackendCtx) m.backends.mu.Unlock() @@ -205,6 +207,7 @@ func newBackendContext( memRoot MemRoot, diskRoot DiskRoot, etcdClient *clientv3.Client, + initTS uint64, ) *litBackendCtx { bCtx := &litBackendCtx{ engines: make(map[int64]*engineInfo, 10), @@ -217,6 +220,7 @@ func newBackendContext( sysVars: vars, updateInterval: checkpointUpdateInterval, etcdClient: etcdClient, + initTS: initTS, } bCtx.timeOfLastFlush.Store(time.Now()) return bCtx diff --git a/pkg/ddl/ingest/mock.go b/pkg/ddl/ingest/mock.go index 8d5fe8744dccf..a176f5d5ce32c 100644 --- a/pkg/ddl/ingest/mock.go +++ b/pkg/ddl/ingest/mock.go @@ -57,7 +57,7 @@ func (m *MockBackendCtxMgr) CheckMoreTasksAvailable() (bool, error) { // Register implements BackendCtxMgr.Register interface. 
func (m *MockBackendCtxMgr) Register(ctx context.Context, jobID int64, unique bool, etcdClient *clientv3.Client, - pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int) (BackendCtx, error) { + pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int, initTS uint64) (BackendCtx, error) { logutil.DDLIngestLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID)) if mockCtx, ok := m.runningJobs[jobID]; ok { return mockCtx, nil diff --git a/pkg/lightning/backend/encode/encode.go b/pkg/lightning/backend/encode/encode.go index 9f870bd15c5f7..73fe990032465 100644 --- a/pkg/lightning/backend/encode/encode.go +++ b/pkg/lightning/backend/encode/encode.go @@ -57,8 +57,10 @@ type SessionOptions struct { SysVars map[string]string // a seed used for tableKvEncoder's auto random bits value AutoRandomSeed int64 - // IndexID is used by the DuplicateManager. Only the key range with the specified index ID is scanned. + // IndexID is used by the dupeDetector. Only the key range with the specified index ID is scanned. IndexID int64 + // MinCommitTS is used by dupeDetector. Only records whose commit TS is larger than MinCommitTS are considered. + MinCommitTS uint64 } // Rows represents a collection of encoded rows. 
diff --git a/pkg/lightning/backend/local/duplicate.go b/pkg/lightning/backend/local/duplicate.go index 297c2b7163dd3..30cc97e2668e6 100644 --- a/pkg/lightning/backend/local/duplicate.go +++ b/pkg/lightning/backend/local/duplicate.go @@ -310,6 +310,7 @@ func getDupDetectClient( importClientFactory ImportClientFactory, resourceGroupName string, taskType string, + minCommitTS uint64, ) (import_sstpb.ImportSST_DuplicateDetectClient, error) { leader := region.Leader if leader == nil { @@ -330,9 +331,10 @@ func getDupDetectClient( RequestSource: kvutil.BuildRequestSource(true, tidbkv.InternalTxnLightning, taskType), } req := &import_sstpb.DuplicateDetectRequest{ - Context: reqCtx, - StartKey: keyRange.StartKey, - EndKey: keyRange.EndKey, + Context: reqCtx, + StartKey: keyRange.StartKey, + EndKey: keyRange.EndKey, + MinCommitTs: minCommitTS, } cli, err := importClient.DuplicateDetect(ctx, req) if err != nil { @@ -349,9 +351,10 @@ func NewRemoteDupKVStream( importClientFactory ImportClientFactory, resourceGroupName string, taskType string, + minCommitTS uint64, ) (*RemoteDupKVStream, error) { subCtx, cancel := context.WithCancel(ctx) - cli, err := getDupDetectClient(subCtx, region, keyRange, importClientFactory, resourceGroupName, taskType) + cli, err := getDupDetectClient(subCtx, region, keyRange, importClientFactory, resourceGroupName, taskType, minCommitTS) if err != nil { cancel() return nil, errors.Trace(err) @@ -422,6 +425,7 @@ type dupeDetector struct { indexID int64 resourceGroupName string taskType string + minCommitTS uint64 } // NewDupeDetector creates a new dupeDetector. 
@@ -456,6 +460,7 @@ func NewDupeDetector( indexID: sessOpts.IndexID, resourceGroupName: resourceGroupName, taskType: taskType, + minCommitTS: sessOpts.MinCommitTS, }, nil } @@ -937,7 +942,7 @@ func (m *dupeDetector) processRemoteDupTaskOnce( logutil.Key("dupDetectEndKey", kr.EndKey), ) err := func() error { - stream, err := NewRemoteDupKVStream(ctx, region, kr, importClientFactory, m.resourceGroupName, m.taskType) + stream, err := NewRemoteDupKVStream(ctx, region, kr, importClientFactory, m.resourceGroupName, m.taskType, m.minCommitTS) if err != nil { return errors.Annotatef(err, "failed to create remote duplicate kv stream") } diff --git a/tests/realtikvtest/addindextest3/functional_test.go b/tests/realtikvtest/addindextest3/functional_test.go index 0e9af59522793..51de47d165be4 100644 --- a/tests/realtikvtest/addindextest3/functional_test.go +++ b/tests/realtikvtest/addindextest3/functional_test.go @@ -84,7 +84,7 @@ func TestDDLTestEstimateTableRowSize(t *testing.T) { func TestBackendCtxConcurrentUnregister(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) discovery := store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery() - bCtx, err := ingest.LitBackCtxMgr.Register(context.Background(), 1, false, nil, discovery, "test", 1) + bCtx, err := ingest.LitBackCtxMgr.Register(context.Background(), 1, false, nil, discovery, "test", 1, 0) require.NoError(t, err) idxIDs := []int64{1, 2, 3, 4, 5, 6, 7} uniques := make([]bool, 0, len(idxIDs))