Skip to content

Commit

Permalink
Add mutex for updatingClient flag in client (#570)
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanKurek authored and Corbin Phelps committed Jul 29, 2022
1 parent 49abd54 commit 924125c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 12 deletions.
24 changes: 19 additions & 5 deletions opamp/observiq/observiq_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"net/http"
"net/url"
"sync"

"github.com/observiq/observiq-otel-collector/collector"
"github.com/observiq/observiq-otel-collector/internal/version"
Expand Down Expand Up @@ -51,6 +52,7 @@ type Client struct {
downloadableFileManager opamp.DownloadableFileManager
collector collector.Collector
packagesStateProvider types.PackagesStateProvider
mutex sync.Mutex
updatingPackage bool

currentConfig opamp.Config
Expand Down Expand Up @@ -305,7 +307,7 @@ func (c *Client) onPackagesAvailableHandler(packagesAvailable *protobufs.Package

// Don't respond to PackagesAvailable messages while currently installing. We use this in memory data rather than the
// PackageStatuses persistant data in order to ensure that we don't get stuck in a stuck state
if c.updatingPackage {
if c.safeGetUpdatingPackage() {
curPackageStatuses.ErrorMessage = "Already installing new packages"
if err := c.opampClient.SetPackageStatuses(curPackageStatuses); err != nil {
c.logger.Error("OpAMP client failed to set package statuses", zap.Error(err))
Expand Down Expand Up @@ -342,7 +344,7 @@ func (c *Client) onPackagesAvailableHandler(packagesAvailable *protobufs.Package
// Start update if applicable
collectorDownloadableFile := curPackageFiles[mainPackageName]
if collectorDownloadableFile != nil {
c.updatingPackage = true
c.safeSetUpdatingPackage(true)
go c.installPackageFromFile(collectorDownloadableFile, curPackageStatuses)
}

Expand Down Expand Up @@ -429,7 +431,7 @@ func (c *Client) installPackageFromFile(file *protobufs.DownloadableFile, curPac
c.logger.Error("OpAMP client failed to set package statuses", zap.Error(err))
}

c.updatingPackage = false
c.safeSetUpdatingPackage(false)
return
}

Expand All @@ -445,7 +447,7 @@ func (c *Client) onGetEffectiveConfigHandler(_ context.Context) (*protobufs.Effe

// attemptFailedInstall sets PackageStatuses status to failed and error message if we are in the middle of an install.
// This should allow the updater to pick this up and start the rollback process
func (c Client) attemptFailedInstall(errMsg string) {
func (c *Client) attemptFailedInstall(errMsg string) {
// See if we can retrieve the PackageStatuses where the main package is in an installing state
lastPackageStatuses := c.getMainPackageInstallingLastStatuses()
if lastPackageStatuses == nil {
Expand All @@ -461,7 +463,7 @@ func (c Client) attemptFailedInstall(errMsg string) {
}
}

func (c Client) getMainPackageInstallingLastStatuses() *protobufs.PackageStatuses {
func (c *Client) getMainPackageInstallingLastStatuses() *protobufs.PackageStatuses {
lastPackageStatuses, err := c.packagesStateProvider.LastReportedStatuses()
if err != nil {
c.logger.Error("Failed to retrieve last reported package statuses", zap.Error(err))
Expand All @@ -483,3 +485,15 @@ func (c Client) getMainPackageInstallingLastStatuses() *protobufs.PackageStatuse

return lastPackageStatuses
}

func (c *Client) safeSetUpdatingPackage(value bool) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.updatingPackage = value
}

func (c *Client) safeGetUpdatingPackage() bool {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.updatingPackage
}
18 changes: 11 additions & 7 deletions opamp/observiq/observiq_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path/filepath"
"sync"
"testing"
"time"

colmocks "github.com/observiq/observiq-otel-collector/collector/mocks"
"github.com/observiq/observiq-otel-collector/internal/version"
Expand Down Expand Up @@ -1191,8 +1192,13 @@ func TestClient_onPackagesAvailableHandler(t *testing.T) {
}

err := c.onPackagesAvailableHandler(packagesAvailableNew)
wg.Wait()
assert.NoError(t, err)
wg.Wait()
// This is a half baked check. Ideally we'd like to check that this is true after the spun up goroutine is finished
// but there is no way to guarantee that without modifying code. So instead we'll do a partial check and just ensure
// that it was true at some point after the main function finishes (this will change anyways once updater monitoring
// is added)
assert.True(t, c.safeGetUpdatingPackage())
},
},
{
Expand Down Expand Up @@ -1289,16 +1295,13 @@ func TestClient_onPackagesAvailableHandler(t *testing.T) {
Packages: packagesNew,
}

wg := sync.WaitGroup{}
wg.Add(1)
mockFileManager := mocks.NewMockDownloadableFileManager(t)
mockFileManager.On("FetchAndExtractArchive", mock.Anything).Return(expectedErr).Run(func(args mock.Arguments) {
wg.Done()
})
mockFileManager.On("FetchAndExtractArchive", mock.Anything).Return(expectedErr)
mockProvider := mocks.NewMockPackagesStateProvider(t)
mockProvider.On("LastReportedStatuses").Return(packageStatuses, nil)
mockProvider.On("SetLastReportedStatuses", mock.Anything).Return(nil)
mockOpAmpClient := mocks.NewMockOpAMPClient(t)
// This is for the initial status that is sent in the main function.
mockOpAmpClient.On("SetPackageStatuses", mock.Anything).Return(nil).Once().Run(func(args mock.Arguments) {
status := args.Get(0).(*protobufs.PackageStatuses)

Expand All @@ -1314,6 +1317,7 @@ func TestClient_onPackagesAvailableHandler(t *testing.T) {
assert.Equal(t, packageStatuses.Packages[collectorPackageName].AgentHasHash, status.Packages[collectorPackageName].AgentHasHash)
assert.Equal(t, packageStatuses.Packages[collectorPackageName].AgentHasVersion, status.Packages[collectorPackageName].AgentHasVersion)
})
// This will be called within the goroutine that is spun up from the main function.
mockOpAmpClient.On("SetPackageStatuses", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
status := args.Get(0).(*protobufs.PackageStatuses)
assert.NotNil(t, status)
Expand All @@ -1337,8 +1341,8 @@ func TestClient_onPackagesAvailableHandler(t *testing.T) {
}

err := c.onPackagesAvailableHandler(packagesAvailableNew)
wg.Wait()
assert.NoError(t, err)
assert.Eventually(t, func() bool { return c.safeGetUpdatingPackage() == false }, 10*time.Second, 10*time.Millisecond)
},
},
{
Expand Down

0 comments on commit 924125c

Please sign in to comment.