From b63bcddfba1d5cc6b1b10a8a8a03804155da6103 Mon Sep 17 00:00:00 2001 From: Stefan Kurek Date: Tue, 19 Jul 2022 13:25:15 -0400 Subject: [PATCH] Add mutex for updatingClient flag in client (#570) --- opamp/observiq/observiq_client.go | 24 +++++++++++++++++++----- opamp/observiq/observiq_client_test.go | 18 +++++++++++------- 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/opamp/observiq/observiq_client.go b/opamp/observiq/observiq_client.go index 63e44f89b..763f8239d 100644 --- a/opamp/observiq/observiq_client.go +++ b/opamp/observiq/observiq_client.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" "net/url" + "sync" "github.com/observiq/observiq-otel-collector/collector" "github.com/observiq/observiq-otel-collector/internal/version" @@ -51,6 +52,7 @@ type Client struct { downloadableFileManager opamp.DownloadableFileManager collector collector.Collector packagesStateProvider types.PackagesStateProvider + mutex sync.Mutex updatingPackage bool currentConfig opamp.Config @@ -305,7 +307,7 @@ func (c *Client) onPackagesAvailableHandler(packagesAvailable *protobufs.Package // Don't respond to PackagesAvailable messages while currently installing. We use this in memory data rather than the // PackageStatuses persistant data in order to ensure that we don't get stuck in a stuck state - if c.updatingPackage { + if c.safeGetUpdatingPackage() { curPackageStatuses.ErrorMessage = "Already installing new packages" if err := c.opampClient.SetPackageStatuses(curPackageStatuses); err != nil { c.logger.Error("OpAMP client failed to set package statuses", zap.Error(err)) @@ -342,7 +344,7 @@ func (c *Client) onPackagesAvailableHandler(packagesAvailable *protobufs.Package // Start update if applicable collectorDownloadableFile := curPackageFiles[mainPackageName] if collectorDownloadableFile != nil { - c.updatingPackage = true + c.safeSetUpdatingPackage(true) go c.installPackageFromFile(collectorDownloadableFile, curPackageStatuses) } @@ -429,7 +431,7 @@ func (c *Client) installPackageFromFile(file *protobufs.DownloadableFile, curPac c.logger.Error("OpAMP client failed to set package statuses", zap.Error(err)) } - c.updatingPackage = false + c.safeSetUpdatingPackage(false) return } @@ -445,7 +447,7 @@ func (c *Client) onGetEffectiveConfigHandler(_ context.Context) (*protobufs.Effe // attemptFailedInstall sets PackageStatuses status to failed and error message if we are in the middle of an install. // This should allow the updater to pick this up and start the rollback process -func (c Client) attemptFailedInstall(errMsg string) { +func (c *Client) attemptFailedInstall(errMsg string) { // See if we can retrieve the PackageStatuses where the main package is in an installing state lastPackageStatuses := c.getMainPackageInstallingLastStatuses() if lastPackageStatuses == nil { @@ -461,7 +463,7 @@ func (c Client) attemptFailedInstall(errMsg string) { } } -func (c Client) getMainPackageInstallingLastStatuses() *protobufs.PackageStatuses { +func (c *Client) getMainPackageInstallingLastStatuses() *protobufs.PackageStatuses { lastPackageStatuses, err := c.packagesStateProvider.LastReportedStatuses() if err != nil { c.logger.Error("Failed to retrieve last reported package statuses", zap.Error(err)) @@ -483,3 +485,15 @@ func (c Client) getMainPackageInstallingLastStatuses() *protobufs.PackageStatuse return lastPackageStatuses } + +func (c *Client) safeSetUpdatingPackage(value bool) { + c.mutex.Lock() + defer c.mutex.Unlock() + c.updatingPackage = value +} + +func (c *Client) safeGetUpdatingPackage() bool { + c.mutex.Lock() + defer c.mutex.Unlock() + return c.updatingPackage +} diff --git a/opamp/observiq/observiq_client_test.go b/opamp/observiq/observiq_client_test.go index 58ce98863..96ad9394e 100644 --- a/opamp/observiq/observiq_client_test.go +++ b/opamp/observiq/observiq_client_test.go @@ -23,6 +23,7 @@ import ( "path/filepath" "sync" "testing" + "time" colmocks "github.com/observiq/observiq-otel-collector/collector/mocks" "github.com/observiq/observiq-otel-collector/internal/version" @@ -1191,8 +1192,13 @@ func TestClient_onPackagesAvailableHandler(t *testing.T) { } err := c.onPackagesAvailableHandler(packagesAvailableNew) - wg.Wait() assert.NoError(t, err) + wg.Wait() + // This is a half baked check. Ideally we'd like to check that this is true after the spun up goroutine is finished + // but there is no way to guarantee that without modifying code. So instead we'll do a partial check and just ensure + // that it was true at some point after the main function finishes (this will change anyways once updater monitoring + // is added) + assert.True(t, c.safeGetUpdatingPackage()) }, }, { @@ -1289,16 +1295,13 @@ func TestClient_onPackagesAvailableHandler(t *testing.T) { Packages: packagesNew, } - wg := sync.WaitGroup{} - wg.Add(1) mockFileManager := mocks.NewMockDownloadableFileManager(t) - mockFileManager.On("FetchAndExtractArchive", mock.Anything).Return(expectedErr).Run(func(args mock.Arguments) { - wg.Done() - }) + mockFileManager.On("FetchAndExtractArchive", mock.Anything).Return(expectedErr) mockProvider := mocks.NewMockPackagesStateProvider(t) mockProvider.On("LastReportedStatuses").Return(packageStatuses, nil) mockProvider.On("SetLastReportedStatuses", mock.Anything).Return(nil) mockOpAmpClient := mocks.NewMockOpAMPClient(t) + // This is for the initial status that is sent in the main function. mockOpAmpClient.On("SetPackageStatuses", mock.Anything).Return(nil).Once().Run(func(args mock.Arguments) { status := args.Get(0).(*protobufs.PackageStatuses) @@ -1314,6 +1317,7 @@ func TestClient_onPackagesAvailableHandler(t *testing.T) { assert.Equal(t, packageStatuses.Packages[collectorPackageName].AgentHasHash, status.Packages[collectorPackageName].AgentHasHash) assert.Equal(t, packageStatuses.Packages[collectorPackageName].AgentHasVersion, status.Packages[collectorPackageName].AgentHasVersion) }) + // This will be called within the goroutine that is spun up from the main function. mockOpAmpClient.On("SetPackageStatuses", mock.Anything).Return(nil).Run(func(args mock.Arguments) { status := args.Get(0).(*protobufs.PackageStatuses) assert.NotNil(t, status) @@ -1337,8 +1341,8 @@ func TestClient_onPackagesAvailableHandler(t *testing.T) { } err := c.onPackagesAvailableHandler(packagesAvailableNew) - wg.Wait() assert.NoError(t, err) + assert.Eventually(t, func() bool { return c.safeGetUpdatingPackage() == false }, 10*time.Second, 10*time.Millisecond) }, }, {