Skip to content

Commit

Permalink
[Ingest Manager] Support for upgrade rollback (elastic#22537)
Browse files Browse the repository at this point in the history
[Ingest Manager] Support for upgrade rollback (elastic#22537)
  • Loading branch information
michalpristas authored Dec 9, 2020
1 parent 7cbb1a2 commit 374ef1f
Show file tree
Hide file tree
Showing 20 changed files with 1,384 additions and 59 deletions.
4 changes: 1 addition & 3 deletions x-pack/elastic-agent/pkg/agent/application/locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"github.com/gofrs/flock"
)

const lockFileName = "agent.lock"

// ErrAppAlreadyRunning error returned when another elastic-agent is already holding the lock.
var ErrAppAlreadyRunning = fmt.Errorf("another elastic-agent is already running")

Expand All @@ -23,7 +21,7 @@ type AppLocker struct {
}

// NewAppLocker creates an AppLocker that locks the agent.lock file inside the provided directory.
func NewAppLocker(dir string) *AppLocker {
func NewAppLocker(dir, lockFileName string) *AppLocker {
if _, err := os.Stat(dir); os.IsNotExist(err) {
_ = os.Mkdir(dir, 0755)
}
Expand Down
6 changes: 4 additions & 2 deletions x-pack/elastic-agent/pkg/agent/application/locker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import (
"github.com/stretchr/testify/require"
)

const testLockFile = "test.lock"

func TestAppLocker(t *testing.T) {
tmp, _ := ioutil.TempDir("", "locker")
defer os.RemoveAll(tmp)

locker1 := NewAppLocker(tmp)
locker2 := NewAppLocker(tmp)
locker1 := NewAppLocker(tmp, testLockFile)
locker2 := NewAppLocker(tmp, testLockFile)

require.NoError(t, locker1.TryLock())
assert.Error(t, locker2.TryLock())
Expand Down
126 changes: 126 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/upgrade/crash_checker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package upgrade

import (
"context"
"fmt"
"os"
"sync"
"time"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

const (
defaultCheckPeriod = 10 * time.Second
evaluatedPeriods = 6 // with 10s period this means we evaluate 60s of agent run
crashesAllowed = 2 // means that within 60s one restart is allowed, additional one is considered crash
)

type serviceHandler interface {
PID(ctx context.Context) (int, error)
Name() string
Close()
}

// CrashChecker checks agent for crash pattern in Elastic Agent lifecycle.
type CrashChecker struct {
notifyChan chan error
q *disctintQueue
log *logger.Logger
sc serviceHandler
checkPeriod time.Duration
}

// NewCrashChecker creates a new crash checker.
func NewCrashChecker(ctx context.Context, ch chan error, log *logger.Logger) (*CrashChecker, error) {
q, err := newDistinctQueue(evaluatedPeriods)
if err != nil {
return nil, err
}

c := &CrashChecker{
notifyChan: ch,
q: q,
log: log,
checkPeriod: defaultCheckPeriod,
}

if err := c.Init(ctx, log); err != nil {
return nil, err
}

log.Debugf("running checks using '%s' controller", c.sc.Name())

return c, nil
}

// Run runs the checking loop.
func (ch *CrashChecker) Run(ctx context.Context) {
defer ch.sc.Close()

ch.log.Debug("Crash checker started")
for {
ch.log.Debugf("watcher having PID: %d", os.Getpid())
select {
case <-ctx.Done():
return
case <-time.After(ch.checkPeriod):
pid, err := ch.sc.PID(ctx)
if err != nil {
ch.log.Error(err)
}

ch.q.Push(pid)
restarts := ch.q.Distinct()
ch.log.Debugf("retrieved service PID [%d] changed %d times within %d", pid, restarts, evaluatedPeriods)
if restarts > crashesAllowed {
ch.notifyChan <- errors.New(fmt.Sprintf("service restarted '%d' times within '%v' seconds", restarts, ch.checkPeriod.Seconds()))
}
}
}
}

type disctintQueue struct {
q []int
size int
lock sync.Mutex
}

func newDistinctQueue(size int) (*disctintQueue, error) {
if size < 1 {
return nil, errors.New("invalid size", errors.TypeUnexpected)
}
return &disctintQueue{
q: make([]int, 0, size),
size: size,
}, nil
}

func (dq *disctintQueue) Push(id int) {
dq.lock.Lock()
defer dq.lock.Unlock()

cutIdx := len(dq.q)
if dq.size-1 < len(dq.q) {
cutIdx = dq.size - 1
}
dq.q = append([]int{id}, dq.q[:cutIdx]...)
}

func (dq *disctintQueue) Distinct() int {
dq.lock.Lock()
defer dq.lock.Unlock()

dm := make(map[int]int)

for _, id := range dq.q {
dm[id] = 1
}

return len(dm)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package upgrade

import (
"context"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

var (
testCheckPeriod = 100 * time.Millisecond
)

func TestChecker(t *testing.T) {
t.Run("no failure when no change", func(t *testing.T) {
pider := &testPider{}
ch, errChan := testableChecker(t, pider)
ctx, canc := context.WithCancel(context.Background())

var wg sync.WaitGroup
wg.Add(1)
go func() {
wg.Done()
ch.Run(ctx)
}()

wg.Wait()
<-time.After(6 * testCheckPeriod)

var err error
select {
case err = <-errChan:
default:
}

canc()
require.NoError(t, err)
})

t.Run("no failure when unfrequent change", func(t *testing.T) {
pider := &testPider{}
ch, errChan := testableChecker(t, pider)
ctx, canc := context.WithCancel(context.Background())

var wg sync.WaitGroup
wg.Add(1)
go func() {
wg.Done()
ch.Run(ctx)
}()

wg.Wait()
for i := 0; i < 2; i++ {
<-time.After(3 * testCheckPeriod)
pider.Change(i)
}
var err error
select {
case err = <-errChan:
default:
}

canc()
require.NoError(t, err)
})

t.Run("no failure when change lower than limit", func(t *testing.T) {
pider := &testPider{}
ch, errChan := testableChecker(t, pider)
ctx, canc := context.WithCancel(context.Background())

var wg sync.WaitGroup
wg.Add(1)
go func() {
wg.Done()
ch.Run(ctx)
}()

wg.Wait()
for i := 0; i < 3; i++ {
<-time.After(7 * testCheckPeriod)
pider.Change(i)
}
var err error
select {
case err = <-errChan:
default:
}

canc()
require.NoError(t, err)
})

t.Run("fails when pid changes frequently", func(t *testing.T) {
pider := &testPider{}
ch, errChan := testableChecker(t, pider)
ctx, canc := context.WithCancel(context.Background())

var wg sync.WaitGroup
wg.Add(1)
go func() {
wg.Done()
ch.Run(ctx)
}()

wg.Wait()
for i := 0; i < 12; i++ {
<-time.After(testCheckPeriod / 2)
pider.Change(i)
}
var err error
select {
case err = <-errChan:
default:
}

canc()
require.Error(t, err)
})
}

func testableChecker(t *testing.T, pider *testPider) (*CrashChecker, chan error) {
errChan := make(chan error, 1)
l, _ := logger.New("")
ch, err := NewCrashChecker(context.Background(), errChan, l)
require.NoError(t, err)

ch.checkPeriod = testCheckPeriod
ch.sc.Close()
ch.sc = pider

return ch, errChan
}

type testPider struct {
sync.Mutex
pid int
}

func (p *testPider) Change(pid int) {
p.Lock()
defer p.Unlock()
p.pid = pid
}

func (p *testPider) PID(ctx context.Context) (int, error) {
p.Lock()
defer p.Unlock()
return p.pid, nil
}

func (p *testPider) Close() {}

func (p *testPider) Name() string { return "testPider" }
Loading

0 comments on commit 374ef1f

Please sign in to comment.