From 8b56088a3b033b527eb3e6b50c1507a147abc01b Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 14 May 2021 21:05:32 +0200 Subject: [PATCH] [7.x](backport #1153) feat: check how many processes of a process are running in the host (#1154) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: check how many processes of a process are running in the host (#1153) * fix: use docker's stdcopy to separate stdout from stderr This will allow removing the initial bytes when reading outputs from command execution in a container * feat: support checking the number of occurrences of a process in a container It uses pgrep to get all pids for a process, and then iterates through them to get the runnable status for each pid. If the process must be started in the host, then it will check that the pid is in the S status (to skip zombie processes) * fix: check for only one filebeat instance * fix: check for empty response when listing agent's workdir (cherry picked from commit 78a0d49dbdb089b753c238eecb0ecdd5d927bc06) * fix: check for 1 filebeat process only * fix: check for 1 metricbeat process only Co-authored-by: Manuel de la Peña --- .../fleet/features/backend_processes.feature | 16 +- .../fleet/features/stand_alone_agent.feature | 4 +- e2e/_suites/fleet/fleet.go | 4 +- e2e/_suites/fleet/ingest_manager_test.go | 1 + e2e/_suites/fleet/world.go | 12 +- internal/docker/docker.go | 199 +++++++++++++----- 6 files changed, 174 insertions(+), 62 deletions(-) diff --git a/e2e/_suites/fleet/features/backend_processes.feature b/e2e/_suites/fleet/features/backend_processes.feature index 35df13c32e..e4a2b1f710 100644 --- a/e2e/_suites/fleet/features/backend_processes.feature +++ b/e2e/_suites/fleet/features/backend_processes.feature @@ -6,8 +6,8 @@ Feature: Backend Processes Scenario Outline: Deploying the agent Given a "" agent is deployed to Fleet with "tar" installer When the "elastic-agent" process is in the "started" state on the host - Then the "filebeat" process is in the "started" state on the host - And the "metricbeat" process is in the "started" state on the host + Then there are "1" instances of the "filebeat" process in the "started" state + And there are "1" instances of the "metricbeat" process in the "started" state @centos Examples: Centos @@ -23,8 +23,8 @@ Examples: Debian Scenario Outline: Deploying the agent with enroll and then run on rpm and deb Given a "" agent is deployed to Fleet with "systemd" installer When the "elastic-agent" process is in the "started" state on the host - Then the "filebeat" process is in the "started" state on the host - And the "metricbeat" process is in the "started" state on the host + Then there are "1" instances of the "filebeat" process in the "started" state + And there are "1" instances of the "metricbeat" process in the "started" state @centos Examples: Centos @@ -57,8 +57,8 @@ Examples: Debian Scenario Outline: Restarting the installed agent Given a "" agent is deployed to Fleet with "tar" installer When the "elastic-agent" process is "restarted" on the host - Then the "filebeat" process is in the "started" state on the host - And the "metricbeat" process is in the "started" state on the host + Then there are "1" instances of the "filebeat" process in the "started" state + And there are "1" instances of the "metricbeat" process in the "started" state @centos Examples: Centos @@ -75,8 +75,8 @@ Scenario Outline: Restarting the host with persistent agent restarts backen Given a "" agent is deployed to Fleet with "tar" installer When the host is restarted Then the "elastic-agent" process is in the "started" state on the host - And the "filebeat" process is in the "started" state on the host - And the "metricbeat" process is in the "started" state on the host + And there are "1" instances of the "filebeat" process in the "started" state + And there are "1" instances of the "metricbeat" process in the "started" state @centos Examples: Centos diff --git a/e2e/_suites/fleet/features/stand_alone_agent.feature b/e2e/_suites/fleet/features/stand_alone_agent.feature index 20a44ce177..639a21a676 100644 --- a/e2e/_suites/fleet/features/stand_alone_agent.feature +++ b/e2e/_suites/fleet/features/stand_alone_agent.feature @@ -7,8 +7,8 @@ Feature: Stand-alone Agent @start-agent Scenario Outline: Starting the agent starts backend processes When a "" stand-alone agent is deployed - Then the "filebeat" process is in the "started" state on the host - And the "metricbeat" process is in the "started" state on the host + Then there are "1" instances of the "filebeat" process in the "started" state + And there are "2" instances of the "metricbeat" process in the "started" state @default Examples: default diff --git a/e2e/_suites/fleet/fleet.go b/e2e/_suites/fleet/fleet.go index 547edd5d20..1113119e9c 100644 --- a/e2e/_suites/fleet/fleet.go +++ b/e2e/_suites/fleet/fleet.go @@ -461,7 +461,7 @@ func (fts *FleetTestSuite) processStateChangedOnTheHost(process string, state st containerName := fts.getContainerName(agentInstaller, 1) - return docker.CheckProcessStateOnTheHost(containerName, process, "stopped", common.TimeoutFactor) + return docker.CheckProcessStateOnTheHost(containerName, process, "stopped", 1, common.TimeoutFactor) } func (fts *FleetTestSuite) setup() error { @@ -563,7 +563,7 @@ func (fts *FleetTestSuite) theFileSystemAgentFolderIsEmpty() error { return err } - if strings.Contains(content, "No such file or directory") { + if content == "" || strings.Contains(content, "No such file or directory") { return nil } diff --git a/e2e/_suites/fleet/ingest_manager_test.go b/e2e/_suites/fleet/ingest_manager_test.go index 8842f4749b..629795b5d0 100644 --- a/e2e/_suites/fleet/ingest_manager_test.go +++ b/e2e/_suites/fleet/ingest_manager_test.go @@ -105,6 +105,7 @@ func InitializeIngestManagerTestScenario(ctx *godog.ScenarioContext) { }) ctx.Step(`^the "([^"]*)" process is in the "([^"]*)" state on the host$`, imts.processStateOnTheHost) + ctx.Step(`^there are "([^"]*)" instances of the "([^"]*)" process in the "([^"]*)" state$`, imts.thereAreInstancesOfTheProcessInTheState) imts.Fleet.contributeSteps(ctx) } diff --git a/e2e/_suites/fleet/world.go b/e2e/_suites/fleet/world.go index 27cc74345f..132f56f575 100644 --- a/e2e/_suites/fleet/world.go +++ b/e2e/_suites/fleet/world.go @@ -6,6 +6,7 @@ package main import ( "fmt" + "strconv" "github.com/elastic/e2e-testing/internal/common" "github.com/elastic/e2e-testing/internal/docker" @@ -17,6 +18,10 @@ type IngestManagerTestSuite struct { } func (imts *IngestManagerTestSuite) processStateOnTheHost(process string, state string) error { + return imts.thereAreInstancesOfTheProcessInTheState("1", process, state) +} + +func (imts *IngestManagerTestSuite) thereAreInstancesOfTheProcessInTheState(ocurrences string, process string, state string) error { profile := common.FleetProfileName var containerName string @@ -28,5 +33,10 @@ func (imts *IngestManagerTestSuite) processStateOnTheHost(process string, state containerName = imts.Fleet.getContainerName(agentInstaller, 1) } - return docker.CheckProcessStateOnTheHost(containerName, process, state, common.TimeoutFactor) + count, err := strconv.Atoi(ocurrences) + if err != nil { + return err + } + + return docker.CheckProcessStateOnTheHost(containerName, process, state, count, common.TimeoutFactor) } diff --git a/internal/docker/docker.go b/internal/docker/docker.go index 592702a7e0..fc63fd30a8 100644 --- a/internal/docker/docker.go +++ b/internal/docker/docker.go @@ -22,6 +22,7 @@ import ( "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/client" + "github.com/docker/docker/pkg/stdcopy" "github.com/elastic/e2e-testing/internal/common" log "github.com/sirupsen/logrus" ) @@ -31,6 +32,12 @@ var instance *client.Client // OPNetworkName name of the network used by the tool const OPNetworkName = "elastic-dev-network" +type execResult struct { + StdOut string + StdErr string + ExitCode int +} + func buildTarForDeployment(file *os.File) (bytes.Buffer, error) { fileInfo, _ := file.Stat() @@ -66,10 +73,10 @@ func buildTarForDeployment(file *os.File) (bytes.Buffer, error) { // we are using the Docker client instead of docker-compose // because it does not support returning the output of a // command: it simply returns error level -func CheckProcessStateOnTheHost(containerName string, process string, state string, timeoutFactor int) error { +func CheckProcessStateOnTheHost(containerName string, process string, state string, occurrences int, timeoutFactor int) error { timeout := time.Duration(common.TimeoutFactor) * time.Minute - err := WaitForProcess(containerName, process, state, timeout) + err := WaitForProcess(containerName, process, state, occurrences, timeout) if err != nil { if state == "started" { log.WithFields(log.Fields{ @@ -226,8 +233,31 @@ func ExecCommandIntoContainerWithEnv(ctx context.Context, containerName string, } defer resp.Close() - buf := new(bytes.Buffer) - _, err = buf.ReadFrom(resp.Reader) + // see https://stackoverflow.com/a/57132902 + var execRes execResult + + // read the output + var outBuf, errBuf bytes.Buffer + outputDone := make(chan error) + + go func() { + // StdCopy demultiplexes the stream into two buffers + _, err = stdcopy.StdCopy(&outBuf, &errBuf, resp.Reader) + outputDone <- err + }() + + select { + case err := <-outputDone: + if err != nil { + return "", err + } + break + + case <-ctx.Done(): + return "", ctx.Err() + } + + stdout, err := ioutil.ReadAll(&outBuf) if err != nil { log.WithFields(log.Fields{ "container": containerName, @@ -236,35 +266,28 @@ func ExecCommandIntoContainerWithEnv(ctx context.Context, containerName string, "env": env, "error": err, "tty": tty, - }).Error("Could not parse command output from container") + }).Error("Could not parse stdout from container") return "", err } - output := buf.String() - - log.WithFields(log.Fields{ - "container": containerName, - "command": cmd, - "detach": detach, - "env": env, - "tty": tty, - }).Trace("Command sucessfully executed in container") - - output = strings.ReplaceAll(output, "\n", "") - - patterns := []string{ - "\x01\x00\x00\x00\x00\x00\x00\r", - "\x01\x00\x00\x00\x00\x00\x00)", - } - for _, pattern := range patterns { - if strings.HasPrefix(output, pattern) { - output = strings.ReplaceAll(output, pattern, "") - log.WithFields(log.Fields{ - "output": output, - }).Trace("Output name has been sanitized") - } + stderr, err := ioutil.ReadAll(&errBuf) + if err != nil { + log.WithFields(log.Fields{ + "container": containerName, + "command": cmd, + "detach": detach, + "env": env, + "error": err, + "tty": tty, + }).Error("Could not parse stderr from container") + return "", err } - return output, nil + execRes.ExitCode = 0 + execRes.StdOut = string(stdout) + execRes.StdErr = string(stderr) + + // remove '\n' from the response + return strings.ReplaceAll(execRes.StdOut, "\n", ""), nil } // GetContainerHostname we need the container name because we use the Docker Client instead of Docker Compose @@ -434,7 +457,7 @@ func RemoveDevNetwork() error { // WaitForProcess polls a container executing "ps" command until the process is in the desired state (present or not), // or a timeout happens -func WaitForProcess(containerName string, process string, desiredState string, maxTimeout time.Duration) error { +func WaitForProcess(containerName string, process string, desiredState string, ocurrences int, maxTimeout time.Duration) error { exp := common.GetExponentialBackOff(maxTimeout) mustBePresent := false @@ -446,49 +469,125 @@ func WaitForProcess(containerName string, process string, desiredState string, m processStatus := func() error { log.WithFields(log.Fields{ "desiredState": desiredState, + "ocurrences": ocurrences, "process": process, }).Trace("Checking process desired state on the container") - output, err := ExecCommandIntoContainer(context.Background(), containerName, "root", []string{"pgrep", "-n", "-l", process}) + // pgrep -d: -d, --delimiter specify output delimiter + //i.e. "pgrep -d , metricbeat": 483,519 + cmds := []string{"pgrep", "-d", ",", process} + output, err := ExecCommandIntoContainer(context.Background(), containerName, "root", cmds) if err != nil { log.WithFields(log.Fields{ + "cmds": cmds, "desiredState": desiredState, "elapsedTime": exp.GetElapsedTime(), "error": err, "container": containerName, "mustBePresent": mustBePresent, + "ocurrences": ocurrences, "process": process, "retry": retryCount, - }).Warn("Could not execute 'pgrep -n -l' in the container") + }).Warn("Could not get number of processes in the container") retryCount++ return err } - outputContainsProcess := strings.Contains(output, process) + // tokenize the pids to get each pid's state, adding them to an array if they match the desired state + // From Split docs: + // If output does not contain sep and sep is not empty, Split returns a + // slice of length 1 whose only element is s, that's why we first initialise to the empty array + pids := strings.Split(output, ",") + if len(pids) == 1 && pids[0] == "" { + pids = []string{} + } + + log.WithFields(log.Fields{ + "count": len(pids), + "desiredState": desiredState, + "mustBePresent": mustBePresent, + "pids": pids, + "process": process, + }).Tracef("Pids for process found") + + desiredStatePids := []string{} + + for _, pid := range pids { + pidStateCmds := []string{"ps", "-q", pid, "-o", "state", "--no-headers"} + pidState, err := ExecCommandIntoContainer(context.Background(), containerName, "root", pidStateCmds) + if err != nil { + log.WithFields(log.Fields{ + "cmds": cmds, + "desiredState": desiredState, + "elapsedTime": exp.GetElapsedTime(), + "error": err, + "container": containerName, + "mustBePresent": mustBePresent, + "ocurrences": ocurrences, + "pid": pid, + "process": process, + "retry": retryCount, + }).Warn("Could not check pid status in the container") + + retryCount++ + + return err + } - // both true or both false - if mustBePresent == outputContainsProcess { log.WithFields(log.Fields{ "desiredState": desiredState, - "container": containerName, "mustBePresent": mustBePresent, + "pid": pid, + "pidState": pidState, "process": process, + }).Tracef("Checking if process is in the S state") + + // if the process must be present, then check for the S state + // From 'man ps': + // D uninterruptible sleep (usually IO) + // R running or runnable (on run queue) + // S interruptible sleep (waiting for an event to complete) + // T stopped by job control signal + // t stopped by debugger during the tracing + // W paging (not valid since the 2.6.xx kernel) + // X dead (should never be seen) + // Z defunct ("zombie") process, terminated but not reaped by its parent + if mustBePresent && pidState == "S" { + desiredStatePids = append(desiredStatePids, pid) + } else if !mustBePresent { + desiredStatePids = append(desiredStatePids, pid) + } + } + + occurrencesMatched := (len(desiredStatePids) == ocurrences) + + // both true or both false + if mustBePresent == occurrencesMatched { + log.WithFields(log.Fields{ + "desiredOcurrences": ocurrences, + "desiredState": desiredState, + "container": containerName, + "mustBePresent": mustBePresent, + "ocurrences": len(desiredStatePids), + "process": process, }).Infof("Process desired state checked") return nil } if mustBePresent { - err = fmt.Errorf("%s process is not running in the container yet", process) + err = fmt.Errorf("%s process is not running in the container with the desired number of occurrences (%d) yet", process, ocurrences) log.WithFields(log.Fields{ - "desiredState": desiredState, - "elapsedTime": exp.GetElapsedTime(), - "error": err, - "container": containerName, - "process": process, - "retry": retryCount, + "desiredOcurrences": ocurrences, + "desiredState": desiredState, + "elapsedTime": exp.GetElapsedTime(), + "error": err, + "container": containerName, + "ocurrences": len(desiredStatePids), + "process": process, + "retry": retryCount, }).Warn(err.Error()) retryCount++ @@ -498,12 +597,14 @@ func WaitForProcess(containerName string, process string, desiredState string, m err = fmt.Errorf("%s process is still running in the container", process) log.WithFields(log.Fields{ - "elapsedTime": exp.GetElapsedTime(), - "error": err, - "container": containerName, - "process": process, - "state": desiredState, - "retry": retryCount, + "desiredOcurrences": ocurrences, + "elapsedTime": exp.GetElapsedTime(), + "error": err, + "container": containerName, + "ocurrences": len(desiredStatePids), + "process": process, + "state": desiredState, + "retry": retryCount, }).Warn(err.Error()) retryCount++