Skip to content

Commit

Permalink
ddl: fix owner get panic info with dist-reorg (#41217)
Browse files Browse the repository at this point in the history
close #41208
  • Loading branch information
zimulala authored Feb 10, 2023
1 parent 278a9fe commit caaf668
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 5 deletions.
13 changes: 11 additions & 2 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ func (dc *ddlCtx) newReorgCtx(jobID int64, startKey []byte, currElement *meta.El
rc.setCurrentElement(currElement)
rc.mu.warnings = make(map[errors.ErrorID]*terror.Error)
rc.mu.warningsCount = make(map[errors.ErrorID]int64)
rc.references.Add(1)
dc.reorgCtx.Lock()
defer dc.reorgCtx.Unlock()
dc.reorgCtx.reorgCtxMap[jobID] = rc
Expand All @@ -545,13 +546,21 @@ func (dc *ddlCtx) setReorgCtxForBackfill(bfJob *BackfillJob) {
if rc == nil {
ele := &meta.Element{ID: bfJob.EleID, TypeKey: bfJob.EleKey}
dc.newReorgCtx(bfJob.JobID, bfJob.StartKey, ele, bfJob.RowCount)
} else {
rc.references.Add(1)
}
}

func (dc *ddlCtx) removeReorgCtx(job *model.Job) {
func (dc *ddlCtx) removeReorgCtx(jobID int64) {
dc.reorgCtx.Lock()
defer dc.reorgCtx.Unlock()
delete(dc.reorgCtx.reorgCtxMap, job.ID)
ctx, ok := dc.reorgCtx.reorgCtxMap[jobID]
if ok {
ctx.references.Sub(1)
if ctx.references.Load() == 0 {
delete(dc.reorgCtx.reorgCtxMap, jobID)
}
}
}

func (dc *ddlCtx) notifyReorgCancel(job *model.Job) {
Expand Down
30 changes: 30 additions & 0 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,14 @@ import (

const testLease = 5 * time.Millisecond

// DDLForTest exports for testing.
type DDLForTest interface {
// SetInterceptor sets the interceptor.
SetInterceptor(h Interceptor)
NewReorgCtx(jobID int64, startKey []byte, currElement *meta.Element, rowCount int64) *reorgCtx
SetReorgCtxForBackfill(bfJob *BackfillJob)
GetReorgCtx(jobID int64) *reorgCtx
RemoveReorgCtx(id int64)
}

// SetInterceptor implements DDL.SetInterceptor interface.
Expand All @@ -52,6 +57,31 @@ func (d *ddl) SetInterceptor(i Interceptor) {
d.mu.interceptor = i
}

// IsReorgCanceled exports for testing.
func (rc *reorgCtx) IsReorgCanceled() bool {
return rc.isReorgCanceled()
}

// NewReorgCtx exports for testing.
func (d *ddl) NewReorgCtx(jobID int64, startKey []byte, currElement *meta.Element, rowCount int64) *reorgCtx {
return d.newReorgCtx(jobID, startKey, currElement, rowCount)
}

// SetReorgCtxForBackfill exports for testing.
func (d *ddl) SetReorgCtxForBackfill(bfJob *BackfillJob) {
d.setReorgCtxForBackfill(bfJob)
}

// GetReorgCtx exports for testing.
func (d *ddl) GetReorgCtx(jobID int64) *reorgCtx {
return d.getReorgCtx(jobID)
}

// RemoveReorgCtx exports for testing.
func (d *ddl) RemoveReorgCtx(id int64) {
d.removeReorgCtx(id)
}

// JobNeedGCForTest is only used for test.
var JobNeedGCForTest = jobNeedGC

Expand Down
27 changes: 27 additions & 0 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,3 +309,30 @@ func TestJobNeedGC(t *testing.T) {
}}}
require.True(t, ddl.JobNeedGCForTest(job))
}

func TestUsingReorgCtx(t *testing.T) {
_, domain := testkit.CreateMockStoreAndDomainWithSchemaLease(t, testLease)
d := domain.DDL()

wg := util.WaitGroupWrapper{}
wg.Run(func() {
jobID := int64(1)
bfJob := &ddl.BackfillJob{JobID: jobID, EleID: 1, EleKey: nil}
for i := 0; i < 100; i++ {
d.(ddl.DDLForTest).SetReorgCtxForBackfill(bfJob)
d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled()
d.(ddl.DDLForTest).RemoveReorgCtx(jobID)
}
})
wg.Run(func() {
jobID := int64(1)
startKey := []byte("skey")
ele := &meta.Element{ID: 1, TypeKey: nil}
for i := 0; i < 100; i++ {
d.(ddl.DDLForTest).NewReorgCtx(jobID, startKey, ele, 0)
d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled()
d.(ddl.DDLForTest).RemoveReorgCtx(jobID)
}
})
wg.Wait()
}
1 change: 1 addition & 0 deletions ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ func (d *ddl) loadBackfillJobAndRun() {
defer func() {
tidbutil.Recover(metrics.LabelDistReorg, "runBackfillJobs", nil, false)
d.removeBackfillCtxJobCtx(bfJob.JobID)
d.removeReorgCtx(bfJob.JobID)
d.sessPool.put(se)
}()

Expand Down
9 changes: 6 additions & 3 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tipb/go-tipb"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -76,6 +77,8 @@ type reorgCtx struct {
warnings map[errors.ErrorID]*terror.Error
warningsCount map[errors.ErrorID]int64
}

references atomicutil.Int32
}

// nullableKey can store <nil> kv.Key.
Expand Down Expand Up @@ -227,7 +230,7 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo
case err := <-rc.doneCh:
// Since job is cancelled,we don't care about its partial counts.
if rc.isReorgCanceled() || terror.ErrorEqual(err, dbterror.ErrCancelledDDLJob) {
d.removeReorgCtx(job)
d.removeReorgCtx(job.ID)
return dbterror.ErrCancelledDDLJob
}
rowCount := rc.getRowCount()
Expand All @@ -242,7 +245,7 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo
// Update a job's warnings.
w.mergeWarningsIntoJob(job)

d.removeReorgCtx(job)
d.removeReorgCtx(job.ID)
// For other errors, even err is not nil here, we still wait the partial counts to be collected.
// since in the next round, the startKey is brand new which is stored by last time.
if err != nil {
Expand All @@ -252,7 +255,7 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo
updateBackfillProgress(w, reorgInfo, tblInfo, 0)
case <-w.ctx.Done():
logutil.BgLogger().Info("[ddl] run reorg job quit")
d.removeReorgCtx(job)
d.removeReorgCtx(job.ID)
// We return dbterror.ErrWaitReorgTimeout here too, so that outer loop will break.
return dbterror.ErrWaitReorgTimeout
case <-time.After(waitTimeout):
Expand Down
2 changes: 2 additions & 0 deletions ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ go_library(

go_test(
name = "ttlworker_test",
timeout = "long",
srcs = [
"del_test.go",
"job_manager_integration_test.go",
Expand All @@ -56,6 +57,7 @@ go_test(
embed = [":ttlworker"],
flaky = True,
race = "on",
shard_count = 30,
deps = [
"//domain",
"//infoschema",
Expand Down

0 comments on commit caaf668

Please sign in to comment.