Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
*: address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc committed Feb 10, 2020
1 parent e581f14 commit 3ae58b6
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 7 deletions.
11 changes: 6 additions & 5 deletions dm/master/shardddl/pessimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,15 @@ func (p *Pessimist) Start(pCtx context.Context, etcdCli *clientv3.Client) error
p.lk.Clear() // clear all previous locks to support re-Start.

// get the history shard DDL info.
ifm, rev, err := pessimism.GetAllInfo(etcdCli)
// for the sequence of coordinate a shard DDL lock, see `/pkg/shardddl/pessimism/doc.go`.
ifm, rev1, err := pessimism.GetAllInfo(etcdCli)
if err != nil {
return err
}
p.logger.Info("get history shard DDL info", zap.Reflect("info", ifm), zap.Int64("revision", rev))
p.logger.Info("get history shard DDL info", zap.Reflect("info", ifm), zap.Int64("revision", rev1))

// get the history shard DDL lock operation.
// the newly operations after this GET will be received through the WATCH with `rev`,
// the newly operations after this GET will be received through the WATCH with `rev2`,
// and call `Lock.MarkDone` multiple times is fine.
opm, rev2, err := pessimism.GetAllOperations(etcdCli)
if err != nil {
Expand All @@ -92,7 +93,7 @@ func (p *Pessimist) Start(pCtx context.Context, etcdCli *clientv3.Client) error
p.wg.Done()
close(infoCh)
}()
pessimism.WatchInfoPut(ctx, etcdCli, rev, infoCh)
pessimism.WatchInfoPut(ctx, etcdCli, rev1, infoCh)
}()
go func() {
defer p.wg.Done()
Expand All @@ -107,7 +108,7 @@ func (p *Pessimist) Start(pCtx context.Context, etcdCli *clientv3.Client) error
p.wg.Done()
close(opCh)
}()
pessimism.WatchOperationPut(ctx, etcdCli, "", "", rev, opCh)
pessimism.WatchOperationPut(ctx, etcdCli, "", "", rev2, opCh)
}()
go func() {
defer p.wg.Done()
Expand Down
6 changes: 5 additions & 1 deletion pkg/shardddl/pessimism/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,11 @@ func WatchInfoPut(ctx context.Context, cli *clientv3.Client, revision int64, out
log.L().Error("fail to construct shard DDL info from json", zap.ByteString("json", ev.Kv.Value))
continue
}
outCh <- info
select {
case outCh <- info:
case <-ctx.Done():
return
}
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/shardddl/pessimism/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,11 @@ func WatchOperationPut(ctx context.Context, cli *clientv3.Client, task, source s
log.L().Error("fail to construct shard DDL operation from json", zap.ByteString("json", ev.Kv.Value))
continue
}
outCh <- op
select {
case outCh <- op:
case <-ctx.Done():
return
}
}
}
}
Expand Down

0 comments on commit 3ae58b6

Please sign in to comment.