Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add mutex for updatingClient flag in client #570

Merged
merged 1 commit into from
Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions opamp/observiq/observiq_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"net/http"
"net/url"
"sync"

"github.com/observiq/observiq-otel-collector/collector"
"github.com/observiq/observiq-otel-collector/internal/version"
Expand Down Expand Up @@ -51,6 +52,7 @@ type Client struct {
downloadableFileManager opamp.DownloadableFileManager
collector collector.Collector
packagesStateProvider types.PackagesStateProvider
mutex sync.Mutex
updatingPackage bool

currentConfig opamp.Config
Expand Down Expand Up @@ -305,7 +307,7 @@ func (c *Client) onPackagesAvailableHandler(packagesAvailable *protobufs.Package

// Don't respond to PackagesAvailable messages while currently installing. We use this in memory data rather than the
// PackageStatuses persistant data in order to ensure that we don't get stuck in a stuck state
if c.updatingPackage {
if c.safeGetUpdatingPackage() {
curPackageStatuses.ErrorMessage = "Already installing new packages"
if err := c.opampClient.SetPackageStatuses(curPackageStatuses); err != nil {
c.logger.Error("OpAMP client failed to set package statuses", zap.Error(err))
Expand Down Expand Up @@ -342,7 +344,7 @@ func (c *Client) onPackagesAvailableHandler(packagesAvailable *protobufs.Package
// Start update if applicable
collectorDownloadableFile := curPackageFiles[mainPackageName]
if collectorDownloadableFile != nil {
c.updatingPackage = true
c.safeSetUpdatingPackage(true)
go c.installPackageFromFile(collectorDownloadableFile, curPackageStatuses)
}

Expand Down Expand Up @@ -429,7 +431,7 @@ func (c *Client) installPackageFromFile(file *protobufs.DownloadableFile, curPac
c.logger.Error("OpAMP client failed to set package statuses", zap.Error(err))
}

c.updatingPackage = false
c.safeSetUpdatingPackage(false)
return
}

Expand All @@ -445,7 +447,7 @@ func (c *Client) onGetEffectiveConfigHandler(_ context.Context) (*protobufs.Effe

// attemptFailedInstall sets PackageStatuses status to failed and error message if we are in the middle of an install.
// This should allow the updater to pick this up and start the rollback process
func (c Client) attemptFailedInstall(errMsg string) {
func (c *Client) attemptFailedInstall(errMsg string) {
// See if we can retrieve the PackageStatuses where the main package is in an installing state
lastPackageStatuses := c.getMainPackageInstallingLastStatuses()
if lastPackageStatuses == nil {
Expand All @@ -461,7 +463,7 @@ func (c Client) attemptFailedInstall(errMsg string) {
}
}

func (c Client) getMainPackageInstallingLastStatuses() *protobufs.PackageStatuses {
func (c *Client) getMainPackageInstallingLastStatuses() *protobufs.PackageStatuses {
lastPackageStatuses, err := c.packagesStateProvider.LastReportedStatuses()
if err != nil {
c.logger.Error("Failed to retrieve last reported package statuses", zap.Error(err))
Expand All @@ -483,3 +485,15 @@ func (c Client) getMainPackageInstallingLastStatuses() *protobufs.PackageStatuse

return lastPackageStatuses
}

func (c *Client) safeSetUpdatingPackage(value bool) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.updatingPackage = value
}

func (c *Client) safeGetUpdatingPackage() bool {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.updatingPackage
}
18 changes: 11 additions & 7 deletions opamp/observiq/observiq_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path/filepath"
"sync"
"testing"
"time"

colmocks "github.com/observiq/observiq-otel-collector/collector/mocks"
"github.com/observiq/observiq-otel-collector/internal/version"
Expand Down Expand Up @@ -1191,8 +1192,13 @@ func TestClient_onPackagesAvailableHandler(t *testing.T) {
}

err := c.onPackagesAvailableHandler(packagesAvailableNew)
wg.Wait()
assert.NoError(t, err)
wg.Wait()
// This is a half baked check. Ideally we'd like to check that this is true after the spun up goroutine is finished
// but there is no way to guarantee that without modifying code. So instead we'll do a partial check and just ensure
// that it was true at some point after the main function finishes (this will change anyways once updater monitoring
// is added)
assert.True(t, c.safeGetUpdatingPackage())
},
},
{
Expand Down Expand Up @@ -1289,16 +1295,13 @@ func TestClient_onPackagesAvailableHandler(t *testing.T) {
Packages: packagesNew,
}

wg := sync.WaitGroup{}
wg.Add(1)
mockFileManager := mocks.NewMockDownloadableFileManager(t)
mockFileManager.On("FetchAndExtractArchive", mock.Anything).Return(expectedErr).Run(func(args mock.Arguments) {
wg.Done()
})
mockFileManager.On("FetchAndExtractArchive", mock.Anything).Return(expectedErr)
mockProvider := mocks.NewMockPackagesStateProvider(t)
mockProvider.On("LastReportedStatuses").Return(packageStatuses, nil)
mockProvider.On("SetLastReportedStatuses", mock.Anything).Return(nil)
mockOpAmpClient := mocks.NewMockOpAMPClient(t)
// This is for the initial status that is sent in the main function.
mockOpAmpClient.On("SetPackageStatuses", mock.Anything).Return(nil).Once().Run(func(args mock.Arguments) {
status := args.Get(0).(*protobufs.PackageStatuses)

Expand All @@ -1314,6 +1317,7 @@ func TestClient_onPackagesAvailableHandler(t *testing.T) {
assert.Equal(t, packageStatuses.Packages[collectorPackageName].AgentHasHash, status.Packages[collectorPackageName].AgentHasHash)
assert.Equal(t, packageStatuses.Packages[collectorPackageName].AgentHasVersion, status.Packages[collectorPackageName].AgentHasVersion)
})
// This will be called within the goroutine that is spun up from the main function.
mockOpAmpClient.On("SetPackageStatuses", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
status := args.Get(0).(*protobufs.PackageStatuses)
assert.NotNil(t, status)
Expand All @@ -1337,8 +1341,8 @@ func TestClient_onPackagesAvailableHandler(t *testing.T) {
}

err := c.onPackagesAvailableHandler(packagesAvailableNew)
wg.Wait()
assert.NoError(t, err)
assert.Eventually(t, func() bool { return c.safeGetUpdatingPackage() == false }, 10*time.Second, 10*time.Millisecond)
},
},
{
Expand Down