diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_retryable.go b/x-pack/elastic-agent/pkg/agent/operation/operation_retryable.go new file mode 100644 index 00000000000..543d9955547 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_retryable.go @@ -0,0 +1,95 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package operation + +import ( + "context" + "fmt" + "strings" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/retry" +) + +// retryableOperations consists of multiple operations which are +// retryable as a whole. +// if nth operation fails all preceding are retried as well +type retryableOperations struct { + logger *logger.Logger + operations []operation + retryConfig *retry.Config +} + +func newRetryableOperations( + logger *logger.Logger, + retryConfig *retry.Config, + operations ...operation) *retryableOperations { + + return &retryableOperations{ + logger: logger, + retryConfig: retryConfig, + operations: operations, + } +} + +// Name is human readable name identifying an operation +func (o *retryableOperations) Name() string { + names := make([]string, 0, len(o.operations)) + for _, op := range o.operations { + names = append(names, op.Name()) + } + return fmt.Sprintf("retryable block: %s", strings.Join(names, " ")) +} + +// Check checks whether operation needs to be run +// examples: +// - Start does not need to run if process is running +// - Fetch does not need to run if package is already present +func (o *retryableOperations) Check(application Application) (bool, error) { + for _, op := range o.operations { + // finish early if at least one operation needs to be run or errored out + if run, err := op.Check(application); err != nil || run { + return run, err + } + } + + return false, nil +} + +// Run runs the operation +func (o *retryableOperations) Run(ctx context.Context, application Application) (err error) { + return retry.Do(ctx, o.retryConfig, o.runOnce(application)) +} + +// Run runs the operation +func (o *retryableOperations) runOnce(application Application) func(context.Context) error { + return func(ctx context.Context) error { + for _, op := range o.operations { + if ctx.Err() != nil { + return ctx.Err() + } + + shouldRun, err := op.Check(application) + if err != nil { + return err + } + + if !shouldRun { + continue + } + + o.logger.Debugf("running operation '%s' of the block '%s'", op.Name(), o.Name()) + if err := op.Run(ctx, application); err != nil { + o.logger.Errorf("operation %s failed", op.Name()) + return err + } + } + + return nil + } +} + +// check interface +var _ operation = &retryableOperations{} diff --git a/x-pack/elastic-agent/pkg/agent/operation/operator.go b/x-pack/elastic-agent/pkg/agent/operation/operator.go index eb80f05eb1c..7c7b376cc17 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operator.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operator.go @@ -161,8 +161,12 @@ func (o *Operator) HandleConfig(cfg configrequest.Request) error { // specific configuration of new process is passed func (o *Operator) start(p Descriptor, cfg map[string]interface{}) (err error) { flow := []operation{ - newOperationFetch(o.logger, p, o.config, o.downloader, o.eventProcessor), - newOperationVerify(p, o.config, o.verifier, o.eventProcessor), + newRetryableOperations( + o.logger, + o.config.RetryConfig, + newOperationFetch(o.logger, p, o.config, o.downloader, o.eventProcessor), + newOperationVerify(p, o.config, o.verifier, o.eventProcessor), + ), newOperationInstall(o.logger, p, o.config, o.installer, o.eventProcessor), newOperationStart(o.logger, p, o.config, cfg, o.eventProcessor), newOperationConfig(o.logger, o.config, cfg, o.eventProcessor), diff --git a/x-pack/elastic-agent/pkg/artifact/download/fs/downloader.go b/x-pack/elastic-agent/pkg/artifact/download/fs/downloader.go index 1f4c2183883..87740908069 100644 --- a/x-pack/elastic-agent/pkg/artifact/download/fs/downloader.go +++ b/x-pack/elastic-agent/pkg/artifact/download/fs/downloader.go @@ -39,21 +39,25 @@ func NewDownloader(config *artifact.Config) *Downloader { // Download fetches the package from configured source. // Returns absolute path to downloaded package and an error. -func (e *Downloader) Download(_ context.Context, programName, version string) (string, error) { - // create a destination directory root/program - destinationDir := filepath.Join(e.config.TargetDirectory, programName) - if err := os.MkdirAll(destinationDir, 0755); err != nil { - return "", errors.New(err, "creating directory for downloaded artifact failed", errors.TypeFilesystem, errors.M(errors.MetaKeyPath, destinationDir)) - } +func (e *Downloader) Download(_ context.Context, programName, version string) (_ string, err error) { + downloadedFiles := make([]string, 0, 2) + defer func() { + if err != nil { + for _, path := range downloadedFiles { + os.Remove(path) + } + } + }() // download from source to dest path, err := e.download(e.config.OS(), programName, version) + downloadedFiles = append(downloadedFiles, path) if err != nil { - os.Remove(path) return "", err } - _, err = e.downloadHash(e.config.OS(), programName, version) + hashPath, err := e.downloadHash(e.config.OS(), programName, version) + downloadedFiles = append(downloadedFiles, hashPath) return path, err } @@ -103,6 +107,10 @@ func (e *Downloader) downloadFile(filename, fullPath string) (string, error) { defer destinationFile.Close() _, err = io.Copy(destinationFile, sourceFile) + if err != nil { + return "", err + } + return fullPath, nil } diff --git a/x-pack/elastic-agent/pkg/artifact/download/fs/testdata/drop/beat-8.0.0-darwin-x86_64.tar.gz b/x-pack/elastic-agent/pkg/artifact/download/fs/testdata/drop/beat-8.0.0-darwin-x86_64.tar.gz new file mode 100644 index 00000000000..4ce7e4d9fd4 --- /dev/null +++ b/x-pack/elastic-agent/pkg/artifact/download/fs/testdata/drop/beat-8.0.0-darwin-x86_64.tar.gz @@ -0,0 +1,5 @@ +sample +content +of +a +file diff --git a/x-pack/elastic-agent/pkg/artifact/download/fs/testdata/drop/beat-8.0.0-darwin-x86_64.tar.gz.sha512 b/x-pack/elastic-agent/pkg/artifact/download/fs/testdata/drop/beat-8.0.0-darwin-x86_64.tar.gz.sha512 new file mode 100644 index 00000000000..5d0fc9e405d --- /dev/null +++ b/x-pack/elastic-agent/pkg/artifact/download/fs/testdata/drop/beat-8.0.0-darwin-x86_64.tar.gz.sha512 @@ -0,0 +1 @@ +9af9aa016f3349aa248034629e4336ca2f4d31317bfb8c9a23a9d924c18969cf43ad93727e784da010a272690b2b5ce4c4ded3a5d2039e4408e93e1e18d113db beat-8.0.0-darwin-x86_64.tar.gz diff --git a/x-pack/elastic-agent/pkg/artifact/download/fs/verifier.go b/x-pack/elastic-agent/pkg/artifact/download/fs/verifier.go index 85f1813c992..942a412efdf 100644 --- a/x-pack/elastic-agent/pkg/artifact/download/fs/verifier.go +++ b/x-pack/elastic-agent/pkg/artifact/download/fs/verifier.go @@ -53,7 +53,14 @@ func (v *Verifier) Verify(programName, version string) (bool, error) { fullPath := filepath.Join(v.config.TargetDirectory, filename) - return v.verifyHash(filename, fullPath) + isMatch, err := v.verifyHash(filename, fullPath) + if !isMatch || err != nil { + // remove bits so they can be redownloaded + os.Remove(fullPath) + os.Remove(fullPath + ".sha512") + } + + return isMatch, err } func (v *Verifier) verifyHash(filename, fullPath string) (bool, error) { diff --git a/x-pack/elastic-agent/pkg/artifact/download/fs/verifier_test.go b/x-pack/elastic-agent/pkg/artifact/download/fs/verifier_test.go index 5cf98fcc244..25c5df2efda 100644 --- a/x-pack/elastic-agent/pkg/artifact/download/fs/verifier_test.go +++ b/x-pack/elastic-agent/pkg/artifact/download/fs/verifier_test.go @@ -14,6 +14,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact" ) @@ -28,6 +30,101 @@ type testCase struct { arch string } +func TestFetchVerify(t *testing.T) { + timeout := 15 * time.Second + dropPath := filepath.Join("testdata", "drop") + installPath := filepath.Join("testdata", "install") + targetPath := filepath.Join("testdata", "download") + ctx := context.Background() + programName := "beat" + version := "8.0.0" + + targetFilePath := filepath.Join(targetPath, "beat-8.0.0-darwin-x86_64.tar.gz") + hashTargetFilePath := filepath.Join(targetPath, "beat-8.0.0-darwin-x86_64.tar.gz.sha512") + + // cleanup + defer os.RemoveAll(targetPath) + + config := &artifact.Config{ + TargetDirectory: targetPath, + DropPath: dropPath, + InstallPath: installPath, + Timeout: timeout, + OperatingSystem: "darwin", + Architecture: "32", + } + + err := prepareFetchVerifyTests(dropPath, targetPath, targetFilePath, hashTargetFilePath) + assert.NoError(t, err) + + downloader := NewDownloader(config) + verifier, err := NewVerifier(config) + assert.NoError(t, err) + + // first download verify should fail: + // download skipped, as invalid package is prepared upfront + // verify fails and cleans download + matches, err := verifier.Verify(programName, version) + assert.NoError(t, err) + assert.Equal(t, false, matches) + + _, err = os.Stat(targetFilePath) + assert.True(t, os.IsNotExist(err)) + + _, err = os.Stat(hashTargetFilePath) + assert.True(t, os.IsNotExist(err)) + + // second one should pass + // download not skipped: package missing + // verify passes because hash is not correct + _, err = downloader.Download(ctx, programName, version) + assert.NoError(t, err) + + // file downloaded ok + _, err = os.Stat(targetFilePath) + assert.NoError(t, err) + + _, err = os.Stat(hashTargetFilePath) + assert.NoError(t, err) + + matches, err = verifier.Verify(programName, version) + assert.NoError(t, err) + assert.Equal(t, true, matches) +} + +func prepareFetchVerifyTests(dropPath, targetDir, targetFilePath, hashTargetFilePath string) error { + sourceFilePath := filepath.Join(dropPath, "beat-8.0.0-darwin-x86_64.tar.gz") + hashSourceFilePath := filepath.Join(dropPath, "beat-8.0.0-darwin-x86_64.tar.gz.sha512") + + // clean targets + os.Remove(targetFilePath) + os.Remove(hashTargetFilePath) + + if err := os.MkdirAll(targetDir, 0775); err != nil { + return err + } + + sourceFile, err := os.Open(sourceFilePath) + if err != nil { + return err + } + defer sourceFile.Close() + + targretFile, err := os.OpenFile(targetFilePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) + if err != nil { + return err + } + defer targretFile.Close() + + hashContent, err := ioutil.ReadFile(hashSourceFilePath) + if err != nil { + return err + } + + corruptedHash := append([]byte{1, 2, 3, 4, 5, 6}, hashContent[6:]...) + return ioutil.WriteFile(hashTargetFilePath, corruptedHash, 0666) +} + func TestVerify(t *testing.T) { targetDir, err := ioutil.TempDir(os.TempDir(), "") if err != nil { diff --git a/x-pack/elastic-agent/pkg/artifact/download/http/downloader.go b/x-pack/elastic-agent/pkg/artifact/download/http/downloader.go index e882a2194de..63d093a6647 100644 --- a/x-pack/elastic-agent/pkg/artifact/download/http/downloader.go +++ b/x-pack/elastic-agent/pkg/artifact/download/http/downloader.go @@ -51,15 +51,25 @@ func NewDownloaderWithClient(config *artifact.Config, client http.Client) *Downl // Download fetches the package from configured source. // Returns absolute path to downloaded package and an error. -func (e *Downloader) Download(ctx context.Context, programName, version string) (string, error) { +func (e *Downloader) Download(ctx context.Context, programName, version string) (_ string, err error) { + downloadedFiles := make([]string, 0, 2) + defer func() { + if err != nil { + for _, path := range downloadedFiles { + os.Remove(path) + } + } + }() + // download from source to dest path, err := e.download(ctx, e.config.OS(), programName, version) + downloadedFiles = append(downloadedFiles, path) if err != nil { - os.Remove(path) return "", err } - _, err = e.downloadHash(ctx, e.config.OS(), programName, version) + hashPath, err := e.downloadHash(ctx, e.config.OS(), programName, version) + downloadedFiles = append(downloadedFiles, hashPath) return path, err } diff --git a/x-pack/elastic-agent/pkg/artifact/download/http/verifier.go b/x-pack/elastic-agent/pkg/artifact/download/http/verifier.go index 4c08b5049e8..486f222c8e5 100644 --- a/x-pack/elastic-agent/pkg/artifact/download/http/verifier.go +++ b/x-pack/elastic-agent/pkg/artifact/download/http/verifier.go @@ -66,7 +66,14 @@ func (v *Verifier) Verify(programName, version string) (bool, error) { return false, errors.New(err, "retrieving package path") } - return v.verifyHash(filename, fullPath) + isMatch, err := v.verifyHash(filename, fullPath) + if !isMatch || err != nil { + // remove bits so they can be redownloaded + os.Remove(fullPath) + os.Remove(fullPath + ".sha512") + } + + return isMatch, err } func (v *Verifier) verifyHash(filename, fullPath string) (bool, error) {