Skip to content

Commit

Permalink
syncer: fix lock in streamer controller (pingcap#472)
Browse files Browse the repository at this point in the history
  • Loading branch information
WangXiangUSTC authored Feb 10, 2020
1 parent 256723e commit 7135428
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 10 deletions.
4 changes: 3 additions & 1 deletion dm/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,13 @@ func (t *testServer) TestTaskAutoResume(c *C) {

// check task will be auto resumed
c.Assert(utils.WaitSomething(10, 100*time.Millisecond, func() bool {
for _, st := range s.getWorker(true).QueryStatus(taskName) {
sts := s.getWorker(true).QueryStatus(taskName)
for _, st := range sts {
if st.Name == taskName && st.Stage == pb.Stage_Running {
return true
}
}
c.Log(sts)
return false
}), IsTrue)
}
29 changes: 20 additions & 9 deletions syncer/streamer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (c *StreamerController) Start(tctx *tcontext.Context, pos mysql.Position) e
err = c.updateServerIDAndResetReplication(tctx, pos)
}
if err != nil {
c.Close(tctx)
c.close(tctx)
return err
}

Expand Down Expand Up @@ -214,9 +214,6 @@ func (c *StreamerController) RedirectStreamer(tctx *tcontext.Context, pos mysql.

// GetEvent returns binlog event, should only have one thread call this function.
func (c *StreamerController) GetEvent(tctx *tcontext.Context) (event *replication.BinlogEvent, err error) {
c.Lock()
defer c.Unlock()

ctx, cancel := context.WithTimeout(tctx.Context(), eventTimeout)
failpoint.Inject("SyncerEventTimeout", func(val failpoint.Value) {
if seconds, ok := val.(int); ok {
Expand All @@ -226,11 +223,17 @@ func (c *StreamerController) GetEvent(tctx *tcontext.Context) (event *replicatio
}
})

event, err = c.streamer.GetEvent(ctx)
c.RLock()
streamer := c.streamer
c.RUnlock()

event, err = streamer.GetEvent(ctx)
cancel()
if err != nil {
if err != context.Canceled && err != context.DeadlineExceeded {
c.Lock()
c.meetError = true
c.Unlock()
}

return nil, err
Expand All @@ -240,13 +243,18 @@ func (c *StreamerController) GetEvent(tctx *tcontext.Context) (event *replicatio
case *replication.RotateEvent:
// if is local binlog, binlog's name contain uuid information, need to save it
// if is remote binlog, need to add uuid information in binlog's name
if !c.setUUIDIfExists(string(ev.NextLogName)) {
if len(c.uuidSuffix) != 0 {
c.Lock()
containUUID := c.setUUIDIfExists(string(ev.NextLogName))
uuidSuffix := c.uuidSuffix
c.Unlock()

if !containUUID {
if len(uuidSuffix) != 0 {
filename, err := binlog.ParseFilename(string(ev.NextLogName))
if err != nil {
return nil, terror.Annotate(err, "fail to parse binlog file name from rotate event")
}
ev.NextLogName = []byte(binlog.ConstructFilenameWithUUIDSuffix(filename, c.uuidSuffix))
ev.NextLogName = []byte(binlog.ConstructFilenameWithUUIDSuffix(filename, uuidSuffix))
event.Event = ev
}
}
Expand Down Expand Up @@ -299,8 +307,11 @@ func (c *StreamerController) closeBinlogSyncer(logtctx *tcontext.Context, binlog
// Close closes streamer
func (c *StreamerController) Close(tctx *tcontext.Context) {
c.Lock()
defer c.Unlock()
c.close(tctx)
c.Unlock()
}

func (c *StreamerController) close(tctx *tcontext.Context) {
if c.closed {
return
}
Expand Down

0 comments on commit 7135428

Please sign in to comment.