store: update kvrpc.Cleanup proto and change its behaviour pingcap#12212
tiancaiamao committed Sep 26, 2019
1 parent 37ad9b6 commit 1c83f1b
Showing 27 changed files with 1,544 additions and 1,546 deletions.
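Most of the churn in the hunks below is mechanical: failpoint markers that had been committed in their failpoint-ctl "enabled" form (failpoint.Eval with _curpkg_) are restored to their source form (failpoint.Inject); removed lines carry the Eval form, added lines the Inject form. A minimal sketch of the two forms, assuming the github.com/pingcap/failpoint package; the failpoint name "mockErr" is illustrative, not one used by TiDB:

package main

import (
	"errors"
	"fmt"

	"github.com/pingcap/failpoint"
)

// doWork uses the source ("Inject") form that this commit restores.
// `failpoint-ctl enable` rewrites the marker into roughly the
// `if val, ok := failpoint.Eval(_curpkg_("mockErr")); ok { ... }`
// shape seen on the removed lines in this diff.
func doWork() (int64, error) {
	failpoint.Inject("mockErr", func(val failpoint.Value) {
		if val.(bool) {
			// failpoint.Return becomes a plain return after rewriting.
			failpoint.Return(0, errors.New("injected error"))
		}
	})
	return 1, nil
}

func main() {
	// Inject is a no-op marker at runtime until the code is rewritten,
	// so this prints "1 <nil>".
	fmt.Println(doWork())
}
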
12 changes: 6 additions & 6 deletions ddl/column.go
@@ -163,11 +163,11 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
return ver, nil
}

- if val, ok := failpoint.Eval(_curpkg_("errorBeforeDecodeArgs")); ok {
+ failpoint.Inject("errorBeforeDecodeArgs", func(val failpoint.Value) {
if val.(bool) {
- return ver, errors.New("occur an error before decode args")
+ failpoint.Return(ver, errors.New("occur an error before decode args"))
}
- }
+ })

tblInfo, columnInfo, col, pos, offset, err := checkAddColumn(t, job)
if err != nil {
@@ -375,13 +375,13 @@ func (w *worker) doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.Colu
}
}

- if val, ok := failpoint.Eval(_curpkg_("uninitializedOffsetAndState")); ok {
+ failpoint.Inject("uninitializedOffsetAndState", func(val failpoint.Value) {
if val.(bool) {
if newCol.State != model.StatePublic {
- return ver, errors.New("the column state is wrong")
+ failpoint.Return(ver, errors.New("the column state is wrong"))
}
}
- }
+ })

if !mysql.HasNotNullFlag(oldCol.Flag) && mysql.HasNotNullFlag(newCol.Flag) && !mysql.HasPreventNullInsertFlag(oldCol.Flag) {
// Introduce the `mysql.HasPreventNullInsertFlag` flag to prevent users from inserting or updating null values.
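
For context, a hedged sketch of how a test could flip the errorBeforeDecodeArgs failpoint above; this only takes effect in a failpoint-enabled build, and the path-qualified failpoint name is an assumption, not taken from this commit:

package ddl_test

import (
	"testing"

	"github.com/pingcap/failpoint"
)

func TestAddColumnDecodeArgsError(t *testing.T) {
	// Assumed full name: the ddl package import path plus the marker name.
	fp := "github.com/pingcap/tidb/ddl/errorBeforeDecodeArgs"
	if err := failpoint.Enable(fp, `return(true)`); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := failpoint.Disable(fp); err != nil {
			t.Fatal(err)
		}
	}()
	// Run an ADD COLUMN job here and assert it fails with
	// "occur an error before decode args".
}
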
586 changes: 293 additions & 293 deletions ddl/ddl.go

Large diffs are not rendered by default.

63 changes: 31 additions & 32 deletions ddl/ddl_worker.go
@@ -39,50 +39,50 @@ import (

var (
// RunWorker indicates if this TiDB server starts DDL worker and can run DDL job.
- RunWorker = true
+ RunWorker = true
// ddlWorkerID is used for generating the next DDL worker ID.
- ddlWorkerID = int32(0)
+ ddlWorkerID = int32(0)
// WaitTimeWhenErrorOccured is waiting interval when processing DDL jobs encounter errors.
- WaitTimeWhenErrorOccured = 1 * time.Second
+ WaitTimeWhenErrorOccured = 1 * time.Second
)

type workerType byte

const (
// generalWorker is the worker who handles all DDL statements except “add index”.
- generalWorker workerType = 0
+ generalWorker workerType = 0
// addIdxWorker is the worker who handles the operation of adding indexes.
- addIdxWorker workerType = 1
+ addIdxWorker workerType = 1
// waitDependencyJobInterval is the interval when the dependency job doesn't be done.
- waitDependencyJobInterval = 200 * time.Millisecond
+ waitDependencyJobInterval = 200 * time.Millisecond
// noneDependencyJob means a job has no dependency-job.
- noneDependencyJob = 0
+ noneDependencyJob = 0
)

// worker is used for handling DDL jobs.
// Now we have two kinds of workers.
type worker struct {
- id int32
- tp workerType
- ddlJobCh chan struct{}
- quitCh chan struct{}
- wg sync.WaitGroup
-
- sessPool *sessionPool // sessPool is used to new sessions to execute SQL in ddl package.
- reorgCtx *reorgCtx // reorgCtx is used for reorganization.
- delRangeManager delRangeManager
- logCtx context.Context
+ id       int32
+ tp       workerType
+ ddlJobCh chan struct{}
+ quitCh   chan struct{}
+ wg       sync.WaitGroup
+
+ sessPool        *sessionPool // sessPool is used to new sessions to execute SQL in ddl package.
+ reorgCtx        *reorgCtx    // reorgCtx is used for reorganization.
+ delRangeManager delRangeManager
+ logCtx          context.Context
}

func newWorker(tp workerType, store kv.Storage, sessPool *sessionPool, delRangeMgr delRangeManager) *worker {
worker := &worker{
- id: atomic.AddInt32(&ddlWorkerID, 1),
- tp: tp,
- ddlJobCh: make(chan struct{}, 1),
- quitCh: make(chan struct{}),
- reorgCtx: &reorgCtx{notifyCancelReorgJob: 0},
- sessPool: sessPool,
- delRangeManager: delRangeMgr,
+ id:              atomic.AddInt32(&ddlWorkerID, 1),
+ tp:              tp,
+ ddlJobCh:        make(chan struct{}, 1),
+ quitCh:          make(chan struct{}),
+ reorgCtx:        &reorgCtx{notifyCancelReorgJob: 0},
+ sessPool:        sessPool,
+ delRangeManager: delRangeMgr,
}

worker.logCtx = logutil.WithKeyValue(context.Background(), "worker", worker.String())
@@ -359,9 +359,9 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
}

var (
- job *model.Job
- schemaVer int64
- runJobErr error
+ job       *model.Job
+ schemaVer int64
+ runJobErr error
)
waitTime := 2 * d.lease
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
@@ -480,9 +480,8 @@ func chooseLeaseTime(t, max time.Duration) time.Duration {

// runDDLJob runs a DDL job. It returns the current schema version in this transaction and the error.
func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
- failpoint.
- // Mock for run ddl job panic.
- Eval(_curpkg_("mockPanicInRunDDLJob"))
+ // Mock for run ddl job panic.
+ failpoint.Inject("mockPanicInRunDDLJob", func(_ failpoint.Value) {})

logutil.Logger(w.logCtx).Info("[ddl] run DDL job", zap.String("job", job.String()))
timeStart := time.Now()
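
The empty Inject body above is deliberate: the panic comes from the failpoint runtime itself when the marker is activated with a panic action, not from the callback. A hedged sketch of arming it (failpoint-enabled builds only; the path-qualified name and the `panic` term string are assumptions based on the failpoint term grammar):

package ddl_test

import "github.com/pingcap/failpoint"

// Assumed path-qualified failpoint name.
const fpPanic = "github.com/pingcap/tidb/ddl/mockPanicInRunDDLJob"

// mockRunDDLJobPanic arms the failpoint and returns a restore func.
func mockRunDDLJobPanic() (restore func(), err error) {
	if err = failpoint.Enable(fpPanic, `panic`); err != nil {
		return nil, err
	}
	return func() { _ = failpoint.Disable(fpPanic) }, nil
}
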
@@ -690,9 +689,9 @@ func updateSchemaVersion(t *meta.Meta, job *model.Job) (int64, error) {
return 0, errors.Trace(err)
}
diff := &model.SchemaDiff{
- Version: schemaVersion,
- Type: job.Type,
- SchemaID: job.SchemaID,
+ Version:  schemaVersion,
+ Type:     job.Type,
+ SchemaID: job.SchemaID,
}
if job.Type == model.ActionTruncateTable {
// Truncate table has two table ID, should be handled differently.
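
The hunk is cut off right at this special case; a hedged reconstruction of the branch the comment introduces (field names per TiDB's model.SchemaDiff; the exact logic is an assumption, not part of the visible diff):

// Hedged reconstruction: truncate drops one table and creates another,
// so the diff must carry both IDs for schema reload to evict the old table.
if job.Type == model.ActionTruncateTable {
	// The new table's ID is assumed to be carried in the job args.
	if err := job.DecodeArgs(&diff.TableID); err != nil {
		return 0, errors.Trace(err)
	}
	diff.OldTableID = job.TableID
} else {
	diff.TableID = job.TableID
}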
