diff --git a/pkg/component/runtime/runtime_comm.go b/pkg/component/runtime/runtime_comm.go index f52e2bb124d..bb4343ed29b 100644 --- a/pkg/component/runtime/runtime_comm.go +++ b/pkg/component/runtime/runtime_comm.go @@ -48,10 +48,13 @@ type runtimeComm struct { token string cert *authority.Pair - checkinConn bool - checkinDone chan bool - checkinLock sync.RWMutex - checkinExpected chan *proto.CheckinExpected + checkinConn bool + checkinDone chan bool + checkinLock sync.RWMutex + + checkinExpectedLock sync.Mutex + checkinExpected chan *proto.CheckinExpected + checkinObserved chan *proto.CheckinObserved actionsConn bool @@ -83,7 +86,7 @@ func newRuntimeComm(logger *logger.Logger, listenAddr string, ca *authority.Cert token: token.String(), cert: pair, checkinConn: true, - checkinExpected: make(chan *proto.CheckinExpected, 1), + checkinExpected: make(chan *proto.CheckinExpected, 1), // size of 1 channel to keep the latest expected checkin state checkinObserved: make(chan *proto.CheckinObserved), actionsConn: true, actionsRequest: make(chan *proto.ActionRequest), @@ -136,7 +139,20 @@ func (c *runtimeComm) CheckinExpected(expected *proto.CheckinExpected) { } else { expected.AgentInfo = nil } + + // Lock to avoid race if this function is called from the different go routines + c.checkinExpectedLock.Lock() + + // Empty the channel + select { + case <-c.checkinExpected: + default: + } + + // Put the new expected state in c.checkinExpected <- expected + + c.checkinExpectedLock.Unlock() } func (c *runtimeComm) CheckinObserved() <-chan *proto.CheckinObserved { @@ -187,7 +203,7 @@ func (c *runtimeComm) checkin(server proto.ElasticAgent_CheckinV2Server, init *p err := server.Send(expected) if err != nil { if reportableErr(err) { - c.logger.Debugf("check-in stream failed to send expected state: %s", err) + c.logger.Infof("check-in stream failed to send expected state: %s", err) } return } @@ -201,7 +217,7 @@ func (c *runtimeComm) checkin(server proto.ElasticAgent_CheckinV2Server, init *p checkin, err := server.Recv() if err != nil { if reportableErr(err) { - c.logger.Debugf("check-in stream failed to receive data: %s", err) + c.logger.Infof("check-in stream failed to receive data: %s", err) } close(recvDone) return