Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix pump storage quit bug #739

Merged
merged 3 commits into from
Sep 5, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type Server struct {
wg sync.WaitGroup
gcDuration time.Duration
triggerGC chan time.Time
pullClose chan struct{}
metrics *metricClient
// save the last time we write binlog to Storage
// if long time not write, we can write a fake binlog
Expand Down Expand Up @@ -168,6 +169,7 @@ func NewServer(cfg *Config) (*Server, error) {
pdCli: pdCli,
cfg: cfg,
triggerGC: make(chan time.Time),
pullClose: make(chan struct{}),
}, nil
}

Expand Down Expand Up @@ -289,12 +291,14 @@ func (s *Server) PullBinlogs(in *binlog.PullBinlogReq, stream binlog.Pump_PullBi
log.Errorf("drainer request a purged binlog (gc ts = %d), request %+v, some binlog events may be loss", gcTS, in)
}

ctx, cancel := context.WithCancel(s.ctx)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
binlogs := s.storage.PullCommitBinlog(ctx, last)

for {
select {
case <-s.pullClose:
return nil
case data, ok := <-binlogs:
if !ok {
return nil
Expand Down Expand Up @@ -865,6 +869,8 @@ func (s *Server) Close() {
s.commitStatus()
log.Info("commit status done")

// PullBinlogs should be stopped after commitStatus
close(s.pullClose)
// stop the gRPC server
s.gs.GracefulStop()
log.Info("grpc is stopped")
Expand Down