Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: duplicate call loginFunc (#3860) #3875

Merged
merged 1 commit into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions client/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,15 +239,15 @@
// Users can still enable heartbeat feature by setting HeartbeatInterval to a positive value.
if ctl.sessionCtx.Common.Transport.HeartbeatInterval > 0 {
// send heartbeat to server
sendHeartBeat := func() error {
sendHeartBeat := func() (bool, error) {

Check failure on line 242 in client/control.go

View workflow job for this annotation

GitHub Actions / lint

(*Control).heartbeatWorker$1 - result 0 (bool) is always false (unparam)
xl.Debug("send heartbeat to server")
pingMsg := &msg.Ping{}
if err := ctl.sessionCtx.AuthSetter.SetPing(pingMsg); err != nil {
xl.Warn("error during ping authentication: %v, skip sending ping message", err)
return err
return false, err
}
_ = ctl.msgDispatcher.Send(pingMsg)
return nil
return false, nil
}

go wait.BackoffUntil(sendHeartBeat,
Expand Down
20 changes: 8 additions & 12 deletions client/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,16 +192,16 @@ func (svr *Service) keepControllerWorking() {
// the control immediately exits. It is necessary to limit the frequency of reconnection in this case.
// The interval for the first three retries in 1 minute will be very short, and then it will increase exponentially.
// The maximum interval is 20 seconds.
wait.BackoffUntil(func() error {
wait.BackoffUntil(func() (bool, error) {
// loopLoginUntilSuccess is another layer of loop that will continuously attempt to
// login to the server until successful.
svr.loopLoginUntilSuccess(20*time.Second, false)
if svr.ctl != nil {
<-svr.ctl.Done()
return errors.New("control is closed and try another loop")
return false, errors.New("control is closed and try another loop")
}
// If the control is nil, it means that the login failed and the service is also closed.
return nil
return false, nil
}, wait.NewFastBackoffManager(
wait.FastBackoffOptions{
Duration: time.Second,
Expand Down Expand Up @@ -282,17 +282,16 @@ func (svr *Service) login() (conn net.Conn, connector Connector, err error) {

func (svr *Service) loopLoginUntilSuccess(maxInterval time.Duration, firstLoginExit bool) {
xl := xlog.FromContextSafe(svr.ctx)
successCh := make(chan struct{})

loginFunc := func() error {
loginFunc := func() (bool, error) {
xl.Info("try to connect to server...")
conn, connector, err := svr.login()
if err != nil {
xl.Warn("connect to server error: %v", err)
if firstLoginExit {
svr.cancel(cancelErr{Err: err})
}
return err
return false, err
}

svr.cfgMu.RLock()
Expand All @@ -315,7 +314,7 @@ func (svr *Service) loopLoginUntilSuccess(maxInterval time.Duration, firstLoginE
if err != nil {
conn.Close()
xl.Error("NewControl error: %v", err)
return err
return false, err
}
ctl.SetInWorkConnCallback(svr.handleWorkConnCb)

Expand All @@ -328,8 +327,7 @@ func (svr *Service) loopLoginUntilSuccess(maxInterval time.Duration, firstLoginE
svr.ctl = ctl
svr.ctlMu.Unlock()

close(successCh)
return nil
return true, nil
}

// try to reconnect to server until success
Expand All @@ -339,9 +337,7 @@ func (svr *Service) loopLoginUntilSuccess(maxInterval time.Duration, firstLoginE
Factor: 2,
Jitter: 0.1,
MaxDuration: maxInterval,
}),
true,
wait.MergeAndCloseOnAnyStopChannel(svr.ctx.Done(), successCh))
}), true, svr.ctx.Done())
}

func (svr *Service) UpdateAllConfigurer(proxyCfgs []v1.ProxyConfigurer, visitorCfgs []v1.VisitorConfigurer) error {
Expand Down
10 changes: 6 additions & 4 deletions pkg/util/wait/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (f *fastBackoffImpl) Backoff(previousDuration time.Duration, previousCondit
return f.options.Duration
}

func BackoffUntil(f func() error, backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
func BackoffUntil(f func() (bool, error), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
var delay time.Duration
previousError := false

Expand All @@ -131,7 +131,9 @@ func BackoffUntil(f func() error, backoff BackoffManager, sliding bool, stopCh <
delay = backoff.Backoff(delay, previousError)
}

if err := f(); err != nil {
if done, err := f(); done {
return
} else if err != nil {
previousError = true
} else {
previousError = false
Expand Down Expand Up @@ -170,9 +172,9 @@ func Jitter(duration time.Duration, maxFactor float64) time.Duration {
}

func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
ff := func() error {
ff := func() (bool, error) {
f()
return nil
return false, nil
}
BackoffUntil(ff, BackoffFunc(func(time.Duration, bool) time.Duration {
return period
Expand Down
Loading