Skip to content

Commit

Permalink
ddl/ingest: set minCommitTS when detect remote duplicate keys (#55588)
Browse files Browse the repository at this point in the history
close #55587
  • Loading branch information
tangenta authored Sep 5, 2024
1 parent e9124dd commit c403cd5
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
bcCtx, err := ingest.LitBackCtxMgr.Register(
ctx, job.ID, hasUnique, dc.etcdCli, discovery, job.ReorgMeta.ResourceGroupName, importConc)
ctx, job.ID, hasUnique, dc.etcdCli, discovery, job.ReorgMeta.ResourceGroupName, importConc, job.RealStartTS)
if err != nil {
return errors.Trace(err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/backfilling_dist_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) {
discovery,
job.ReorgMeta.ResourceGroupName,
job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())),
job.RealStartTS,
)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1961,7 +1961,8 @@ func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo
if indexInfo.Unique {
ctx := tidblogutil.WithCategory(ctx, "ddl-ingest")
if bc == nil {
bc, err = ingest.LitBackCtxMgr.Register(ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1)
bc, err = ingest.LitBackCtxMgr.Register(
ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1, reorgInfo.RealStartTS)
if err != nil {
return err
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type litBackendCtx struct {
updateInterval time.Duration
checkpointMgr *CheckpointManager
etcdClient *clientv3.Client
initTS uint64

// unregisterMu prevents concurrent calls of `FinishAndUnregisterEngines`.
// For details, see https://github.com/pingcap/tidb/issues/53843.
Expand Down Expand Up @@ -149,9 +150,10 @@ func (bc *litBackendCtx) CollectRemoteDuplicateRows(indexID int64, tbl table.Tab
func (bc *litBackendCtx) collectRemoteDuplicateRows(indexID int64, tbl table.Table) error {
dupeController := bc.backend.GetDupeController(bc.cfg.WorkerConcurrency, nil)
hasDupe, err := dupeController.CollectRemoteDuplicateRows(bc.ctx, tbl, tbl.Meta().Name.L, &encode.SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
SysVars: bc.sysVars,
IndexID: indexID,
SQLMode: mysql.ModeStrictAllTables,
SysVars: bc.sysVars,
IndexID: indexID,
MinCommitTS: bc.initTS,
}, lightning.ErrorOnDup)
return bc.handleErrorAfterCollectRemoteDuplicateRows(err, indexID, tbl, hasDupe)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/ddl/ingest/backend_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type BackendCtxMgr interface {
pdSvcDiscovery pd.ServiceDiscovery,
resourceGroupName string,
importConc int,
initTS uint64,
) (BackendCtx, error)
Unregister(jobID int64)
// EncodeJobSortPath encodes the job ID to the local disk sort path.
Expand Down Expand Up @@ -117,6 +118,7 @@ func (m *litBackendCtxMgr) Register(
pdSvcDiscovery pd.ServiceDiscovery,
resourceGroupName string,
concurrency int,
initTS uint64,
) (BackendCtx, error) {
bc, exist := m.Load(jobID)
if exist {
Expand Down Expand Up @@ -153,7 +155,7 @@ func (m *litBackendCtxMgr) Register(
return nil, err
}

bcCtx := newBackendContext(ctx, jobID, bd, cfg, defaultImportantVariables, m.memRoot, m.diskRoot, etcdClient)
bcCtx := newBackendContext(ctx, jobID, bd, cfg, defaultImportantVariables, m.memRoot, m.diskRoot, etcdClient, initTS)
m.backends.m[jobID] = bcCtx
m.memRoot.Consume(structSizeBackendCtx)
m.backends.mu.Unlock()
Expand Down Expand Up @@ -205,6 +207,7 @@ func newBackendContext(
memRoot MemRoot,
diskRoot DiskRoot,
etcdClient *clientv3.Client,
initTS uint64,
) *litBackendCtx {
bCtx := &litBackendCtx{
engines: make(map[int64]*engineInfo, 10),
Expand All @@ -217,6 +220,7 @@ func newBackendContext(
sysVars: vars,
updateInterval: checkpointUpdateInterval,
etcdClient: etcdClient,
initTS: initTS,
}
bCtx.timeOfLastFlush.Store(time.Now())
return bCtx
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (m *MockBackendCtxMgr) CheckMoreTasksAvailable() (bool, error) {

// Register implements BackendCtxMgr.Register interface.
func (m *MockBackendCtxMgr) Register(ctx context.Context, jobID int64, unique bool, etcdClient *clientv3.Client,
pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int) (BackendCtx, error) {
pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int, initTS uint64) (BackendCtx, error) {
logutil.DDLIngestLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID))
if mockCtx, ok := m.runningJobs[jobID]; ok {
return mockCtx, nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/lightning/backend/encode/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ type SessionOptions struct {
SysVars map[string]string
// a seed used for tableKvEncoder's auto random bits value
AutoRandomSeed int64
// IndexID is used by the DuplicateManager. Only the key range with the specified index ID is scanned.
// IndexID is used by the dupeDetector. Only the key range with the specified index ID is scanned.
IndexID int64
// MinCommitTS is used by dupeDetector. Only records that larger than commit TS are considered.
MinCommitTS uint64
}

// Rows represents a collection of encoded rows.
Expand Down
15 changes: 10 additions & 5 deletions pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ func getDupDetectClient(
importClientFactory ImportClientFactory,
resourceGroupName string,
taskType string,
minCommitTS uint64,
) (import_sstpb.ImportSST_DuplicateDetectClient, error) {
leader := region.Leader
if leader == nil {
Expand All @@ -330,9 +331,10 @@ func getDupDetectClient(
RequestSource: kvutil.BuildRequestSource(true, tidbkv.InternalTxnLightning, taskType),
}
req := &import_sstpb.DuplicateDetectRequest{
Context: reqCtx,
StartKey: keyRange.StartKey,
EndKey: keyRange.EndKey,
Context: reqCtx,
StartKey: keyRange.StartKey,
EndKey: keyRange.EndKey,
MinCommitTs: minCommitTS,
}
cli, err := importClient.DuplicateDetect(ctx, req)
if err != nil {
Expand All @@ -349,9 +351,10 @@ func NewRemoteDupKVStream(
importClientFactory ImportClientFactory,
resourceGroupName string,
taskType string,
minCommitTS uint64,
) (*RemoteDupKVStream, error) {
subCtx, cancel := context.WithCancel(ctx)
cli, err := getDupDetectClient(subCtx, region, keyRange, importClientFactory, resourceGroupName, taskType)
cli, err := getDupDetectClient(subCtx, region, keyRange, importClientFactory, resourceGroupName, taskType, minCommitTS)
if err != nil {
cancel()
return nil, errors.Trace(err)
Expand Down Expand Up @@ -422,6 +425,7 @@ type dupeDetector struct {
indexID int64
resourceGroupName string
taskType string
minCommitTS uint64
}

// NewDupeDetector creates a new dupeDetector.
Expand Down Expand Up @@ -456,6 +460,7 @@ func NewDupeDetector(
indexID: sessOpts.IndexID,
resourceGroupName: resourceGroupName,
taskType: taskType,
minCommitTS: sessOpts.MinCommitTS,
}, nil
}

Expand Down Expand Up @@ -937,7 +942,7 @@ func (m *dupeDetector) processRemoteDupTaskOnce(
logutil.Key("dupDetectEndKey", kr.EndKey),
)
err := func() error {
stream, err := NewRemoteDupKVStream(ctx, region, kr, importClientFactory, m.resourceGroupName, m.taskType)
stream, err := NewRemoteDupKVStream(ctx, region, kr, importClientFactory, m.resourceGroupName, m.taskType, m.minCommitTS)
if err != nil {
return errors.Annotatef(err, "failed to create remote duplicate kv stream")
}
Expand Down
2 changes: 1 addition & 1 deletion tests/realtikvtest/addindextest3/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestDDLTestEstimateTableRowSize(t *testing.T) {
func TestBackendCtxConcurrentUnregister(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
discovery := store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
bCtx, err := ingest.LitBackCtxMgr.Register(context.Background(), 1, false, nil, discovery, "test", 1)
bCtx, err := ingest.LitBackCtxMgr.Register(context.Background(), 1, false, nil, discovery, "test", 1, 0)
require.NoError(t, err)
idxIDs := []int64{1, 2, 3, 4, 5, 6, 7}
uniques := make([]bool, 0, len(idxIDs))
Expand Down

0 comments on commit c403cd5

Please sign in to comment.