diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index eb98ef39ded..96f036dd15d 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -15,6 +15,7 @@ - Copy Action store on upgrade {pull}21298[21298] - Include inputs in action store actions {pull}21298[21298] - Fix issue where inputs without processors defined would panic {pull}21628[21628] +- Partial extracted beat result in failure to spawn beat {issue}21718[21718] ==== New features diff --git a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go new file mode 100644 index 00000000000..5e26436bfc4 --- /dev/null +++ b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go @@ -0,0 +1,62 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package atomic + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" +) + +type embeddedInstaller interface { + Install(ctx context.Context, programName, version, installDir string) error +} + +// Installer installs into temporary destination and moves to correct one after +// successful finish. +type Installer struct { + installer embeddedInstaller +} + +// NewInstaller creates a new AtomicInstaller +func NewInstaller(i embeddedInstaller) (*Installer, error) { + return &Installer{ + installer: i, + }, nil +} + +// Install performs installation of program in a specific version. +func (i *Installer) Install(ctx context.Context, programName, version, installDir string) error { + // tar installer uses Dir of installDir to determine location of unpack + tempDir, err := ioutil.TempDir(os.TempDir(), "elastic-agent-install") + if err != nil { + return err + } + tempInstallDir := filepath.Join(tempDir, filepath.Base(installDir)) + + // cleanup install directory before Install + if _, err := os.Stat(installDir); err == nil || os.IsExist(err) { + os.RemoveAll(installDir) + } + + if _, err := os.Stat(tempInstallDir); err == nil || os.IsExist(err) { + os.RemoveAll(tempInstallDir) + } + + if err := i.installer.Install(ctx, programName, version, tempInstallDir); err != nil { + // cleanup unfinished install + os.RemoveAll(tempInstallDir) + return err + } + + if err := os.Rename(tempInstallDir, installDir); err != nil { + os.RemoveAll(installDir) + os.RemoveAll(tempInstallDir) + return err + } + + return nil +} diff --git a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer_test.go b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer_test.go new file mode 100644 index 00000000000..d6266659b7d --- /dev/null +++ b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer_test.go @@ -0,0 +1,115 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package atomic + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestOKInstall(t *testing.T) { + sig := make(chan int) + ti := &testInstaller{sig} + var wg sync.WaitGroup + i, err := NewInstaller(ti) + + assert.NoError(t, err) + + ctx := context.Background() + installDir := filepath.Join(os.TempDir(), "install_dir") + + wg.Add(1) + go func() { + err := i.Install(ctx, "a", "b", installDir) + assert.NoError(t, err) + wg.Done() + }() + + // signal to process next files + close(sig) + + wg.Wait() + + assert.DirExists(t, installDir) + files := getFiles() + + for name := range files { + path := filepath.Join(installDir, name) + assert.FileExists(t, path) + } + + os.RemoveAll(installDir) +} + +func TestContextCancelledInstall(t *testing.T) { + sig := make(chan int) + ti := &testInstaller{sig} + var wg sync.WaitGroup + i, err := NewInstaller(ti) + + assert.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + installDir := filepath.Join(os.TempDir(), "install_dir") + + wg.Add(1) + go func() { + err := i.Install(ctx, "a", "b", installDir) + assert.Error(t, err) + wg.Done() + }() + + // cancel before signaling + cancel() + close(sig) + + wg.Wait() + + assert.NoDirExists(t, installDir) +} + +type testInstaller struct { + signal chan int +} + +func (ti *testInstaller) Install(ctx context.Context, programName, version, installDir string) error { + files := getFiles() + if err := os.MkdirAll(installDir, 0777); err != nil { + return err + } + + for name, content := range files { + if err := ctx.Err(); err != nil { + return err + } + + filename := filepath.Join(installDir, name) + if err := ioutil.WriteFile(filename, content, 0666); err != nil { + return err + } + + // wait for all but last + <-ti.signal + } + + return nil +} + +func getFiles() map[string][]byte { + files := make(map[string][]byte) + fileCount := 3 + for i := 1; i <= fileCount; i++ { + files[fmt.Sprintf("file_%d", i)] = []byte(fmt.Sprintf("content of file %d", i)) + } + + return files +} diff --git a/x-pack/elastic-agent/pkg/artifact/install/installer.go b/x-pack/elastic-agent/pkg/artifact/install/installer.go index f04e7a4238e..c606ada5d65 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/installer.go +++ b/x-pack/elastic-agent/pkg/artifact/install/installer.go @@ -12,6 +12,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/dir" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/atomic" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/hooks" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/tar" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/zip" @@ -60,5 +61,10 @@ func NewInstaller(config *artifact.Config) (InstallerChecker, error) { return nil, err } - return hooks.NewInstallerChecker(installer, dir.NewChecker()) + atomicInstaller, err := atomic.NewInstaller(installer) + if err != nil { + return nil, err + } + + return hooks.NewInstallerChecker(atomicInstaller, dir.NewChecker()) } diff --git a/x-pack/elastic-agent/pkg/artifact/install/tar/tar_installer.go b/x-pack/elastic-agent/pkg/artifact/install/tar/tar_installer.go index 5c7f0f593a3..74a74e4c6bc 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/tar/tar_installer.go +++ b/x-pack/elastic-agent/pkg/artifact/install/tar/tar_installer.go @@ -99,9 +99,19 @@ func unpack(r io.Reader, dir string) error { } _, err = io.Copy(wf, tr) + + if err == nil { + // sometimes we try executing binary too fast and run into text file busy after unpacking + // syncing prevents this + if syncErr := wf.Sync(); syncErr != nil { + err = syncErr + } + } + if closeErr := wf.Close(); closeErr != nil && err == nil { err = closeErr } + if err != nil { return fmt.Errorf("TarInstaller: error writing to %s: %v", abs, err) } diff --git a/x-pack/elastic-agent/pkg/artifact/install/zip/zip_installer.go b/x-pack/elastic-agent/pkg/artifact/install/zip/zip_installer.go index ffc90f2dce8..b565f630a73 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/zip/zip_installer.go +++ b/x-pack/elastic-agent/pkg/artifact/install/zip/zip_installer.go @@ -102,14 +102,18 @@ func (i *Installer) unzip(artifactPath string) error { return err } defer func() { - if cerr := f.Close(); cerr != nil { - err = multierror.Append(err, cerr) + if closeErr := f.Close(); closeErr != nil { + err = multierror.Append(err, closeErr) } }() if _, err = io.Copy(f, rc); err != nil { return err } + + // sometimes we try executing binary too fast and run into text file busy after unpacking + // syncing prevents this + f.Sync() } return nil }