Skip to content

Commit

Permalink
Moved package install function to goroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanKurek authored and Corbin Phelps committed Jul 29, 2022
1 parent d122896 commit 49abd54
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 13 deletions.
16 changes: 7 additions & 9 deletions opamp/observiq/observiq_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,7 @@ func (c *Client) onPackagesAvailableHandler(packagesAvailable *protobufs.Package
collectorDownloadableFile := curPackageFiles[mainPackageName]
if collectorDownloadableFile != nil {
c.updatingPackage = true
if err := c.installPackageFromFile(collectorDownloadableFile, curPackageStatuses); err != nil {
c.updatingPackage = false
return err
}
// TODO: When we set this to false will change once we use a new goroutine to start installation
c.updatingPackage = false
go c.installPackageFromFile(collectorDownloadableFile, curPackageStatuses)
}

return nil
Expand Down Expand Up @@ -417,7 +412,9 @@ func (c *Client) createPackageMaps(
return pkgStatusMap, pkgFileMap
}

func (c *Client) installPackageFromFile(file *protobufs.DownloadableFile, curPackageStatuses *protobufs.PackageStatuses) error {
// installPackageFromFile tries to download and extract the given tarball and then start up the new Updater binary that was
// inside of it
func (c *Client) installPackageFromFile(file *protobufs.DownloadableFile, curPackageStatuses *protobufs.PackageStatuses) {
if fileManagerErr := c.downloadableFileManager.FetchAndExtractArchive(file); fileManagerErr != nil {
// Change existing status to show that install failed and get ready to send
curPackageStatuses.Packages[mainPackageName].Status = protobufs.PackageStatus_InstallFailed
Expand All @@ -432,12 +429,13 @@ func (c *Client) installPackageFromFile(file *protobufs.DownloadableFile, curPac
c.logger.Error("OpAMP client failed to set package statuses", zap.Error(err))
}

return fmt.Errorf("failed download, verify, & extract package's downloadable file: %w", fileManagerErr)
c.updatingPackage = false
return
}

// TODO: Start Updater

return nil
return
}

func (c *Client) onGetEffectiveConfigHandler(_ context.Context) (*protobufs.EffectiveConfig, error) {
Expand Down
18 changes: 14 additions & 4 deletions opamp/observiq/observiq_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/http"
"os"
"path/filepath"
"sync"
"testing"

colmocks "github.com/observiq/observiq-otel-collector/collector/mocks"
Expand Down Expand Up @@ -1159,8 +1160,12 @@ func TestClient_onPackagesAvailableHandler(t *testing.T) {
mockProvider := mocks.NewMockPackagesStateProvider(t)
mockProvider.On("LastReportedStatuses").Return(packageStatuses, nil)
mockProvider.On("SetLastReportedStatuses", mock.Anything).Return(nil)
wg := sync.WaitGroup{}
wg.Add(1)
mockFileManager := mocks.NewMockDownloadableFileManager(t)
mockFileManager.On("FetchAndExtractArchive", mock.Anything).Return(nil)
mockFileManager.On("FetchAndExtractArchive", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
wg.Done()
})
mockOpAmpClient := mocks.NewMockOpAMPClient(t)
mockOpAmpClient.On("SetPackageStatuses", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
status := args.Get(0).(*protobufs.PackageStatuses)
Expand All @@ -1186,6 +1191,7 @@ func TestClient_onPackagesAvailableHandler(t *testing.T) {
}

err := c.onPackagesAvailableHandler(packagesAvailableNew)
wg.Wait()
assert.NoError(t, err)
},
},
Expand Down Expand Up @@ -1283,8 +1289,12 @@ func TestClient_onPackagesAvailableHandler(t *testing.T) {
Packages: packagesNew,
}

wg := sync.WaitGroup{}
wg.Add(1)
mockFileManager := mocks.NewMockDownloadableFileManager(t)
mockFileManager.On("FetchAndExtractArchive", mock.Anything).Return(expectedErr)
mockFileManager.On("FetchAndExtractArchive", mock.Anything).Return(expectedErr).Run(func(args mock.Arguments) {
wg.Done()
})
mockProvider := mocks.NewMockPackagesStateProvider(t)
mockProvider.On("LastReportedStatuses").Return(packageStatuses, nil)
mockProvider.On("SetLastReportedStatuses", mock.Anything).Return(nil)
Expand All @@ -1306,7 +1316,6 @@ func TestClient_onPackagesAvailableHandler(t *testing.T) {
})
mockOpAmpClient.On("SetPackageStatuses", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
status := args.Get(0).(*protobufs.PackageStatuses)

assert.NotNil(t, status)
assert.Equal(t, "", status.ErrorMessage)
assert.Equal(t, packagesAvailableNew.AllPackagesHash, status.ServerProvidedAllPackagesHash)
Expand All @@ -1328,7 +1337,8 @@ func TestClient_onPackagesAvailableHandler(t *testing.T) {
}

err := c.onPackagesAvailableHandler(packagesAvailableNew)
assert.ErrorIs(t, err, expectedErr)
wg.Wait()
assert.NoError(t, err)
},
},
{
Expand Down

0 comments on commit 49abd54

Please sign in to comment.