From 921681ce8853116fa44efbc04fe1c3f9b3226832 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 7 Sep 2023 11:15:43 -0400 Subject: [PATCH 01/21] kill watcher on uninstall --- internal/pkg/agent/install/uninstall.go | 67 +++++++++++++++++++++---- 1 file changed, 56 insertions(+), 11 deletions(-) diff --git a/internal/pkg/agent/install/uninstall.go b/internal/pkg/agent/install/uninstall.go index 5da0b60fac1..54d3cecc616 100644 --- a/internal/pkg/agent/install/uninstall.go +++ b/internal/pkg/agent/install/uninstall.go @@ -6,6 +6,7 @@ package install import ( "context" + "errors" "fmt" "os" "path/filepath" @@ -15,9 +16,11 @@ import ( "github.com/kardianos/service" + "github.com/elastic/elastic-agent-system-metrics/metric/system/process" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" - "github.com/elastic/elastic-agent/internal/pkg/agent/errors" + aerrors "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" "github.com/elastic/elastic-agent/internal/pkg/agent/vars" "github.com/elastic/elastic-agent/internal/pkg/capabilities" @@ -41,23 +44,28 @@ func Uninstall(cfgFile, topPath, uninstallToken string) error { if status == service.StatusRunning { err := svc.Stop() if err != nil { - return errors.New( + return aerrors.New( err, fmt.Sprintf("failed to stop service (%s)", paths.ServiceName), - errors.M("service", paths.ServiceName)) + aerrors.M("service", paths.ServiceName)) } } + // kill any running watcher + if err := killWatcher(); err != nil { + return fmt.Errorf("failed trying to kill any running watcher: %w", err) + } + // Uninstall components first if err := uninstallComponents(context.Background(), cfgFile, uninstallToken); err != nil { // If service status was running it was stopped to uninstall the components. // If the components uninstall failed start the service again if status == service.StatusRunning { if startErr := svc.Start(); startErr != nil { - return errors.New( + return aerrors.New( err, fmt.Sprintf("failed to restart service (%s), after failed components uninstall: %v", paths.ServiceName, startErr), - errors.M("service", paths.ServiceName)) + aerrors.M("service", paths.ServiceName)) } } return err @@ -70,20 +78,20 @@ func Uninstall(cfgFile, topPath, uninstallToken string) error { if paths.ShellWrapperPath != "" { err = os.Remove(paths.ShellWrapperPath) if !os.IsNotExist(err) && err != nil { - return errors.New( + return aerrors.New( err, fmt.Sprintf("failed to remove shell wrapper (%s)", paths.ShellWrapperPath), - errors.M("destination", paths.ShellWrapperPath)) + aerrors.M("destination", paths.ShellWrapperPath)) } } // remove existing directory err = RemovePath(topPath) if err != nil { - return errors.New( + return aerrors.New( err, fmt.Sprintf("failed to remove installation directory (%s)", paths.Top()), - errors.M("directory", paths.Top())) + aerrors.M("directory", paths.Top())) } return nil @@ -258,7 +266,7 @@ func uninstallServiceComponent(ctx context.Context, log *logp.Logger, comp compo func serviceComponentsFromConfig(specs component.RuntimeSpecs, cfg *config.Config) ([]component.Component, error) { mm, err := cfg.ToMapStr() if err != nil { - return nil, errors.New("failed to create a map from config", err) + return nil, aerrors.New("failed to create a map from config", err) } allComps, err := specs.ToComponents(mm, nil, logp.InfoLevel, nil) if err != nil { @@ -299,7 +307,7 @@ func applyDynamics(ctx context.Context, log *logger.Logger, cfg *config.Config) } err = transpiler.Insert(ast, renderedInputs, "inputs") if err != nil { - return nil, errors.New("inserting rendered inputs failed", err) + return nil, aerrors.New("inserting rendered inputs failed", err) } } @@ -310,3 +318,40 @@ func applyDynamics(ctx context.Context, log *logger.Logger, cfg *config.Config) return config.NewConfigFrom(finalConfig) } + +// killWatcher finds and kills any running Elastic Agent watcher. +func killWatcher() error { + procStats := process.Stats{ + Procs: []string{"elastic-agent"}, + CacheCmdLine: true, + } + err := procStats.Init() + if err != nil { + return fmt.Errorf("failed to initialize process.Stats: %w", err) + } + pidMap, _, err := procStats.FetchPids() + if err != nil { + return fmt.Errorf("failed to fetch elastic-agent pids: %w", err) + } + var errs error + for pid, state := range pidMap { + if len(state.Args) < 2 { + // must have at least 2 args "elastic-agent watcher" + continue + } + if state.Args[0] == "elastic-agent" && state.Args[1] == "watcher" { + // it is a watcher + proc, err := os.FindProcess(pid) + if err != nil { + errs = errors.Join(errs, fmt.Errorf("failed to load watcher process with pid %d: %w", pid, err)) + continue + } + err = proc.Kill() + if err != nil { + errs = errors.Join(errs, fmt.Errorf("failed to kill watcher process with pid %d: %w", pid, err)) + continue + } + } + } + return errs +} From f3eb5e70dcd2f4ce350ad7fc3eefc95cb977cedb Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 7 Sep 2023 14:23:11 -0400 Subject: [PATCH 02/21] Empty commit. From 8e2d18cb82cab311ed364e9e78a825a567bcb785 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 7 Sep 2023 14:35:46 -0400 Subject: [PATCH 03/21] Fix killWatcher. --- internal/pkg/agent/install/uninstall.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/pkg/agent/install/uninstall.go b/internal/pkg/agent/install/uninstall.go index 54d3cecc616..1fa8163df03 100644 --- a/internal/pkg/agent/install/uninstall.go +++ b/internal/pkg/agent/install/uninstall.go @@ -322,7 +322,7 @@ func applyDynamics(ctx context.Context, log *logger.Logger, cfg *config.Config) // killWatcher finds and kills any running Elastic Agent watcher. func killWatcher() error { procStats := process.Stats{ - Procs: []string{"elastic-agent"}, + Procs: []string{".*elastic-agent"}, CacheCmdLine: true, } err := procStats.Init() @@ -339,8 +339,8 @@ func killWatcher() error { // must have at least 2 args "elastic-agent watcher" continue } - if state.Args[0] == "elastic-agent" && state.Args[1] == "watcher" { - // it is a watcher + if filepath.Base(state.Args[0]) == "elastic-agent" && state.Args[1] == "watch" { + // it is the watch subprocess proc, err := os.FindProcess(pid) if err != nil { errs = errors.Join(errs, fmt.Errorf("failed to load watcher process with pid %d: %w", pid, err)) From 4476e45835da48413592ddc2b10b81925e89b7f2 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 7 Sep 2023 14:45:11 -0400 Subject: [PATCH 04/21] Empty commit. From 14e9b2c996060fa05c46ccc8dce8b702a2303378 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 7 Sep 2023 15:06:01 -0400 Subject: [PATCH 05/21] Another fix for killWatcher. --- internal/pkg/agent/install/uninstall.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/pkg/agent/install/uninstall.go b/internal/pkg/agent/install/uninstall.go index 1fa8163df03..1adfc22fb28 100644 --- a/internal/pkg/agent/install/uninstall.go +++ b/internal/pkg/agent/install/uninstall.go @@ -322,8 +322,9 @@ func applyDynamics(ctx context.Context, log *logger.Logger, cfg *config.Config) // killWatcher finds and kills any running Elastic Agent watcher. func killWatcher() error { procStats := process.Stats{ - Procs: []string{".*elastic-agent"}, - CacheCmdLine: true, + // filtering with '.*elastic-agent' or '^.*elastic-agent$' doesn't + // seem to work as expected, filtering is done in the for loop below + Procs: []string{".*"}, } err := procStats.Init() if err != nil { @@ -336,7 +337,7 @@ func killWatcher() error { var errs error for pid, state := range pidMap { if len(state.Args) < 2 { - // must have at least 2 args "elastic-agent watcher" + // must have at least 2 args "elastic-agent watch" continue } if filepath.Base(state.Args[0]) == "elastic-agent" && state.Args[1] == "watch" { From cbe4ed23d497e6ec3c0078a06ab43fd61cb039c8 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 7 Sep 2023 15:13:45 -0400 Subject: [PATCH 06/21] Empty commit. From 747d3db51b739eb66d8e7db6e3e747c509e55f90 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 7 Sep 2023 15:29:05 -0400 Subject: [PATCH 07/21] Catch ErrProcessDone. --- internal/pkg/agent/install/uninstall.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/agent/install/uninstall.go b/internal/pkg/agent/install/uninstall.go index 1adfc22fb28..b46b404c43a 100644 --- a/internal/pkg/agent/install/uninstall.go +++ b/internal/pkg/agent/install/uninstall.go @@ -348,7 +348,7 @@ func killWatcher() error { continue } err = proc.Kill() - if err != nil { + if err != nil && !errors.Is(err, os.ErrProcessDone) { errs = errors.Join(errs, fmt.Errorf("failed to kill watcher process with pid %d: %w", pid, err)) continue } From c29ba3d7ded5fcdb778d7fdc3927c2fcb43bab33 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 7 Sep 2023 15:44:11 -0400 Subject: [PATCH 08/21] Empty commit. From 693ca9cd3bb86d90ca7c994e5291d63869e58468 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Fri, 8 Sep 2023 11:06:41 -0400 Subject: [PATCH 09/21] Empty commit From b706d0f115e9153a714d155f14143cf1210142b5 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Fri, 8 Sep 2023 11:34:25 -0400 Subject: [PATCH 10/21] Add changelog fragment. --- ...and-kills-any-running-watcher-process.yaml | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 changelog/fragments/1694187216-Uninstall-finds-and-kills-any-running-watcher-process.yaml diff --git a/changelog/fragments/1694187216-Uninstall-finds-and-kills-any-running-watcher-process.yaml b/changelog/fragments/1694187216-Uninstall-finds-and-kills-any-running-watcher-process.yaml new file mode 100644 index 00000000000..057cc06d33e --- /dev/null +++ b/changelog/fragments/1694187216-Uninstall-finds-and-kills-any-running-watcher-process.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. +summary: Uninstall finds and kills any running watcher process + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/3384 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent/issues/3371 From 42ce5894296cdea942e64c95b7bb8f6c9eb3ed04 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Fri, 8 Sep 2023 13:23:10 -0400 Subject: [PATCH 11/21] Make it work on Windows. --- internal/pkg/agent/install/uninstall.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/pkg/agent/install/uninstall.go b/internal/pkg/agent/install/uninstall.go index b46b404c43a..66a0eeac002 100644 --- a/internal/pkg/agent/install/uninstall.go +++ b/internal/pkg/agent/install/uninstall.go @@ -332,15 +332,15 @@ func killWatcher() error { } pidMap, _, err := procStats.FetchPids() if err != nil { - return fmt.Errorf("failed to fetch elastic-agent pids: %w", err) + return fmt.Errorf("failed to fetch pids: %w", err) } var errs error for pid, state := range pidMap { if len(state.Args) < 2 { - // must have at least 2 args "elastic-agent watch" + // must have at least 2 args "elastic-agent[.exe] watch" continue } - if filepath.Base(state.Args[0]) == "elastic-agent" && state.Args[1] == "watch" { + if filepath.Base(state.Args[0]) == paths.BinaryName && state.Args[1] == "watch" { // it is the watch subprocess proc, err := os.FindProcess(pid) if err != nil { From f8de734b4005c45a4b2960574b156c944b50d9ee Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Mon, 11 Sep 2023 09:34:34 -0400 Subject: [PATCH 12/21] Change killWatcher to be in a loop. --- internal/pkg/agent/install/uninstall.go | 60 ++++++++++++++++++------- 1 file changed, 45 insertions(+), 15 deletions(-) diff --git a/internal/pkg/agent/install/uninstall.go b/internal/pkg/agent/install/uninstall.go index 66a0eeac002..d5f28ac5710 100644 --- a/internal/pkg/agent/install/uninstall.go +++ b/internal/pkg/agent/install/uninstall.go @@ -321,6 +321,38 @@ func applyDynamics(ctx context.Context, log *logger.Logger, cfg *config.Config) // killWatcher finds and kills any running Elastic Agent watcher. func killWatcher() error { + for { + // finding and killing watchers is performed in a loop until no + // more watchers are existing, this ensures that during uninstall + // that no matter what the watchers are dead before going any further + pids, errs := findWatchers() + if errs != nil { + return errs + } + if len(pids) == 0 { + return nil + } + for _, pid := range pids { + proc, err := os.FindProcess(pid) + if err != nil { + errs = errors.Join(errs, fmt.Errorf("failed to load watcher process with pid %d: %w", pid, err)) + continue + } + err = proc.Kill() + if err != nil && !errors.Is(err, os.ErrProcessDone) { + errs = errors.Join(errs, fmt.Errorf("failed to kill watcher process with pid %d: %w", pid, err)) + continue + } + } + if errs != nil { + return errs + } + // wait 1 second before performing the loop again + <-time.After(1 * time.Second) + } +} + +func findWatchers() ([]int, error) { procStats := process.Stats{ // filtering with '.*elastic-agent' or '^.*elastic-agent$' doesn't // seem to work as expected, filtering is done in the for loop below @@ -328,31 +360,29 @@ func killWatcher() error { } err := procStats.Init() if err != nil { - return fmt.Errorf("failed to initialize process.Stats: %w", err) + return nil, fmt.Errorf("failed to initialize process.Stats: %w", err) } pidMap, _, err := procStats.FetchPids() if err != nil { - return fmt.Errorf("failed to fetch pids: %w", err) + return nil, fmt.Errorf("failed to fetch pids: %w", err) } + var pids []int var errs error for pid, state := range pidMap { if len(state.Args) < 2 { // must have at least 2 args "elastic-agent[.exe] watch" continue } - if filepath.Base(state.Args[0]) == paths.BinaryName && state.Args[1] == "watch" { - // it is the watch subprocess - proc, err := os.FindProcess(pid) - if err != nil { - errs = errors.Join(errs, fmt.Errorf("failed to load watcher process with pid %d: %w", pid, err)) - continue - } - err = proc.Kill() - if err != nil && !errors.Is(err, os.ErrProcessDone) { - errs = errors.Join(errs, fmt.Errorf("failed to kill watcher process with pid %d: %w", pid, err)) - continue - } + // instead of matching on Windows using the specific '.exe' suffix, this ensures + // that even if the watcher is spawned without the '.exe' suffix (which Windows will allow and supports) + // it always results in the watch process being killed + if strings.TrimSuffix(filepath.Base(state.Args[0]), ".exe") == "elastic-agent" && state.Args[1] == "watch" { + // it is a watch subprocess + pids = append(pids, pid) } } - return errs + if errs != nil { + return nil, errs + } + return pids, nil } From 466b408c02312900bbf70900257c333be9cd48be Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Mon, 11 Sep 2023 09:41:00 -0400 Subject: [PATCH 13/21] Add loop to killWatcher. --- internal/pkg/agent/install/uninstall.go | 40 +------------------- pkg/utils/watcher.go | 49 +++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 38 deletions(-) create mode 100644 pkg/utils/watcher.go diff --git a/internal/pkg/agent/install/uninstall.go b/internal/pkg/agent/install/uninstall.go index d5f28ac5710..6cca1aebbf9 100644 --- a/internal/pkg/agent/install/uninstall.go +++ b/internal/pkg/agent/install/uninstall.go @@ -16,8 +16,6 @@ import ( "github.com/kardianos/service" - "github.com/elastic/elastic-agent-system-metrics/metric/system/process" - "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" aerrors "github.com/elastic/elastic-agent/internal/pkg/agent/errors" @@ -30,6 +28,7 @@ import ( compruntime "github.com/elastic/elastic-agent/pkg/component/runtime" "github.com/elastic/elastic-agent/pkg/core/logger" "github.com/elastic/elastic-agent/pkg/features" + "github.com/elastic/elastic-agent/pkg/utils" ) // Uninstall uninstalls persistently Elastic Agent on the system. @@ -325,7 +324,7 @@ func killWatcher() error { // finding and killing watchers is performed in a loop until no // more watchers are existing, this ensures that during uninstall // that no matter what the watchers are dead before going any further - pids, errs := findWatchers() + pids, errs := utils.GetWatcherPIDs() if errs != nil { return errs } @@ -351,38 +350,3 @@ func killWatcher() error { <-time.After(1 * time.Second) } } - -func findWatchers() ([]int, error) { - procStats := process.Stats{ - // filtering with '.*elastic-agent' or '^.*elastic-agent$' doesn't - // seem to work as expected, filtering is done in the for loop below - Procs: []string{".*"}, - } - err := procStats.Init() - if err != nil { - return nil, fmt.Errorf("failed to initialize process.Stats: %w", err) - } - pidMap, _, err := procStats.FetchPids() - if err != nil { - return nil, fmt.Errorf("failed to fetch pids: %w", err) - } - var pids []int - var errs error - for pid, state := range pidMap { - if len(state.Args) < 2 { - // must have at least 2 args "elastic-agent[.exe] watch" - continue - } - // instead of matching on Windows using the specific '.exe' suffix, this ensures - // that even if the watcher is spawned without the '.exe' suffix (which Windows will allow and supports) - // it always results in the watch process being killed - if strings.TrimSuffix(filepath.Base(state.Args[0]), ".exe") == "elastic-agent" && state.Args[1] == "watch" { - // it is a watch subprocess - pids = append(pids, pid) - } - } - if errs != nil { - return nil, errs - } - return pids, nil -} diff --git a/pkg/utils/watcher.go b/pkg/utils/watcher.go new file mode 100644 index 00000000000..3c5e5861ac5 --- /dev/null +++ b/pkg/utils/watcher.go @@ -0,0 +1,49 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package utils + +import ( + "fmt" + "path/filepath" + "strings" + + "github.com/elastic/elastic-agent-system-metrics/metric/system/process" +) + +// GetWatcherPIDs returns the PID's of any running `elastic-agent watch` process. +func GetWatcherPIDs() ([]int, error) { + procStats := process.Stats{ + // filtering with '.*elastic-agent' or '^.*elastic-agent$' doesn't + // seem to work as expected, filtering is done in the for loop below + Procs: []string{".*"}, + } + err := procStats.Init() + if err != nil { + return nil, fmt.Errorf("failed to initialize process.Stats: %w", err) + } + pidMap, _, err := procStats.FetchPids() + if err != nil { + return nil, fmt.Errorf("failed to fetch pids: %w", err) + } + var pids []int + var errs error + for pid, state := range pidMap { + if len(state.Args) < 2 { + // must have at least 2 args "elastic-agent[.exe] watch" + continue + } + // instead of matching on Windows using the specific '.exe' suffix, this ensures + // that even if the watcher is spawned without the '.exe' suffix (which Windows will allow and supports) + // it always results in the watch process being killed + if strings.TrimSuffix(filepath.Base(state.Args[0]), ".exe") == "elastic-agent" && state.Args[1] == "watch" { + // it is a watch subprocess + pids = append(pids, pid) + } + } + if errs != nil { + return nil, errs + } + return pids, nil +} From 3b0f040460cba561d9503d716b33c241893c01db Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Tue, 12 Sep 2023 19:31:19 -0400 Subject: [PATCH 14/21] Revert "Skip TestStandaloneUpgradeFailsStatus to fix failing integration tests again (#3391)" This reverts commit bf467e32de6c87c73d3f411c35fac40c9f3b8c75. --- testing/integration/upgrade_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/testing/integration/upgrade_test.go b/testing/integration/upgrade_test.go index cf363bdf8fc..00361ff98d4 100644 --- a/testing/integration/upgrade_test.go +++ b/testing/integration/upgrade_test.go @@ -933,8 +933,6 @@ func TestStandaloneUpgradeFailsStatus(t *testing.T) { Sudo: true, // requires Agent installation }) - t.Skip("Affected by https://github.com/elastic/elastic-agent/issues/3371, watcher left running at end of test") - upgradeFromVersion, err := version.ParseVersion(define.Version()) require.NoError(t, err) From 51ad472440b2136cf483bc49b72d4b09724618ca Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Tue, 12 Sep 2023 19:32:02 -0400 Subject: [PATCH 15/21] Revert "Fix integration tests by waiting for the watcher to finish during upgrade tests (#3370)" This reverts commit 94764be90ce4a207f2e380390cfb67558095eb9b. --- testing/integration/upgrade_test.go | 85 +++++++++++------------------ 1 file changed, 32 insertions(+), 53 deletions(-) diff --git a/testing/integration/upgrade_test.go b/testing/integration/upgrade_test.go index 00361ff98d4..46a74806ba9 100644 --- a/testing/integration/upgrade_test.go +++ b/testing/integration/upgrade_test.go @@ -9,6 +9,7 @@ package integration import ( "context" "encoding/json" + "errors" "fmt" "io/fs" "net/http" @@ -44,19 +45,12 @@ import ( agtversion "github.com/elastic/elastic-agent/version" ) -// The watcher will need the default 10 minutes to complete for a Fleet managed agent, see https://github.com/elastic/elastic-agent/issues/2977. -const defaultWatcherDuration = 10 * time.Minute - -// Configure standalone agents to complete faster to speed up tests. -const standaloneWatcherDuration = time.Minute - -// Note: this configuration can't apply to Fleet managed upgrades until https://github.com/elastic/elastic-agent/issues/2977 is resolved -var fastWatcherCfg = fmt.Sprintf(` +const fastWatcherCfg = ` agent.upgrade.watcher: - grace_period: %s + grace_period: 1m error_check.interval: 15s crash_check.interval: 15s -`, standaloneWatcherDuration) +` // notable versions used in tests @@ -100,11 +94,8 @@ func TestFleetManagedUpgrade(t *testing.T) { err = agentFixture.Prepare(ctx) require.NoError(t, err, "error preparing agent fixture") - t.Cleanup(func() { - // The watcher needs to finish before the agent is uninstalled: https://github.com/elastic/elastic-agent/issues/3371 - waitForUpgradeWatcherToComplete(t, agentFixture, parsedVersion, defaultWatcherDuration) - }) - + err = agentFixture.Configure(ctx, []byte(fastWatcherCfg)) + require.NoError(t, err, "error configuring agent fixture") testUpgradeFleetManagedElasticAgent(t, ctx, info, agentFixture, parsedVersion, define.Version()) }) } @@ -171,6 +162,10 @@ func testUpgradeFleetManagedElasticAgent(t *testing.T, ctx context.Context, info t.Log(`Waiting for enrolled Agent status to be "online"...`) require.Eventually(t, tools.WaitForAgentStatus(t, kibClient, "online"), 10*time.Minute, 15*time.Second, "Agent status is not online") + // Upgrade Watcher check disabled until + // https://github.com/elastic/elastic-agent/issues/2977 is resolved. + // checkUpgradeWatcherRan(t, s.agentFixture) + // We remove the `-SNAPSHOT` suffix because, post-upgrade, the version reported // by the Agent will not contain this suffix, even if a `-SNAPSHOT`-suffixed // version was used as the target version for the upgrade. @@ -214,11 +209,6 @@ func TestStandaloneUpgrade(t *testing.T) { err = agentFixture.Configure(ctx, []byte(fastWatcherCfg)) require.NoError(t, err, "error configuring agent fixture") - t.Cleanup(func() { - // The watcher needs to finish before the agent is uninstalled: https://github.com/elastic/elastic-agent/issues/3371 - waitForUpgradeWatcherToComplete(t, agentFixture, parsedVersion, standaloneWatcherDuration) - }) - parsedUpgradeVersion, err := version.ParseVersion(define.Version()) require.NoErrorf(t, err, "define.Version() %q cannot be parsed as agent version", define.Version()) skipVerify := version_8_7_0.Less(*parsedVersion) @@ -259,11 +249,6 @@ func TestStandaloneUpgradeWithGPGFallback(t *testing.T) { err = agentFixture.Configure(ctx, []byte(fastWatcherCfg)) require.NoError(t, err, "error configuring agent fixture") - t.Cleanup(func() { - // The watcher needs to finish before the agent is uninstalled: https://github.com/elastic/elastic-agent/issues/3371 - waitForUpgradeWatcherToComplete(t, agentFixture, fromVersion, standaloneWatcherDuration) - }) - _, defaultPGP := release.PGP() firstSeven := string(defaultPGP[:7]) customPGP := strings.Replace( @@ -348,14 +333,9 @@ func TestStandaloneDowngradeToPreviousSnapshotBuild(t *testing.T) { t.Logf("Targeting upgrade to version %+v", upgradeInputVersion) parsedFromVersion, err := version.ParseVersion(define.Version()) - - t.Cleanup(func() { - // The watcher needs to finish before the agent is uninstalled: https://github.com/elastic/elastic-agent/issues/3371 - waitForUpgradeWatcherToComplete(t, agentFixture, parsedFromVersion, standaloneWatcherDuration) - }) - require.NoErrorf(t, err, "define.Version() %q cannot be parsed as agent version", define.Version()) testStandaloneUpgrade(ctx, t, agentFixture, parsedFromVersion, upgradeInputVersion, expectedAgentHashAfterUpgrade, false, true, false, "") + } func getUpgradableVersions(ctx context.Context, t *testing.T, upgradeToVersion string) (upgradableVersions []*version.ParsedSemVer) { @@ -494,6 +474,8 @@ func testStandaloneUpgrade( return checkAgentHealthAndVersion(t, ctx, f, parsedUpgradeVersion.CoreVersion(), parsedUpgradeVersion.IsSnapshot(), expectedAgentHashAfterUpgrade) }, 5*time.Minute, 1*time.Second, "agent never upgraded to expected version") + checkUpgradeWatcherRan(t, f, parsedFromVersion) + if expectedAgentHashAfterUpgrade != "" { aVersion, err := c.Version(ctx) assert.NoError(t, err, "error checking version after upgrade") @@ -571,17 +553,28 @@ func checkLegacyAgentHealthAndVersion(t *testing.T, ctx context.Context, f *ates } -// waitForUpgradeWatcherToComplete asserts that the Upgrade Watcher finished running. -func waitForUpgradeWatcherToComplete(t *testing.T, f *atesting.Fixture, fromVersion *version.ParsedSemVer, timeout time.Duration) { +// checkUpgradeWatcherRan asserts that the Upgrade Watcher finished running. We use the +// presence of the update marker file as evidence that the Upgrade Watcher is still running +// and the absence of that file as evidence that the Upgrade Watcher is no longer running. +func checkUpgradeWatcherRan(t *testing.T, agentFixture *atesting.Fixture, fromVersion *version.ParsedSemVer) { t.Helper() if fromVersion.Less(*version_8_9_0_SNAPSHOT) { - t.Logf("Version %q is too old for a quick update marker check", fromVersion) - timeout = defaultWatcherDuration + t.Logf("Version %q is too old for a quick update marker check, skipping...", fromVersion) + return } - t.Logf("Waiting %s for upgrade watcher to finish running", timeout) - time.Sleep(timeout) + t.Log("Waiting for upgrade watcher to finish running...") + + updateMarkerFile := filepath.Join(agentFixture.WorkDir(), "data", ".update-marker") + require.FileExists(t, updateMarkerFile) + + now := time.Now() + require.Eventuallyf(t, func() bool { + _, err := os.Stat(updateMarkerFile) + return errors.Is(err, fs.ErrNotExist) + }, 2*time.Minute, 15*time.Second, "agent never removed update marker") + t.Logf("Upgrade Watcher completed in %s", time.Now().Sub(now)) } func extractCommitHashFromArtifact(t *testing.T, ctx context.Context, artifactVersion *version.ParsedSemVer, agentProject tools.Project) string { @@ -662,11 +655,6 @@ func TestStandaloneUpgradeRetryDownload(t *testing.T) { err = agentFixture.Configure(ctx, []byte(fastWatcherCfg)) require.NoError(t, err, "error configuring agent fixture") - t.Cleanup(func() { - // The watcher needs to finish before the agent is uninstalled: https://github.com/elastic/elastic-agent/issues/3371 - waitForUpgradeWatcherToComplete(t, agentFixture, upgradeFromVersion, standaloneWatcherDuration) - }) - t.Log("Install the built Agent") output, err := tools.InstallStandaloneAgent(agentFixture) t.Log(string(output)) @@ -753,6 +741,8 @@ func TestStandaloneUpgradeRetryDownload(t *testing.T) { t.Log("Waiting for upgrade to finish") wg.Wait() + checkUpgradeWatcherRan(t, agentFixture, upgradeFromVersion) + t.Log("Check Agent version to ensure upgrade is successful") currentVersion, err = getVersion(t, ctx, agentFixture) require.NoError(t, err) @@ -814,9 +804,6 @@ func TestUpgradeBrokenPackageVersion(t *testing.T) { f, err := define.NewFixture(t, define.Version()) require.NoError(t, err) - fromVersion, err := version.ParseVersion(define.Version()) - require.NoError(t, err) - // Prepare the Elastic Agent so the binary is extracted and ready to use. err = f.Prepare(context.Background()) require.NoError(t, err) @@ -824,14 +811,6 @@ func TestUpgradeBrokenPackageVersion(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err = f.Configure(ctx, []byte(fastWatcherCfg)) - require.NoError(t, err, "error configuring agent fixture") - - t.Cleanup(func() { - // The watcher needs to finish before the agent is uninstalled: https://github.com/elastic/elastic-agent/issues/3371 - waitForUpgradeWatcherToComplete(t, f, fromVersion, standaloneWatcherDuration) - }) - output, err := tools.InstallStandaloneAgent(f) t.Logf("Agent installation output: %q", string(output)) require.NoError(t, err) From f7f04f525f5ba6f23c40ff612305fb150ffd5175 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Wed, 13 Sep 2023 09:24:47 -0400 Subject: [PATCH 16/21] Fix test. --- testing/integration/upgrade_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/testing/integration/upgrade_test.go b/testing/integration/upgrade_test.go index 46a74806ba9..637367a82cf 100644 --- a/testing/integration/upgrade_test.go +++ b/testing/integration/upgrade_test.go @@ -988,9 +988,6 @@ inputs: return checkAgentHealthAndVersion(t, ctx, agentFixture, upgradeToVersion.CoreVersion(), upgradeToVersion.IsSnapshot(), "") }, 2*time.Minute, 250*time.Millisecond, "Upgraded Agent never became healthy") - // Wait for upgrade watcher to finish running - waitForUpgradeWatcherToComplete(t, agentFixture, upgradeFromVersion, standaloneWatcherDuration) - t.Log("Ensure the we have rolled back and the correct version is running") require.Eventually(t, func() bool { return checkAgentHealthAndVersion(t, ctx, agentFixture, upgradeFromVersion.CoreVersion(), upgradeFromVersion.IsSnapshot(), "") From 5f28319ae818f37353d9cec3c1c60e38c4fc7478 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Wed, 13 Sep 2023 09:24:50 -0400 Subject: [PATCH 17/21] Revert "Revert "Skip TestStandaloneUpgradeFailsStatus to fix failing integration tests again (#3391)"" This reverts commit 3b0f040460cba561d9503d716b33c241893c01db. --- testing/integration/upgrade_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/testing/integration/upgrade_test.go b/testing/integration/upgrade_test.go index 637367a82cf..8d18b70e2b0 100644 --- a/testing/integration/upgrade_test.go +++ b/testing/integration/upgrade_test.go @@ -912,6 +912,8 @@ func TestStandaloneUpgradeFailsStatus(t *testing.T) { Sudo: true, // requires Agent installation }) + t.Skip("Affected by https://github.com/elastic/elastic-agent/issues/3371, watcher left running at end of test") + upgradeFromVersion, err := version.ParseVersion(define.Version()) require.NoError(t, err) From 442fa10382781a81c352447e89e5a9f8fbea6d1d Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 14 Sep 2023 11:21:11 -0400 Subject: [PATCH 18/21] Add progress tracking for uninstall like install. --- internal/pkg/agent/cmd/install.go | 53 ++++---- internal/pkg/agent/cmd/uninstall.go | 8 +- internal/pkg/agent/install/install.go | 24 ++-- internal/pkg/agent/install/progress.go | 130 +++++++++++++++----- internal/pkg/agent/install/progress_test.go | 70 +++++++---- internal/pkg/agent/install/uninstall.go | 46 +++++-- pkg/utils/watcher.go | 4 - 7 files changed, 234 insertions(+), 101 deletions(-) diff --git a/internal/pkg/agent/cmd/install.go b/internal/pkg/agent/cmd/install.go index 7803ae1f6a8..f967d0b52aa 100644 --- a/internal/pkg/agent/cmd/install.go +++ b/internal/pkg/agent/cmd/install.go @@ -180,37 +180,53 @@ func installCmd(streams *cli.IOStreams, cmd *cobra.Command) error { } pt := install.NewProgressTracker(streams.Out) - pt.Start() - defer pt.Stop() + s := pt.Start() + defer func() { + if err != nil { + s.Failed() + } else { + s.Succeeded() + } + }() cfgFile := paths.ConfigFile() if status != install.PackageInstall { - err = install.Install(cfgFile, topPath, pt) + err = install.Install(cfgFile, topPath, s) if err != nil { return err } defer func() { if err != nil { - _ = install.Uninstall(cfgFile, topPath, "") + uninstallStep := s.StepStart("Uninstalling") + innerErr := install.Uninstall(cfgFile, topPath, "", uninstallStep) + if innerErr != nil { + uninstallStep.Failed() + } else { + uninstallStep.Succeeded() + } } }() if !delayEnroll { - pt.StepStart("Starting service") + startServiceStep := s.StepStart("Starting service") err = install.StartService(topPath) if err != nil { - pt.StepFailed() + startServiceStep.Failed() fmt.Fprintf(streams.Out, "Installation failed to start Elastic Agent service.\n") return err } - pt.StepSucceeded() + startServiceStep.Succeeded() defer func() { if err != nil { - fmt.Fprint(streams.Out, "Stopping service... ") - _ = install.StopService(topPath) - pt.StepSucceeded() + stoppingServiceStep := s.StepStart("Stopping service") + innerErr := install.StopService(topPath) + if innerErr != nil { + stoppingServiceStep.Failed() + } else { + stoppingServiceStep.Succeeded() + } } }() } @@ -224,25 +240,20 @@ func installCmd(streams *cli.IOStreams, cmd *cobra.Command) error { enrollCmd.Stdout = os.Stdout enrollCmd.Stderr = os.Stderr - fmt.Fprint(streams.Out, "Enrolling Elastic Agent with Fleet... ") + enrollStep := s.StepStart("Enrolling Elastic Agent with Fleet") err = enrollCmd.Start() if err != nil { - pt.StepFailed() + enrollStep.Failed() return fmt.Errorf("failed to execute enroll command: %w", err) } err = enrollCmd.Wait() if err != nil { - pt.StepFailed() - if status != install.PackageInstall { - var exitErr *exec.ExitError - _ = install.Uninstall(cfgFile, topPath, "") - if err != nil && errors.As(err, &exitErr) { - return fmt.Errorf("enroll command failed with exit code: %d", exitErr.ExitCode()) - } - } + enrollStep.Failed() + // uninstall doesn't need to be performed here the defer above will + // catch the error and perform the uninstall return fmt.Errorf("enroll command failed for unknown reason: %w", err) } - pt.StepSucceeded() + enrollStep.Succeeded() } if err := info.CreateInstallMarker(topPath); err != nil { diff --git a/internal/pkg/agent/cmd/uninstall.go b/internal/pkg/agent/cmd/uninstall.go index 9a96e412302..e9446757c04 100644 --- a/internal/pkg/agent/cmd/uninstall.go +++ b/internal/pkg/agent/cmd/uninstall.go @@ -80,9 +80,15 @@ func uninstallCmd(streams *cli.IOStreams, cmd *cobra.Command) error { } } - err = install.Uninstall(paths.ConfigFile(), paths.Top(), uninstallToken) + pt := install.NewProgressTracker(streams.Out) + s := pt.Start() + + err = install.Uninstall(paths.ConfigFile(), paths.Top(), uninstallToken, s) if err != nil { + s.Failed() return err + } else { + s.Succeeded() } fmt.Fprintf(streams.Out, "Elastic Agent has been uninstalled.\n") diff --git a/internal/pkg/agent/install/install.go b/internal/pkg/agent/install/install.go index 89a04af7067..307daa49a52 100644 --- a/internal/pkg/agent/install/install.go +++ b/internal/pkg/agent/install/install.go @@ -22,7 +22,7 @@ const ( ) // Install installs Elastic Agent persistently on the system including creating and starting its service. -func Install(cfgFile, topPath string, pt *ProgressTracker) error { +func Install(cfgFile, topPath string, pt ProgressTrackerStep) error { dir, err := findDirectory() if err != nil { return errors.New(err, "failed to discover the source directory for installation", errors.TypeFilesystem) @@ -33,16 +33,16 @@ func Install(cfgFile, topPath string, pt *ProgressTracker) error { // There is no uninstall token for "install" command. // Uninstall will fail on protected agent. // The protected Agent will need to be uninstalled first before it can be installed. - pt.StepStart("Uninstalling current Elastic Agent") - err = Uninstall(cfgFile, topPath, "") + s := pt.StepStart("Uninstalling current Elastic Agent") + err = Uninstall(cfgFile, topPath, "", s) if err != nil { - pt.StepFailed() + s.Failed() return errors.New( err, fmt.Sprintf("failed to uninstall Agent at (%s)", filepath.Dir(topPath)), errors.M("directory", filepath.Dir(topPath))) } - pt.StepSucceeded() + s.Succeeded() // ensure parent directory exists err = os.MkdirAll(filepath.Dir(topPath), 0755) @@ -54,7 +54,7 @@ func Install(cfgFile, topPath string, pt *ProgressTracker) error { } // copy source into install path - pt.StepStart("Copying files") + s = pt.StepStart("Copying files") err = copy.Copy(dir, topPath, copy.Options{ OnSymlink: func(_ string) copy.SymlinkAction { return copy.Shallow @@ -62,13 +62,13 @@ func Install(cfgFile, topPath string, pt *ProgressTracker) error { Sync: true, }) if err != nil { - pt.StepFailed() + s.Failed() return errors.New( err, fmt.Sprintf("failed to copy source directory (%s) to destination (%s)", dir, topPath), errors.M("source", dir), errors.M("destination", topPath)) } - pt.StepSucceeded() + s.Succeeded() // place shell wrapper, if present on platform if paths.ShellWrapperPath != "" { @@ -132,21 +132,21 @@ func Install(cfgFile, topPath string, pt *ProgressTracker) error { } // install service - pt.StepStart("Installing service") + s = pt.StepStart("Installing service") svc, err := newService(topPath) if err != nil { - pt.StepFailed() + s.Failed() return err } err = svc.Install() if err != nil { - pt.StepFailed() + s.Failed() return errors.New( err, fmt.Sprintf("failed to install service (%s)", paths.ServiceName), errors.M("service", paths.ServiceName)) } - pt.StepSucceeded() + s.Succeeded() return nil } diff --git a/internal/pkg/agent/install/progress.go b/internal/pkg/agent/install/progress.go index dc54e592e5d..492b02bc110 100644 --- a/internal/pkg/agent/install/progress.go +++ b/internal/pkg/agent/install/progress.go @@ -15,15 +15,98 @@ import ( "golang.org/x/exp/rand" ) +// ProgressTrackerStep is a currently running step. +// +// A step can produce a sub-step that is a step that is part of another step. +type ProgressTrackerStep interface { + // Succeeded step is done and successful. + Succeeded() + // Failed step has failed. + Failed() + // StepStart creates a new step. + StepStart(msg string) ProgressTrackerStep +} + +type progressTrackerStep struct { + tracker *ProgressTracker + prefix string + + finalizeFunc func() + + rootstep bool + substeps bool + step *progressTrackerStep +} + +func newProgressTrackerStep(tracker *ProgressTracker, prefix string, finalizeFunc func()) *progressTrackerStep { + return &progressTrackerStep{ + tracker: tracker, + prefix: prefix, + finalizeFunc: finalizeFunc, + } +} + +// Succeeded step is done and successful. +func (pts *progressTrackerStep) Succeeded() { + prefix := " " + if pts.substeps { + prefix = pts.prefix + " " + } + if !pts.rootstep { + pts.tracker.printf("%sDONE\n", prefix) + } + pts.finalizeFunc() +} + +// Failed step has failed. +func (pts *progressTrackerStep) Failed() { + prefix := " " + if pts.substeps { + prefix = pts.prefix + " " + } + if !pts.rootstep { + pts.tracker.printf("%sFAILED\n", prefix) + } + pts.finalizeFunc() +} + +// StepStart creates a new step. +func (pts *progressTrackerStep) StepStart(msg string) ProgressTrackerStep { + prefix := pts.prefix + if !pts.rootstep { + prefix += " " + if !pts.substeps { + prefix = "\n" + prefix + pts.substeps = true + } + } + pts.tracker.printf("%s%s...", prefix, strings.TrimSpace(msg)) + s := newProgressTrackerStep(pts.tracker, prefix, func() { + pts.step = nil + }) + pts.step = s + return s +} + +func (pts *progressTrackerStep) tick() { + if pts.step != nil { + pts.step.tick() + return + } + if !pts.rootstep { + pts.tracker.printf(".") + } +} + type ProgressTracker struct { writer io.Writer tickInterval time.Duration randomizeTickInterval bool - stepInProgress bool - mu sync.RWMutex - stop chan struct{} + step *progressTrackerStep + mu sync.Mutex + stop chan struct{} } func NewProgressTracker(writer io.Writer) *ProgressTracker { @@ -43,7 +126,7 @@ func (pt *ProgressTracker) DisableRandomizedTickIntervals() { pt.randomizeTickInterval = false } -func (pt *ProgressTracker) Start() { +func (pt *ProgressTracker) Start() ProgressTrackerStep { timer := time.NewTimer(pt.calculateTickInterval()) go func() { defer timer.Stop() @@ -52,44 +135,27 @@ func (pt *ProgressTracker) Start() { case <-pt.stop: return case <-timer.C: - pt.mu.RLock() - if pt.stepInProgress { - _, _ = pt.writer.Write([]byte(".")) + if pt.step != nil { + pt.step.tick() } - pt.mu.RUnlock() - timer = time.NewTimer(pt.calculateTickInterval()) } } }() -} -func (pt *ProgressTracker) StepStart(msg string) { - pt.mu.Lock() - defer pt.mu.Unlock() - - pt.stepInProgress = true - fmt.Fprintf(pt.writer, strings.TrimSpace(msg)+"...") + s := newProgressTrackerStep(pt, "", func() { + pt.step = nil + pt.stop <- struct{}{} + }) + s.rootstep = true // is the root step + pt.step = s + return s } -func (pt *ProgressTracker) StepSucceeded() { +func (pt *ProgressTracker) printf(format string, a ...any) { pt.mu.Lock() defer pt.mu.Unlock() - - fmt.Fprintln(pt.writer, " DONE") - pt.stepInProgress = false -} - -func (pt *ProgressTracker) StepFailed() { - pt.mu.Lock() - defer pt.mu.Unlock() - - fmt.Fprintln(pt.writer, " FAILED") - pt.stepInProgress = false -} - -func (pt *ProgressTracker) Stop() { - pt.stop <- struct{}{} + _, _ = fmt.Fprintf(pt.writer, format, a...) } func (pt *ProgressTracker) calculateTickInterval() time.Duration { diff --git a/internal/pkg/agent/install/progress_test.go b/internal/pkg/agent/install/progress_test.go index d43c03643c7..f9bbc303228 100644 --- a/internal/pkg/agent/install/progress_test.go +++ b/internal/pkg/agent/install/progress_test.go @@ -32,12 +32,12 @@ func TestProgress(t *testing.T) { w := newTestWriter() pt := NewProgressTracker(w) - pt.Start() + rs := pt.Start() - pt.StepStart("step 1 starting") - pt.StepFailed() + s := rs.StepStart("step 1 starting") + s.Failed() - pt.Stop() + rs.Failed() require.Equal(t, "step 1 starting... FAILED\n", string(w.buf)) }) @@ -48,13 +48,13 @@ func TestProgress(t *testing.T) { pt.SetTickInterval(10 * time.Millisecond) // to speed up testing pt.DisableRandomizedTickIntervals() - pt.Start() + rs := pt.Start() - pt.StepStart("step 1 starting") + s := rs.StepStart("step 1 starting") time.Sleep(15 * time.Millisecond) // to simulate work being done - pt.StepFailed() + s.Failed() - pt.Stop() + rs.Failed() require.Regexp(t, regexp.MustCompile(`step 1 starting\.{3}\.+ FAILED\n`), string(w.buf)) }) @@ -64,14 +64,14 @@ func TestProgress(t *testing.T) { pt := NewProgressTracker(w) pt.DisableRandomizedTickIntervals() - pt.Start() + rs := pt.Start() - pt.StepStart("step 1 starting") - pt.StepSucceeded() - pt.StepStart("step 2 starting") - pt.StepSucceeded() + s := rs.StepStart("step 1 starting") + s.Succeeded() + s = rs.StepStart("step 2 starting") + s.Succeeded() - pt.Stop() + rs.Succeeded() require.Equal(t, "step 1 starting... DONE\nstep 2 starting... DONE\n", string(w.buf)) }) @@ -82,16 +82,16 @@ func TestProgress(t *testing.T) { pt.SetTickInterval(10 * time.Millisecond) // to speed up testing pt.DisableRandomizedTickIntervals() - pt.Start() + rs := pt.Start() - pt.StepStart("step 1 starting") + s := rs.StepStart("step 1 starting") time.Sleep(55 * time.Millisecond) // to simulate work being done - pt.StepSucceeded() - pt.StepStart("step 2 starting") + s.Succeeded() + s = rs.StepStart("step 2 starting") time.Sleep(25 * time.Millisecond) // to simulate work being done - pt.StepSucceeded() + s.Succeeded() - pt.Stop() + rs.Succeeded() require.Regexp(t, regexp.MustCompile(`step 1 starting\.{3}\.+ DONE\nstep 2 starting\.{3}\.+ DONE`), string(w.buf)) }) @@ -102,15 +102,37 @@ func TestProgress(t *testing.T) { pt.SetTickInterval(10 * time.Millisecond) // to speed up testing pt.DisableRandomizedTickIntervals() - pt.Start() + rs := pt.Start() - pt.StepStart("step 1 starting") - pt.StepFailed() + s := rs.StepStart("step 1 starting") + s.Failed() time.Sleep(15 * time.Millisecond) - pt.Stop() + rs.Failed() require.Equal(t, "step 1 starting... FAILED\n", string(w.buf)) }) + + t.Run("nested_step_delayed_success", func(t *testing.T) { + w := newTestWriter() + pt := NewProgressTracker(w) + pt.SetTickInterval(10 * time.Millisecond) // to speed up testing + pt.DisableRandomizedTickIntervals() + + rs := pt.Start() + + s := rs.StepStart("step starting") + ss := s.StepStart("substep 1 starting") + time.Sleep(55 * time.Millisecond) // to simulate work being done + ss.Succeeded() + ss = s.StepStart("substep 2 starting") + time.Sleep(25 * time.Millisecond) // to simulate work being done + ss.Succeeded() + s.Succeeded() + + rs.Succeeded() + + require.Regexp(t, regexp.MustCompile(`step starting\.{3}\n substep 1 starting\.{3}\.+ DONE\n substep 2 starting\.{3}\.+ DONE\n DONE\n`), string(w.buf)) + }) } diff --git a/internal/pkg/agent/install/uninstall.go b/internal/pkg/agent/install/uninstall.go index 547c29044e6..95aac9b61d2 100644 --- a/internal/pkg/agent/install/uninstall.go +++ b/internal/pkg/agent/install/uninstall.go @@ -32,7 +32,7 @@ import ( ) // Uninstall uninstalls persistently Elastic Agent on the system. -func Uninstall(cfgFile, topPath, uninstallToken string) error { +func Uninstall(cfgFile, topPath, uninstallToken string, pt ProgressTrackerStep) error { // uninstall the current service svc, err := newService(topPath) if err != nil { @@ -40,18 +40,21 @@ func Uninstall(cfgFile, topPath, uninstallToken string) error { } status, _ := svc.Status() + s := pt.StepStart("Stopping service") if status == service.StatusRunning { err := svc.Stop() if err != nil { + s.Failed() return aerrors.New( err, fmt.Sprintf("failed to stop service (%s)", paths.ServiceName), aerrors.M("service", paths.ServiceName)) } } + s.Succeeded() // kill any running watcher - if err := killWatcher(); err != nil { + if err := killWatcher(s); err != nil { return fmt.Errorf("failed trying to kill any running watcher: %w", err) } @@ -71,7 +74,13 @@ func Uninstall(cfgFile, topPath, uninstallToken string) error { } // Uninstall service only after components were uninstalled successfully - _ = svc.Uninstall() + s = pt.StepStart("Removing service") + err = svc.Uninstall() + if err != nil { + s.Failed() + } else { + s.Succeeded() + } // remove, if present on platform if paths.ShellWrapperPath != "" { @@ -85,13 +94,16 @@ func Uninstall(cfgFile, topPath, uninstallToken string) error { } // remove existing directory + s = pt.StepStart("Removing install directory") err = RemovePath(topPath) if err != nil { + s.Failed() return aerrors.New( err, fmt.Sprintf("failed to remove installation directory (%s)", paths.Top()), aerrors.M("directory", paths.Top())) } + s.Succeeded() return nil } @@ -299,18 +311,35 @@ func applyDynamics(ctx context.Context, log *logger.Logger, cfg *config.Config) } // killWatcher finds and kills any running Elastic Agent watcher. -func killWatcher() error { +func killWatcher(pt ProgressTrackerStep) error { + var s ProgressTrackerStep for { // finding and killing watchers is performed in a loop until no // more watchers are existing, this ensures that during uninstall // that no matter what the watchers are dead before going any further - pids, errs := utils.GetWatcherPIDs() - if errs != nil { - return errs + pids, err := utils.GetWatcherPIDs() + if err != nil { + if s != nil { + s.Failed() + } + return err } if len(pids) == 0 { + if s != nil { + s.Succeeded() + } return nil } + + if s == nil { + var pidsStr []string + for _, pid := range pids { + pidsStr = append(pidsStr, fmt.Sprintf("%d", pid)) + } + s = pt.StepStart(fmt.Sprintf("Stopping upgrade watcher (%s)", strings.Join(pidsStr, ", "))) + } + + var errs error for _, pid := range pids { proc, err := os.FindProcess(pid) if err != nil { @@ -324,6 +353,9 @@ func killWatcher() error { } } if errs != nil { + if s != nil { + s.Failed() + } return errs } // wait 1 second before performing the loop again diff --git a/pkg/utils/watcher.go b/pkg/utils/watcher.go index 3c5e5861ac5..fa018b704b3 100644 --- a/pkg/utils/watcher.go +++ b/pkg/utils/watcher.go @@ -28,7 +28,6 @@ func GetWatcherPIDs() ([]int, error) { return nil, fmt.Errorf("failed to fetch pids: %w", err) } var pids []int - var errs error for pid, state := range pidMap { if len(state.Args) < 2 { // must have at least 2 args "elastic-agent[.exe] watch" @@ -42,8 +41,5 @@ func GetWatcherPIDs() ([]int, error) { pids = append(pids, pid) } } - if errs != nil { - return nil, errs - } return pids, nil } From 98aa475c372945a378837f1e263c73e9dd86718c Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 14 Sep 2023 11:25:23 -0400 Subject: [PATCH 19/21] Log when no watchers our found. --- internal/pkg/agent/install/uninstall.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/pkg/agent/install/uninstall.go b/internal/pkg/agent/install/uninstall.go index 95aac9b61d2..6631b10d7c0 100644 --- a/internal/pkg/agent/install/uninstall.go +++ b/internal/pkg/agent/install/uninstall.go @@ -327,6 +327,10 @@ func killWatcher(pt ProgressTrackerStep) error { if len(pids) == 0 { if s != nil { s.Succeeded() + } else { + // step was never started so no watcher was found on first loop + s = pt.StepStart("Stopping upgrade watcher; none found") + s.Succeeded() } return nil } From ca4e40570c8d85626e72bdf1f3c5458ecf3c452a Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 14 Sep 2023 12:03:49 -0400 Subject: [PATCH 20/21] Improve uninstall. --- internal/pkg/agent/install/uninstall.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/internal/pkg/agent/install/uninstall.go b/internal/pkg/agent/install/uninstall.go index 6631b10d7c0..9ca81cbfa6e 100644 --- a/internal/pkg/agent/install/uninstall.go +++ b/internal/pkg/agent/install/uninstall.go @@ -54,12 +54,12 @@ func Uninstall(cfgFile, topPath, uninstallToken string, pt ProgressTrackerStep) s.Succeeded() // kill any running watcher - if err := killWatcher(s); err != nil { + if err := killWatcher(pt); err != nil { return fmt.Errorf("failed trying to kill any running watcher: %w", err) } // Uninstall components first - if err := uninstallComponents(context.Background(), cfgFile, uninstallToken); err != nil { + if err := uninstallComponents(context.Background(), cfgFile, uninstallToken, pt); err != nil { // If service status was running it was stopped to uninstall the components. // If the components uninstall failed start the service again if status == service.StatusRunning { @@ -181,7 +181,7 @@ func containsString(str string, a []string, caseSensitive bool) bool { return false } -func uninstallComponents(ctx context.Context, cfgFile string, uninstallToken string) error { +func uninstallComponents(ctx context.Context, cfgFile string, uninstallToken string, pt ProgressTrackerStep) error { log, err := logger.NewWithLogpLevel("", logp.ErrorLevel, false) if err != nil { return err @@ -235,7 +235,7 @@ func uninstallComponents(ctx context.Context, cfgFile string, uninstallToken str // This component is not active continue } - if err := uninstallServiceComponent(ctx, log, comp, uninstallToken); err != nil { + if err := uninstallServiceComponent(ctx, log, comp, uninstallToken, pt); err != nil { os.Stderr.WriteString(fmt.Sprintf("failed to uninstall component %q: %s\n", comp.ID, err)) // The decision was made to change the behaviour and leave the Agent installed if Endpoint uninstall fails // https://github.com/elastic/elastic-agent/pull/2708#issuecomment-1574251911 @@ -247,11 +247,18 @@ func uninstallComponents(ctx context.Context, cfgFile string, uninstallToken str return nil } -func uninstallServiceComponent(ctx context.Context, log *logp.Logger, comp component.Component, uninstallToken string) error { +func uninstallServiceComponent(ctx context.Context, log *logp.Logger, comp component.Component, uninstallToken string, pt ProgressTrackerStep) error { // Do not use infinite retries when uninstalling from the command line. If the uninstall needs to be // retried the entire uninstall command can be retried. Retries may complete asynchronously with the // execution of the uninstall command, leading to bugs like https://github.com/elastic/elastic-agent/issues/3060. - return comprt.UninstallService(ctx, log, comp, uninstallToken) + s := pt.StepStart(fmt.Sprintf("Uninstalling service component %s", comp.InputType)) + err := comprt.UninstallService(ctx, log, comp, uninstallToken) + if err != nil { + s.Failed() + return err + } + s.Succeeded() + return nil } func serviceComponentsFromConfig(specs component.RuntimeSpecs, cfg *config.Config) ([]component.Component, error) { From b3249497263b96f7eaa9a8b7aa4adae1d5f7460c Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Tue, 19 Sep 2023 11:46:29 -0400 Subject: [PATCH 21/21] Fix data race. --- internal/pkg/agent/install/progress.go | 43 +++++++++++++++++++++----- 1 file changed, 35 insertions(+), 8 deletions(-) diff --git a/internal/pkg/agent/install/progress.go b/internal/pkg/agent/install/progress.go index 492b02bc110..bc797a2c58d 100644 --- a/internal/pkg/agent/install/progress.go +++ b/internal/pkg/agent/install/progress.go @@ -35,6 +35,7 @@ type progressTrackerStep struct { rootstep bool substeps bool + mu sync.Mutex step *progressTrackerStep } @@ -82,15 +83,28 @@ func (pts *progressTrackerStep) StepStart(msg string) ProgressTrackerStep { } pts.tracker.printf("%s%s...", prefix, strings.TrimSpace(msg)) s := newProgressTrackerStep(pts.tracker, prefix, func() { - pts.step = nil + pts.setStep(nil) }) - pts.step = s + pts.setStep(s) return s } +func (pts *progressTrackerStep) getStep() *progressTrackerStep { + pts.mu.Lock() + defer pts.mu.Unlock() + return pts.step +} + +func (pts *progressTrackerStep) setStep(step *progressTrackerStep) { + pts.mu.Lock() + defer pts.mu.Unlock() + pts.step = step +} + func (pts *progressTrackerStep) tick() { - if pts.step != nil { - pts.step.tick() + step := pts.getStep() + if step != nil { + step.tick() return } if !pts.rootstep { @@ -135,8 +149,9 @@ func (pt *ProgressTracker) Start() ProgressTrackerStep { case <-pt.stop: return case <-timer.C: - if pt.step != nil { - pt.step.tick() + step := pt.getStep() + if step != nil { + step.tick() } timer = time.NewTimer(pt.calculateTickInterval()) } @@ -144,11 +159,11 @@ func (pt *ProgressTracker) Start() ProgressTrackerStep { }() s := newProgressTrackerStep(pt, "", func() { - pt.step = nil + pt.setStep(nil) pt.stop <- struct{}{} }) s.rootstep = true // is the root step - pt.step = s + pt.setStep(s) return s } @@ -158,6 +173,18 @@ func (pt *ProgressTracker) printf(format string, a ...any) { _, _ = fmt.Fprintf(pt.writer, format, a...) } +func (pt *ProgressTracker) getStep() *progressTrackerStep { + pt.mu.Lock() + defer pt.mu.Unlock() + return pt.step +} + +func (pt *ProgressTracker) setStep(step *progressTrackerStep) { + pt.mu.Lock() + defer pt.mu.Unlock() + pt.step = step +} + func (pt *ProgressTracker) calculateTickInterval() time.Duration { if !pt.randomizeTickInterval { return pt.tickInterval