Skip to content

Commit

Permalink
Update runtime_comm expected check-in handling to eliminate the lock …
Browse files Browse the repository at this point in the history
…in failure cases
  • Loading branch information
aleksmaus committed Sep 26, 2022
1 parent 2b82b29 commit d5f7de0
Showing 1 changed file with 23 additions and 7 deletions.
30 changes: 23 additions & 7 deletions pkg/component/runtime/runtime_comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,13 @@ type runtimeComm struct {
token string
cert *authority.Pair

checkinConn bool
checkinDone chan bool
checkinLock sync.RWMutex
checkinExpected chan *proto.CheckinExpected
checkinConn bool
checkinDone chan bool
checkinLock sync.RWMutex

checkinExpectedLock sync.Mutex
checkinExpected chan *proto.CheckinExpected

checkinObserved chan *proto.CheckinObserved

actionsConn bool
Expand Down Expand Up @@ -83,7 +86,7 @@ func newRuntimeComm(logger *logger.Logger, listenAddr string, ca *authority.Cert
token: token.String(),
cert: pair,
checkinConn: true,
checkinExpected: make(chan *proto.CheckinExpected, 1),
checkinExpected: make(chan *proto.CheckinExpected, 1), // size of 1 channel to keep the latest expected checkin state
checkinObserved: make(chan *proto.CheckinObserved),
actionsConn: true,
actionsRequest: make(chan *proto.ActionRequest),
Expand Down Expand Up @@ -136,7 +139,20 @@ func (c *runtimeComm) CheckinExpected(expected *proto.CheckinExpected) {
} else {
expected.AgentInfo = nil
}

// Lock to avoid race if this function is called from the different go routines
c.checkinExpectedLock.Lock()

// Empty the channel
select {
case <-c.checkinExpected:
default:
}

// Put the new expected state in
c.checkinExpected <- expected

c.checkinExpectedLock.Unlock()
}

func (c *runtimeComm) CheckinObserved() <-chan *proto.CheckinObserved {
Expand Down Expand Up @@ -187,7 +203,7 @@ func (c *runtimeComm) checkin(server proto.ElasticAgent_CheckinV2Server, init *p
err := server.Send(expected)
if err != nil {
if reportableErr(err) {
c.logger.Debugf("check-in stream failed to send expected state: %s", err)
c.logger.Infof("check-in stream failed to send expected state: %s", err)
}
return
}
Expand All @@ -201,7 +217,7 @@ func (c *runtimeComm) checkin(server proto.ElasticAgent_CheckinV2Server, init *p
checkin, err := server.Recv()
if err != nil {
if reportableErr(err) {
c.logger.Debugf("check-in stream failed to receive data: %s", err)
c.logger.Infof("check-in stream failed to receive data: %s", err)
}
close(recvDone)
return
Expand Down

0 comments on commit d5f7de0

Please sign in to comment.