From fff5f6a2241e4c9c9513c98c7e821f0a5007d362 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Tue, 13 Oct 2020 22:35:02 +0200 Subject: [PATCH] [Ingest Manager] Agent atomic installer (#21745) [Ingest Manager] Agent atomic installer (#21745) --- .../install/atomic/atomic_installer.go | 62 ++++++++++ .../install/atomic/atomic_installer_test.go | 115 ++++++++++++++++++ .../pkg/artifact/install/installer.go | 8 +- 3 files changed, 184 insertions(+), 1 deletion(-) create mode 100644 x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go create mode 100644 x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer_test.go diff --git a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go new file mode 100644 index 00000000000..5e26436bfc4 --- /dev/null +++ b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go @@ -0,0 +1,62 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package atomic + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" +) + +type embeddedInstaller interface { + Install(ctx context.Context, programName, version, installDir string) error +} + +// Installer installs into temporary destination and moves to correct one after +// successful finish. +type Installer struct { + installer embeddedInstaller +} + +// NewInstaller creates a new AtomicInstaller +func NewInstaller(i embeddedInstaller) (*Installer, error) { + return &Installer{ + installer: i, + }, nil +} + +// Install performs installation of program in a specific version. +func (i *Installer) Install(ctx context.Context, programName, version, installDir string) error { + // tar installer uses Dir of installDir to determine location of unpack + tempDir, err := ioutil.TempDir(os.TempDir(), "elastic-agent-install") + if err != nil { + return err + } + tempInstallDir := filepath.Join(tempDir, filepath.Base(installDir)) + + // cleanup install directory before Install + if _, err := os.Stat(installDir); err == nil || os.IsExist(err) { + os.RemoveAll(installDir) + } + + if _, err := os.Stat(tempInstallDir); err == nil || os.IsExist(err) { + os.RemoveAll(tempInstallDir) + } + + if err := i.installer.Install(ctx, programName, version, tempInstallDir); err != nil { + // cleanup unfinished install + os.RemoveAll(tempInstallDir) + return err + } + + if err := os.Rename(tempInstallDir, installDir); err != nil { + os.RemoveAll(installDir) + os.RemoveAll(tempInstallDir) + return err + } + + return nil +} diff --git a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer_test.go b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer_test.go new file mode 100644 index 00000000000..d6266659b7d --- /dev/null +++ b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer_test.go @@ -0,0 +1,115 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package atomic + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestOKInstall(t *testing.T) { + sig := make(chan int) + ti := &testInstaller{sig} + var wg sync.WaitGroup + i, err := NewInstaller(ti) + + assert.NoError(t, err) + + ctx := context.Background() + installDir := filepath.Join(os.TempDir(), "install_dir") + + wg.Add(1) + go func() { + err := i.Install(ctx, "a", "b", installDir) + assert.NoError(t, err) + wg.Done() + }() + + // signal to process next files + close(sig) + + wg.Wait() + + assert.DirExists(t, installDir) + files := getFiles() + + for name := range files { + path := filepath.Join(installDir, name) + assert.FileExists(t, path) + } + + os.RemoveAll(installDir) +} + +func TestContextCancelledInstall(t *testing.T) { + sig := make(chan int) + ti := &testInstaller{sig} + var wg sync.WaitGroup + i, err := NewInstaller(ti) + + assert.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + installDir := filepath.Join(os.TempDir(), "install_dir") + + wg.Add(1) + go func() { + err := i.Install(ctx, "a", "b", installDir) + assert.Error(t, err) + wg.Done() + }() + + // cancel before signaling + cancel() + close(sig) + + wg.Wait() + + assert.NoDirExists(t, installDir) +} + +type testInstaller struct { + signal chan int +} + +func (ti *testInstaller) Install(ctx context.Context, programName, version, installDir string) error { + files := getFiles() + if err := os.MkdirAll(installDir, 0777); err != nil { + return err + } + + for name, content := range files { + if err := ctx.Err(); err != nil { + return err + } + + filename := filepath.Join(installDir, name) + if err := ioutil.WriteFile(filename, content, 0666); err != nil { + return err + } + + // wait for all but last + <-ti.signal + } + + return nil +} + +func getFiles() map[string][]byte { + files := make(map[string][]byte) + fileCount := 3 + for i := 1; i <= fileCount; i++ { + files[fmt.Sprintf("file_%d", i)] = []byte(fmt.Sprintf("content of file %d", i)) + } + + return files +} diff --git a/x-pack/elastic-agent/pkg/artifact/install/installer.go b/x-pack/elastic-agent/pkg/artifact/install/installer.go index f04e7a4238e..c606ada5d65 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/installer.go +++ b/x-pack/elastic-agent/pkg/artifact/install/installer.go @@ -12,6 +12,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/dir" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/atomic" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/hooks" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/tar" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/zip" @@ -60,5 +61,10 @@ func NewInstaller(config *artifact.Config) (InstallerChecker, error) { return nil, err } - return hooks.NewInstallerChecker(installer, dir.NewChecker()) + atomicInstaller, err := atomic.NewInstaller(installer) + if err != nil { + return nil, err + } + + return hooks.NewInstallerChecker(atomicInstaller, dir.NewChecker()) }