Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[17.12] [manager/dispatcher] Synchronize Dispatcher.Stop() with incoming rpcs. #2514

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,11 +333,11 @@ func (a *Agent) run(ctx context.Context) {
a.config.SessionTracker.SessionError(err)
}

log.G(ctx).WithError(err).Error("agent: session failed")
backoff = initialSessionFailureBackoff + 2*backoff
if backoff > maxSessionFailureBackoff {
backoff = maxSessionFailureBackoff
}
log.G(ctx).WithError(err).WithField("backoff", backoff).Errorf("agent: session failed")
}

if err := session.close(); err != nil {
Expand Down
17 changes: 16 additions & 1 deletion agent/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,14 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI
grpc.WithTransportCredentials(agent.config.Credentials),
grpc.WithTimeout(dispatcherRPCTimeout),
)

if err != nil {
s.errs <- err
return s
}

log.G(ctx).Infof("manager selected by agent for new session: %v", cc.Peer())

s.conn = cc

go s.run(sessionCtx, delay, description)
Expand All @@ -77,6 +81,7 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI

func (s *session) run(ctx context.Context, delay time.Duration, description *api.NodeDescription) {
timer := time.NewTimer(delay) // delay before registering.
log.G(ctx).Infof("waiting %v before registering session", delay)
defer timer.Stop()
select {
case <-timer.C:
Expand Down Expand Up @@ -166,22 +171,32 @@ func (s *session) heartbeat(ctx context.Context) error {
heartbeat := time.NewTimer(1) // send out a heartbeat right away
defer heartbeat.Stop()

fields := logrus.Fields{
"sessionID": s.sessionID,
"method": "(*session).heartbeat",
}

for {
select {
case <-heartbeat.C:
heartbeatCtx, cancel := context.WithTimeout(ctx, dispatcherRPCTimeout)
// TODO(anshul) log manager info in all logs in this function.
log.G(ctx).WithFields(fields).Debugf("sending heartbeat to manager %v with timeout %v", s.conn.Peer(), dispatcherRPCTimeout)
resp, err := client.Heartbeat(heartbeatCtx, &api.HeartbeatRequest{
SessionID: s.sessionID,
})
cancel()
if err != nil {
log.G(ctx).WithFields(fields).WithError(err).Errorf("heartbeat to manager %v failed", s.conn.Peer())
if grpc.Code(err) == codes.NotFound {
err = errNodeNotRegistered
}

return err
}

log.G(ctx).WithFields(fields).Debugf("heartbeat successful to manager %v, next heartbeat period: %v", s.conn.Peer(), resp.Period)

heartbeat.Reset(resp.Period)
case <-s.closed:
return errSessionClosed
Expand Down Expand Up @@ -408,7 +423,7 @@ func (s *session) sendError(err error) {
}
}

// close closing session. It should be called only in <-session.errs branch
// close the given session. It should be called only in <-session.errs branch
// of event loop, or when cleaning up the agent.
func (s *session) close() error {
s.closeOnce.Do(func() {
Expand Down
6 changes: 6 additions & 0 deletions connectionbroker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (b *Broker) Select(dialOpts ...grpc.DialOption) (*Conn, error) {
// connection.
func (b *Broker) SelectRemote(dialOpts ...grpc.DialOption) (*Conn, error) {
peer, err := b.remotes.Select()

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -98,6 +99,11 @@ type Conn struct {
peer api.Peer
}

// Peer returns the api.Peer held in this Conn's peer field, i.e. the peer
// this connection is associated with. It is a plain accessor: it performs no
// I/O and does not modify the Conn.
// NOTE(review): for a local (non-remote) connection the peer field is
// presumably left at its zero value — confirm against Select before relying
// on the result to identify a manager.
func (c *Conn) Peer() api.Peer {
return c.peer
}

// Close closes the client connection if it is a remote connection. It also
// records a positive experience with the remote peer if success is true,
// otherwise it records a negative experience. If a local connection is in use,
Expand Down
Loading