fix pump storage quit bug #739
/run-all-tests
Why did CI pass before? This bug seems to have existed before PR #735, which means pump could not be taken offline.
Perhaps
we cancel ctx before
I have a new conjecture now. I will run some tests to confirm it and give a conclusion later.
@july2993
	select {
	case values <- value:
		log.Debug("send value success")
	case <-ctx.Done():
		iter.Release()
		return
	}
Although,
will block at guess because the in-flight PullBinlog gRPC has not quit
If so, we have two solutions.
	// stop the gRPC server
	s.gs.GracefulStop()
	log.Info("grpc is stopped")
	if err := s.storage.Close(); err != nil {
		log.Error("close storage failed", zap.Error(err))
	}
Code in
	// PullCommitBinlog return commit binlog > last
	func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte {
		log.Debug("new PullCommitBinlog", zap.Int64("last ts", last))
		ctx, cancel := context.WithCancel(ctx)
		go func() {
			select {
			case <-a.close:
				cancel()
			case <-ctx.Done():
			}
		}()
After In 1. we may read
For 1, in-flight WriteBinlog() gRPCs need to use s.Storage, so we can't just close s.Storage first.
LGTM
/run-all-tests
LGTM
What problem does this PR solve?
A child PR of #735.
When we try to close the pump server, we cancel s.ctx first. But we want storage to keep pulling stored binlogs until drainer receives them; only then can pump safely quit. If we use s.ctx for storage.PullCommitBinlog, it quits first and drainer may no longer receive binlogs from this pump.
What is changed and how it works?
Change s.ctx to context.Background() and add an s.pullClose channel to control the closure of PullBinlogs. Run close(s.pullClose) after commitStatus.
Check List
Tests
Code changes
Side effects
Related changes