Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl/ingest: set minCommitTS when detect remote duplicate keys #55588

Merged
merged 8 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
bcCtx, err := ingest.LitBackCtxMgr.Register(
ctx, job.ID, hasUnique, dc.etcdCli, discovery, job.ReorgMeta.ResourceGroupName, importConc)
ctx, job.ID, hasUnique, dc.etcdCli, discovery, job.ReorgMeta.ResourceGroupName, importConc, job.RealStartTS)
if err != nil {
return errors.Trace(err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/backfilling_dist_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) {
discovery,
job.ReorgMeta.ResourceGroupName,
job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())),
job.RealStartTS,
)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1961,7 +1961,8 @@ func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo
if indexInfo.Unique {
ctx := tidblogutil.WithCategory(ctx, "ddl-ingest")
if bc == nil {
bc, err = ingest.LitBackCtxMgr.Register(ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1)
bc, err = ingest.LitBackCtxMgr.Register(
ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1, reorgInfo.RealStartTS)
if err != nil {
return err
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type litBackendCtx struct {
updateInterval time.Duration
checkpointMgr *CheckpointManager
etcdClient *clientv3.Client
initTS uint64

// unregisterMu prevents concurrent calls of `FinishAndUnregisterEngines`.
// For details, see https://github.com/pingcap/tidb/issues/53843.
Expand Down Expand Up @@ -149,9 +150,10 @@ func (bc *litBackendCtx) CollectRemoteDuplicateRows(indexID int64, tbl table.Tab
func (bc *litBackendCtx) collectRemoteDuplicateRows(indexID int64, tbl table.Table) error {
dupeController := bc.backend.GetDupeController(bc.cfg.WorkerConcurrency, nil)
hasDupe, err := dupeController.CollectRemoteDuplicateRows(bc.ctx, tbl, tbl.Meta().Name.L, &encode.SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
SysVars: bc.sysVars,
IndexID: indexID,
SQLMode: mysql.ModeStrictAllTables,
SysVars: bc.sysVars,
IndexID: indexID,
MinCommitTS: bc.initTS,
}, lightning.ErrorOnDup)
return bc.handleErrorAfterCollectRemoteDuplicateRows(err, indexID, tbl, hasDupe)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/ddl/ingest/backend_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type BackendCtxMgr interface {
pdSvcDiscovery pd.ServiceDiscovery,
resourceGroupName string,
importConc int,
initTS uint64,
) (BackendCtx, error)
Unregister(jobID int64)
// EncodeJobSortPath encodes the job ID to the local disk sort path.
Expand Down Expand Up @@ -117,6 +118,7 @@ func (m *litBackendCtxMgr) Register(
pdSvcDiscovery pd.ServiceDiscovery,
resourceGroupName string,
concurrency int,
initTS uint64,
) (BackendCtx, error) {
bc, exist := m.Load(jobID)
if exist {
Expand Down Expand Up @@ -153,7 +155,7 @@ func (m *litBackendCtxMgr) Register(
return nil, err
}

bcCtx := newBackendContext(ctx, jobID, bd, cfg, defaultImportantVariables, m.memRoot, m.diskRoot, etcdClient)
bcCtx := newBackendContext(ctx, jobID, bd, cfg, defaultImportantVariables, m.memRoot, m.diskRoot, etcdClient, initTS)
m.backends.m[jobID] = bcCtx
m.memRoot.Consume(structSizeBackendCtx)
m.backends.mu.Unlock()
Expand Down Expand Up @@ -205,6 +207,7 @@ func newBackendContext(
memRoot MemRoot,
diskRoot DiskRoot,
etcdClient *clientv3.Client,
initTS uint64,
) *litBackendCtx {
bCtx := &litBackendCtx{
engines: make(map[int64]*engineInfo, 10),
Expand All @@ -217,6 +220,7 @@ func newBackendContext(
sysVars: vars,
updateInterval: checkpointUpdateInterval,
etcdClient: etcdClient,
initTS: initTS,
}
bCtx.timeOfLastFlush.Store(time.Now())
return bCtx
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (m *MockBackendCtxMgr) CheckMoreTasksAvailable() (bool, error) {

// Register implements BackendCtxMgr.Register interface.
func (m *MockBackendCtxMgr) Register(ctx context.Context, jobID int64, unique bool, etcdClient *clientv3.Client,
pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int) (BackendCtx, error) {
pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int, initTS uint64) (BackendCtx, error) {
logutil.DDLIngestLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID))
if mockCtx, ok := m.runningJobs[jobID]; ok {
return mockCtx, nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/lightning/backend/encode/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ type SessionOptions struct {
SysVars map[string]string
// a seed used for tableKvEncoder's auto random bits value
AutoRandomSeed int64
// IndexID is used by the DuplicateManager. Only the key range with the specified index ID is scanned.
// IndexID is used by the dupeDetector. Only the key range with the specified index ID is scanned.
IndexID int64
// MinCommitTS is used by dupeDetector. Only records that larger than commit TS are considered.
MinCommitTS uint64
}

// Rows represents a collection of encoded rows.
Expand Down
15 changes: 10 additions & 5 deletions pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ func getDupDetectClient(
importClientFactory ImportClientFactory,
resourceGroupName string,
taskType string,
minCommitTS uint64,
) (import_sstpb.ImportSST_DuplicateDetectClient, error) {
leader := region.Leader
if leader == nil {
Expand All @@ -330,9 +331,10 @@ func getDupDetectClient(
RequestSource: kvutil.BuildRequestSource(true, tidbkv.InternalTxnLightning, taskType),
}
req := &import_sstpb.DuplicateDetectRequest{
Context: reqCtx,
StartKey: keyRange.StartKey,
EndKey: keyRange.EndKey,
Context: reqCtx,
StartKey: keyRange.StartKey,
EndKey: keyRange.EndKey,
MinCommitTs: minCommitTS,
}
cli, err := importClient.DuplicateDetect(ctx, req)
if err != nil {
Expand All @@ -349,9 +351,10 @@ func NewRemoteDupKVStream(
importClientFactory ImportClientFactory,
resourceGroupName string,
taskType string,
minCommitTS uint64,
) (*RemoteDupKVStream, error) {
subCtx, cancel := context.WithCancel(ctx)
cli, err := getDupDetectClient(subCtx, region, keyRange, importClientFactory, resourceGroupName, taskType)
cli, err := getDupDetectClient(subCtx, region, keyRange, importClientFactory, resourceGroupName, taskType, minCommitTS)
if err != nil {
cancel()
return nil, errors.Trace(err)
Expand Down Expand Up @@ -422,6 +425,7 @@ type dupeDetector struct {
indexID int64
resourceGroupName string
taskType string
minCommitTS uint64
}

// NewDupeDetector creates a new dupeDetector.
Expand Down Expand Up @@ -456,6 +460,7 @@ func NewDupeDetector(
indexID: sessOpts.IndexID,
resourceGroupName: resourceGroupName,
taskType: taskType,
minCommitTS: sessOpts.MinCommitTS,
}, nil
}

Expand Down Expand Up @@ -937,7 +942,7 @@ func (m *dupeDetector) processRemoteDupTaskOnce(
logutil.Key("dupDetectEndKey", kr.EndKey),
)
err := func() error {
stream, err := NewRemoteDupKVStream(ctx, region, kr, importClientFactory, m.resourceGroupName, m.taskType)
stream, err := NewRemoteDupKVStream(ctx, region, kr, importClientFactory, m.resourceGroupName, m.taskType, m.minCommitTS)
if err != nil {
return errors.Annotatef(err, "failed to create remote duplicate kv stream")
}
Expand Down
2 changes: 1 addition & 1 deletion tests/realtikvtest/addindextest3/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestDDLTestEstimateTableRowSize(t *testing.T) {
func TestBackendCtxConcurrentUnregister(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
discovery := store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
bCtx, err := ingest.LitBackCtxMgr.Register(context.Background(), 1, false, nil, discovery, "test", 1)
bCtx, err := ingest.LitBackCtxMgr.Register(context.Background(), 1, false, nil, discovery, "test", 1, 0)
require.NoError(t, err)
idxIDs := []int64{1, 2, 3, 4, 5, 6, 7}
uniques := make([]bool, 0, len(idxIDs))
Expand Down