Skip to content

Commit

Permalink
return lock instead of lockID in TrySync
Browse files Browse the repository at this point in the history
  • Loading branch information
GMHDBJD committed Aug 27, 2020
1 parent 6a09c31 commit 2d3c3ae
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 18 deletions.
18 changes: 9 additions & 9 deletions dm/master/shardddl/optimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,15 +476,15 @@ func (o *Optimist) handleOperationPut(ctx context.Context, opCh <-chan optimism.

// handleLock handles a single shard DDL lock.
func (o *Optimist) handleLock(info optimism.Info, tts []optimism.TargetTable, skipDone bool) error {
lockID, newDDLs, err := o.lk.TrySync(info, tts)
l, newDDLs, err := o.lk.TrySync(info, tts)
var cfStage = optimism.ConflictNone
if err != nil {
cfStage = optimism.ConflictDetected // we treat any errors returned from `TrySync` as conflict detected now.
o.logger.Warn("error occur when trying to sync for shard DDL info, this often means shard DDL conflict detected",
zap.String("lock", lockID), zap.Stringer("info", info), zap.Bool("is deleted", info.IsDeleted), log.ShortError(err))
zap.String("lock", l.ID), zap.Stringer("info", info), zap.Bool("is deleted", info.IsDeleted), log.ShortError(err))
} else {
o.logger.Info("the shard DDL lock returned some DDLs",
zap.String("lock", lockID), zap.Strings("ddls", newDDLs), zap.Stringer("info", info), zap.Bool("is deleted", info.IsDeleted))
zap.String("lock", l.ID), zap.Strings("ddls", newDDLs), zap.Stringer("info", info), zap.Bool("is deleted", info.IsDeleted))

// try to record the init schema before applied the DDL to the downstream.
initSchema := optimism.NewInitSchema(info.Task, info.DownSchema, info.DownTable, info.TableInfoBefore)
Expand All @@ -498,11 +498,11 @@ func (o *Optimist) handleLock(info optimism.Info, tts []optimism.TargetTable, sk
}
}

lock := o.lk.FindLock(lockID)
lock := o.lk.FindLock(l.ID)
if lock == nil {
// this should not happen.
o.logger.Warn("lock not found after try sync for shard DDL info", zap.String("lock", lockID), zap.Stringer("info", info))
return nil
// the lock was remove by others, revert it back
o.logger.Info("lock not found after try sync for shard DDL info, revert it back", zap.String("lock", l.ID), zap.Stringer("info", info))
lock = l
}

// check whether the lock has resolved.
Expand All @@ -516,12 +516,12 @@ func (o *Optimist) handleLock(info optimism.Info, tts []optimism.TargetTable, sk
return nil
}

op := optimism.NewOperation(lockID, lock.Task, info.Source, info.UpSchema, info.UpTable, newDDLs, cfStage, false)
op := optimism.NewOperation(lock.ID, lock.Task, info.Source, info.UpSchema, info.UpTable, newDDLs, cfStage, false)
rev, succ, err := optimism.PutOperation(o.cli, skipDone, op)
if err != nil {
return err
}
o.logger.Info("put shard DDL lock operation", zap.String("lock", lockID),
o.logger.Info("put shard DDL lock operation", zap.String("lock", lock.ID),
zap.Stringer("operation", op), zap.Bool("already exist", !succ), zap.Int64("revision", rev))
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/shardddl/optimism/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewLockKeeper() *LockKeeper {
}

// TrySync tries to sync the lock.
func (lk *LockKeeper) TrySync(info Info, tts []TargetTable) (string, []string, error) {
func (lk *LockKeeper) TrySync(info Info, tts []TargetTable) (*Lock, []string, error) {
var (
lockID = genDDLLockID(info)
l *Lock
Expand All @@ -51,7 +51,7 @@ func (lk *LockKeeper) TrySync(info Info, tts []TargetTable) (string, []string, e
}

newDDLs, err := l.TrySync(info.Source, info.UpSchema, info.UpTable, info.DDLs, info.TableInfoAfter, tts)
return lockID, newDDLs, err
return l, newDDLs, err
}

// RemoveLock removes a lock.
Expand Down
28 changes: 21 additions & 7 deletions pkg/shardddl/optimism/keeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ func (t *testKeeper) TestLockKeeper(c *C) {
)

// lock with 2 sources.
lockID1, newDDLs, err := lk.TrySync(i11, tts1)
l1, newDDLs, err := lk.TrySync(i11, tts1)
c.Assert(err, IsNil)
c.Assert(l1, NotNil)
lockID1 := l1.ID
c.Assert(lockID1, Equals, "task1-`foo`.`bar`")
c.Assert(newDDLs, DeepEquals, DDLs)
lock1 := lk.FindLock(lockID1)
Expand All @@ -68,8 +70,10 @@ func (t *testKeeper) TestLockKeeper(c *C) {
c.Assert(synced, IsFalse)
c.Assert(remain, Equals, 1)

lockID1, newDDLs, err = lk.TrySync(i12, tts1)
l1, newDDLs, err = lk.TrySync(i12, tts1)
c.Assert(err, IsNil)
c.Assert(l1, NotNil)
lockID1 = l1.ID
c.Assert(lockID1, Equals, "task1-`foo`.`bar`")
c.Assert(newDDLs, DeepEquals, DDLs)
lock1 = lk.FindLock(lockID1)
Expand All @@ -80,8 +84,10 @@ func (t *testKeeper) TestLockKeeper(c *C) {
c.Assert(remain, Equals, 0)

// lock with only 1 source.
lockID2, newDDLs, err := lk.TrySync(i21, tts2)
l2, newDDLs, err := lk.TrySync(i21, tts2)
c.Assert(err, IsNil)
c.Assert(l2, NotNil)
lockID2 := l2.ID
c.Assert(lockID2, Equals, "task2-`foo`.`bar`")
c.Assert(newDDLs, DeepEquals, DDLs)
lock2 := lk.FindLock(lockID2)
Expand Down Expand Up @@ -149,14 +155,18 @@ func (t *testKeeper) TestLockKeeperMultipleTarget(c *C) {
)

// lock for target1.
lockID1, newDDLs, err := lk.TrySync(i11, tts1)
l1, newDDLs, err := lk.TrySync(i11, tts1)
c.Assert(err, IsNil)
c.Assert(l1, NotNil)
lockID1 := l1.ID
c.Assert(lockID1, DeepEquals, "test-lock-keeper-multiple-target-`foo`.`bar`")
c.Assert(newDDLs, DeepEquals, DDLs)

// lock for target2.
lockID2, newDDLs, err := lk.TrySync(i21, tts2)
l2, newDDLs, err := lk.TrySync(i21, tts2)
c.Assert(err, IsNil)
c.Assert(l2, NotNil)
lockID2 := l2.ID
c.Assert(lockID2, DeepEquals, "test-lock-keeper-multiple-target-`foo`.`rab`")
c.Assert(newDDLs, DeepEquals, DDLs)

Expand All @@ -177,12 +187,16 @@ func (t *testKeeper) TestLockKeeperMultipleTarget(c *C) {
c.Assert(remain, Equals, 1)

// sync for two locks.
lockID1, newDDLs, err = lk.TrySync(i12, tts1)
l1, newDDLs, err = lk.TrySync(i12, tts1)
c.Assert(err, IsNil)
c.Assert(l1, NotNil)
lockID1 = l1.ID
c.Assert(lockID1, DeepEquals, "test-lock-keeper-multiple-target-`foo`.`bar`")
c.Assert(newDDLs, DeepEquals, DDLs)
lockID2, newDDLs, err = lk.TrySync(i22, tts2)
l2, newDDLs, err = lk.TrySync(i22, tts2)
c.Assert(err, IsNil)
c.Assert(l2, NotNil)
lockID2 = l2.ID
c.Assert(lockID2, DeepEquals, "test-lock-keeper-multiple-target-`foo`.`rab`")
c.Assert(newDDLs, DeepEquals, DDLs)

Expand Down

0 comments on commit 2d3c3ae

Please sign in to comment.