Skip to content

Commit

Permalink
[Ingest Manager] Agent atomic installer (elastic#21745)
Browse files Browse the repository at this point in the history
[Ingest Manager] Agent atomic installer (elastic#21745)
  • Loading branch information
michalpristas committed Oct 14, 2020
1 parent 45592d8 commit f82723d
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package atomic

import (
"context"
"io/ioutil"
"os"
"path/filepath"
)

type embeddedInstaller interface {
Install(ctx context.Context, programName, version, installDir string) error
}

// Installer installs into temporary destination and moves to correct one after
// successful finish.
type Installer struct {
installer embeddedInstaller
}

// NewInstaller creates a new AtomicInstaller
func NewInstaller(i embeddedInstaller) (*Installer, error) {
return &Installer{
installer: i,
}, nil
}

// Install performs installation of program in a specific version.
func (i *Installer) Install(ctx context.Context, programName, version, installDir string) error {
// tar installer uses Dir of installDir to determine location of unpack
tempDir, err := ioutil.TempDir(os.TempDir(), "elastic-agent-install")
if err != nil {
return err
}
tempInstallDir := filepath.Join(tempDir, filepath.Base(installDir))

// cleanup install directory before Install
if _, err := os.Stat(installDir); err == nil || os.IsExist(err) {
os.RemoveAll(installDir)
}

if _, err := os.Stat(tempInstallDir); err == nil || os.IsExist(err) {
os.RemoveAll(tempInstallDir)
}

if err := i.installer.Install(ctx, programName, version, tempInstallDir); err != nil {
// cleanup unfinished install
os.RemoveAll(tempInstallDir)
return err
}

if err := os.Rename(tempInstallDir, installDir); err != nil {
os.RemoveAll(installDir)
os.RemoveAll(tempInstallDir)
return err
}

return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package atomic

import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"testing"

"github.com/stretchr/testify/assert"
)

func TestOKInstall(t *testing.T) {
sig := make(chan int)
ti := &testInstaller{sig}
var wg sync.WaitGroup
i, err := NewInstaller(ti)

assert.NoError(t, err)

ctx := context.Background()
installDir := filepath.Join(os.TempDir(), "install_dir")

wg.Add(1)
go func() {
err := i.Install(ctx, "a", "b", installDir)
assert.NoError(t, err)
wg.Done()
}()

// signal to process next files
close(sig)

wg.Wait()

assert.DirExists(t, installDir)
files := getFiles()

for name := range files {
path := filepath.Join(installDir, name)
assert.FileExists(t, path)
}

os.RemoveAll(installDir)
}

func TestContextCancelledInstall(t *testing.T) {
sig := make(chan int)
ti := &testInstaller{sig}
var wg sync.WaitGroup
i, err := NewInstaller(ti)

assert.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
installDir := filepath.Join(os.TempDir(), "install_dir")

wg.Add(1)
go func() {
err := i.Install(ctx, "a", "b", installDir)
assert.Error(t, err)
wg.Done()
}()

// cancel before signaling
cancel()
close(sig)

wg.Wait()

assert.NoDirExists(t, installDir)
}

type testInstaller struct {
signal chan int
}

func (ti *testInstaller) Install(ctx context.Context, programName, version, installDir string) error {
files := getFiles()
if err := os.MkdirAll(installDir, 0777); err != nil {
return err
}

for name, content := range files {
if err := ctx.Err(); err != nil {
return err
}

filename := filepath.Join(installDir, name)
if err := ioutil.WriteFile(filename, content, 0666); err != nil {
return err
}

// wait for all but last
<-ti.signal
}

return nil
}

func getFiles() map[string][]byte {
files := make(map[string][]byte)
fileCount := 3
for i := 1; i <= fileCount; i++ {
files[fmt.Sprintf("file_%d", i)] = []byte(fmt.Sprintf("content of file %d", i))
}

return files
}
8 changes: 7 additions & 1 deletion x-pack/elastic-agent/pkg/artifact/install/installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/dir"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/atomic"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/hooks"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/tar"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install/zip"
Expand Down Expand Up @@ -60,5 +61,10 @@ func NewInstaller(config *artifact.Config) (InstallerChecker, error) {
return nil, err
}

return hooks.NewInstallerChecker(installer, dir.NewChecker())
atomicInstaller, err := atomic.NewInstaller(installer)
if err != nil {
return nil, err
}

return hooks.NewInstallerChecker(atomicInstaller, dir.NewChecker())
}

0 comments on commit f82723d

Please sign in to comment.