From 78a2f9f398c0d78d779923a9f1e9a23fbe5b76e2 Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Wed, 24 Mar 2021 18:24:18 -0700 Subject: [PATCH] Testing: Remove legacy cruft from process This removes some legacy cruft from the process abstraction, namely: - the legacy health check support (startup message), since our supported etcd versions support the /health endpoint - URL populatation, since we end up needing multiple ports for the api server - gexec usage, since we don't need to check wait messages any more and the equivalent "send SIGTERM" and "wait for process to exit" parts are pretty straightforward for our usecases. --- .../testing/controlplane/apiserver.go | 41 +- pkg/internal/testing/controlplane/etcd.go | 69 ++- .../testing/process/arguments_test.go | 95 ---- pkg/internal/testing/process/process.go | 237 +++++----- ...al_suite_test.go => process_suite_test.go} | 0 pkg/internal/testing/process/process_test.go | 409 +++++++++--------- 6 files changed, 378 insertions(+), 473 deletions(-) delete mode 100644 pkg/internal/testing/process/arguments_test.go rename pkg/internal/testing/process/{internal_suite_test.go => process_suite_test.go} (100%) diff --git a/pkg/internal/testing/controlplane/apiserver.go b/pkg/internal/testing/controlplane/apiserver.go index a6ea01cb61..eae05eaa6a 100644 --- a/pkg/internal/testing/controlplane/apiserver.go +++ b/pkg/internal/testing/controlplane/apiserver.go @@ -4,9 +4,11 @@ import ( "fmt" "io" "io/ioutil" + "net" "net/url" "os" "path/filepath" + "strconv" "time" "sigs.k8s.io/controller-runtime/pkg/internal/testing/addr" @@ -69,38 +71,34 @@ type APIServer struct { Out io.Writer Err io.Writer - processState *process.ProcessState + processState *process.State } // Start starts the apiserver, waits for it to come up, and returns an error, // if occurred. func (s *APIServer) Start() error { if s.processState == nil { - if err := s.setProcessState(); err != nil { + if err := s.setState(); err != nil { return err } } return s.processState.Start(s.Out, s.Err) } -func (s *APIServer) setProcessState() error { +func (s *APIServer) setState() error { if s.EtcdURL == nil { return fmt.Errorf("expected EtcdURL to be configured") } var err error - s.processState = &process.ProcessState{} - - s.processState.DefaultedProcessInput, err = process.DoDefaulting( - "kube-apiserver", - s.URL, - s.CertDir, - s.Path, - s.StartTimeout, - s.StopTimeout, - ) - if err != nil { + s.processState = &process.State{ + Dir: s.CertDir, + Path: s.Path, + StartTimeout: s.StartTimeout, + StopTimeout: s.StopTimeout, + } + if err := s.processState.Init("kube-apiserver"); err != nil { return err } @@ -112,9 +110,20 @@ func (s *APIServer) setProcessState() error { } } - s.processState.HealthCheckEndpoint = "/healthz" + if s.URL == nil { + port, host, err := addr.Suggest("") + if err != nil { + return err + } + s.URL = &url.URL{ + Scheme: "http", + Host: net.JoinHostPort(host, strconv.Itoa(port)), + } + } + + s.processState.HealthCheck.URL = *s.URL + s.processState.HealthCheck.Path = "/healthz" - s.URL = &s.processState.URL s.CertDir = s.processState.Dir s.Path = s.processState.Path s.StartTimeout = s.processState.StartTimeout diff --git a/pkg/internal/testing/controlplane/etcd.go b/pkg/internal/testing/controlplane/etcd.go index 8c906d4597..fde45e20ed 100644 --- a/pkg/internal/testing/controlplane/etcd.go +++ b/pkg/internal/testing/controlplane/etcd.go @@ -2,10 +2,12 @@ package controlplane import ( "io" - "time" - + "net" "net/url" + "strconv" + "time" + "sigs.k8s.io/controller-runtime/pkg/internal/testing/addr" "sigs.k8s.io/controller-runtime/pkg/internal/testing/process" ) @@ -55,40 +57,50 @@ type Etcd struct { Out io.Writer Err io.Writer - processState *process.ProcessState + // processState contains the actual details about this running process + processState *process.State } // Start starts the etcd, waits for it to come up, and returns an error, if one // occoured. func (e *Etcd) Start() error { if e.processState == nil { - if err := e.setProcessState(); err != nil { + if err := e.setState(); err != nil { return err } } return e.processState.Start(e.Out, e.Err) } -func (e *Etcd) setProcessState() error { +func (e *Etcd) setState() error { var err error - e.processState = &process.ProcessState{} - - e.processState.DefaultedProcessInput, err = process.DoDefaulting( - "etcd", - e.URL, - e.DataDir, - e.Path, - e.StartTimeout, - e.StopTimeout, - ) - if err != nil { + e.processState = &process.State{ + Dir: e.DataDir, + Path: e.Path, + StartTimeout: e.StartTimeout, + StopTimeout: e.StopTimeout, + } + + if err := e.processState.Init("etcd"); err != nil { return err } - e.processState.StartMessage = getEtcdStartMessage(e.processState.URL) + if e.URL == nil { + port, host, err := addr.Suggest("") + if err != nil { + return err + } + e.URL = &url.URL{ + Scheme: "http", + Host: net.JoinHostPort(host, strconv.Itoa(port)), + } + } + + // can use /health as of etcd 3.3.0 + e.processState.HealthCheck.URL = *e.URL + e.processState.HealthCheck.Path = "/health" - e.URL = &e.processState.URL e.DataDir = e.processState.Dir e.Path = e.processState.Path e.StartTimeout = e.processState.StartTimeout @@ -117,24 +129,3 @@ var EtcdDefaultArgs = []string{ "--listen-client-urls={{ if .URL }}{{ .URL.String }}{{ end }}", "--data-dir={{ .DataDir }}", } - -// isSecureScheme returns false when the schema is insecure. -func isSecureScheme(scheme string) bool { - // https://github.com/coreos/etcd/blob/d9deeff49a080a88c982d328ad9d33f26d1ad7b6/pkg/transport/listener.go#L53 - if scheme == "https" || scheme == "unixs" { - return true - } - return false -} - -// getEtcdStartMessage returns an start message to inform if the client is or not insecure. -// It will return true when the URL informed has the scheme == "https" || scheme == "unixs" -func getEtcdStartMessage(listenURL url.URL) string { - if isSecureScheme(listenURL.Scheme) { - // https://github.com/coreos/etcd/blob/a7f1fbe00ec216fcb3a1919397a103b41dca8413/embed/serve.go#L167 - return "serving client requests on " - } - - // https://github.com/coreos/etcd/blob/a7f1fbe00ec216fcb3a1919397a103b41dca8413/embed/serve.go#L124 - return "serving insecure client requests on " -} diff --git a/pkg/internal/testing/process/arguments_test.go b/pkg/internal/testing/process/arguments_test.go deleted file mode 100644 index 073c063a97..0000000000 --- a/pkg/internal/testing/process/arguments_test.go +++ /dev/null @@ -1,95 +0,0 @@ -package process_test - -import ( - "net/url" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "sigs.k8s.io/controller-runtime/pkg/internal/testing/integration" - . "sigs.k8s.io/controller-runtime/pkg/internal/testing/process" -) - -var _ = Describe("Arguments", func() { - It("templates URLs", func() { - templates := []string{ - "plain URL: {{ .SomeURL }}", - "method on URL: {{ .SomeURL.Hostname }}", - "empty URL: {{ .EmptyURL }}", - "handled empty URL: {{- if .EmptyURL }}{{ .EmptyURL }}{{ end }}", - } - data := struct { - SomeURL *url.URL - EmptyURL *url.URL - }{ - &url.URL{Scheme: "https", Host: "the.host.name:3456"}, - nil, - } - - out, err := RenderTemplates(templates, data) - Expect(err).NotTo(HaveOccurred()) - Expect(out).To(BeEquivalentTo([]string{ - "plain URL: https://the.host.name:3456", - "method on URL: the.host.name", - "empty URL: <nil>", - "handled empty URL:", - })) - }) - - It("templates strings", func() { - templates := []string{ - "a string: {{ .SomeString }}", - "empty string: {{- .EmptyString }}", - } - data := struct { - SomeString string - EmptyString string - }{ - "this is some random string", - "", - } - - out, err := RenderTemplates(templates, data) - Expect(err).NotTo(HaveOccurred()) - Expect(out).To(BeEquivalentTo([]string{ - "a string: this is some random string", - "empty string:", - })) - }) - - It("has no access to unexported fields", func() { - templates := []string{ - "this is just a string", - "this blows up {{ .test }}", - } - data := struct{ test string }{"ooops private"} - - out, err := RenderTemplates(templates, data) - Expect(out).To(BeEmpty()) - Expect(err).To(MatchError( - ContainSubstring("is an unexported field of struct"), - )) - }) - - It("errors when field cannot be found", func() { - templates := []string{"this does {{ .NotExist }}"} - data := struct{ Unused string }{"unused"} - - out, err := RenderTemplates(templates, data) - Expect(out).To(BeEmpty()) - Expect(err).To(MatchError( - ContainSubstring("can't evaluate field"), - )) - }) - - Context("When overriding external default args", func() { - It("does not change the internal default args for APIServer", func() { - integration.APIServerDefaultArgs[0] = "oh no!" - Expect(APIServerDefaultArgs).NotTo(BeEquivalentTo(integration.APIServerDefaultArgs)) - }) - It("does not change the internal default args for Etcd", func() { - integration.EtcdDefaultArgs[0] = "oh no!" - Expect(EtcdDefaultArgs).NotTo(BeEquivalentTo(integration.EtcdDefaultArgs)) - }) - }) -}) diff --git a/pkg/internal/testing/process/process.go b/pkg/internal/testing/process/process.go index ac7fa786ae..fcbbe1f903 100644 --- a/pkg/internal/testing/process/process.go +++ b/pkg/internal/testing/process/process.go @@ -1,6 +1,7 @@ package process import ( + "crypto/tls" "fmt" "io" "io/ioutil" @@ -10,172 +11,190 @@ import ( "os" "os/exec" "path" - "strconv" + "sync" + "syscall" "time" +) - "github.com/onsi/gomega/gbytes" - "github.com/onsi/gomega/gexec" +// ListenAddr represents some listening address and port +type ListenAddr struct { + Address string + Port string +} - "sigs.k8s.io/controller-runtime/pkg/internal/testing/addr" -) +// URL returns a URL for this address with the given scheme and subpath +func (l *ListenAddr) URL(scheme string, path string) *url.URL { + return &url.URL{ + Scheme: scheme, + Host: l.HostPort(), + Path: path, + } +} + +// HostPort returns the joined host-port pair for this address +func (l *ListenAddr) HostPort() string { + return net.JoinHostPort(l.Address, l.Port) +} + +// HealthCheck describes the information needed to health-check a process via +// some health-check URL. +type HealthCheck struct { + url.URL -// ProcessState define the state of the process. -type ProcessState struct { - DefaultedProcessInput - Session *gexec.Session - // Healthcheck Endpoint. If we get http.StatusOK from this endpoint, we - // assume the process is ready to operate. E.g. "/healthz". If this is set, - // we ignore StartMessage. - HealthCheckEndpoint string // HealthCheckPollInterval is the interval which will be used for polling the - // HealthCheckEndpoint. - // If left empty it will default to 100 Milliseconds. - HealthCheckPollInterval time.Duration - // StartMessage is the message to wait for on stderr. If we receive this - // message, we assume the process is ready to operate. Ignored if - // HealthCheckEndpoint is specified. + // endpoint described by Host, Port, and Path. // - // The usage of StartMessage is discouraged, favour HealthCheckEndpoint - // instead! + // If left empty it will default to 100 Milliseconds. + PollInterval time.Duration +} + +// State define the state of the process. +type State struct { + Cmd *exec.Cmd + + // HealthCheck describes how to check if this process is up. If we get an http.StatusOK, + // we assume the process is ready to operate. // - // Deprecated: Use HealthCheckEndpoint in favour of StartMessage - StartMessage string - Args []string + // For example, the /healthz endpoint of the k8s API server, or the /health endpoint of etcd. + HealthCheck HealthCheck - // ready holds wether the process is currently in ready state (hit the ready condition) or not. - // It will be set to true on a successful `Start()` and set to false on a successful `Stop()` - ready bool -} + Args []string + + StopTimeout time.Duration + StartTimeout time.Duration -// DefaultedProcessInput defines the default process input required to perform the test. -type DefaultedProcessInput struct { - URL url.URL Dir string DirNeedsCleaning bool Path string - StopTimeout time.Duration - StartTimeout time.Duration -} -// DoDefaulting sets the default configuration according to the data informed and return an DefaultedProcessInput -// and an error if some requirement was not informed. -func DoDefaulting( - name string, - listenURL *url.URL, - dir string, - path string, - startTimeout time.Duration, - stopTimeout time.Duration, -) (DefaultedProcessInput, error) { - defaults := DefaultedProcessInput{ - Dir: dir, - Path: path, - StartTimeout: startTimeout, - StopTimeout: stopTimeout, - } - - if listenURL == nil { - port, host, err := addr.Suggest("") - if err != nil { - return DefaultedProcessInput{}, err - } - defaults.URL = url.URL{ - Scheme: "http", - Host: net.JoinHostPort(host, strconv.Itoa(port)), - } - } else { - defaults.URL = *listenURL - } + // ready holds wether the process is currently in ready state (hit the ready condition) or not. + // It will be set to true on a successful `Start()` and set to false on a successful `Stop()` + ready bool + + // waitDone is closed when our call to wait finishes up, and indicates that + // our process has terminated. + waitDone chan struct{} + errMu sync.Mutex + exitErr error + exited bool +} - if path == "" { +// Init sets up this process, configuring binary paths if missing, initializing +// temporary directories, etc. +// +// This defaults all defaultable fields. +func (ps *State) Init(name string) error { + if ps.Path == "" { if name == "" { - return DefaultedProcessInput{}, fmt.Errorf("must have at least one of name or path") + return fmt.Errorf("must have at least one of name or path") } - defaults.Path = BinPathFinder(name) + ps.Path = BinPathFinder(name) } - if dir == "" { + if ps.Dir == "" { newDir, err := ioutil.TempDir("", "k8s_test_framework_") if err != nil { - return DefaultedProcessInput{}, err + return err } - defaults.Dir = newDir - defaults.DirNeedsCleaning = true + ps.Dir = newDir + ps.DirNeedsCleaning = true } - if startTimeout == 0 { - defaults.StartTimeout = 20 * time.Second + if ps.StartTimeout == 0 { + ps.StartTimeout = 20 * time.Second } - if stopTimeout == 0 { - defaults.StopTimeout = 20 * time.Second + if ps.StopTimeout == 0 { + ps.StopTimeout = 20 * time.Second } - - return defaults, nil + return nil } type stopChannel chan struct{} // Start starts the apiserver, waits for it to come up, and returns an error, // if occurred. -func (ps *ProcessState) Start(stdout, stderr io.Writer) (err error) { +func (ps *State) Start(stdout, stderr io.Writer) (err error) { if ps.ready { return nil } - command := exec.Command(ps.Path, ps.Args...) + ps.Cmd = exec.Command(ps.Path, ps.Args...) + ps.Cmd.Stdout = stdout + ps.Cmd.Stderr = stderr ready := make(chan bool) timedOut := time.After(ps.StartTimeout) var pollerStopCh stopChannel + pollerStopCh = make(stopChannel) + go pollURLUntilOK(ps.HealthCheck.URL, ps.HealthCheck.PollInterval, ready, pollerStopCh) - if ps.HealthCheckEndpoint != "" { - healthCheckURL := ps.URL - healthCheckURL.Path = ps.HealthCheckEndpoint - pollerStopCh = make(stopChannel) - go pollURLUntilOK(healthCheckURL, ps.HealthCheckPollInterval, ready, pollerStopCh) - } else { - startDetectStream := gbytes.NewBuffer() - ready = startDetectStream.Detect(ps.StartMessage) - stderr = safeMultiWriter(stderr, startDetectStream) - } + ps.waitDone = make(chan struct{}) - ps.Session, err = gexec.Start(command, stdout, stderr) - if err != nil { + if err := ps.Cmd.Start(); err != nil { + ps.errMu.Lock() + defer ps.errMu.Unlock() + ps.exited = true return err } + go func() { + defer close(ps.waitDone) + err := ps.Cmd.Wait() + + ps.errMu.Lock() + defer ps.errMu.Unlock() + ps.exitErr = err + ps.exited = true + }() select { case <-ready: ps.ready = true return nil + case <-ps.waitDone: + if pollerStopCh != nil { + close(pollerStopCh) + } + return fmt.Errorf("timeout waiting for process %s to start successfully "+ + "(it may have failed to start, or stopped unexpectedly before becoming ready)", + path.Base(ps.Path)) case <-timedOut: if pollerStopCh != nil { close(pollerStopCh) } - if ps.Session != nil { - ps.Session.Terminate() + if ps.Cmd != nil { + // intentionally ignore this -- we might've crashed, failed to start, etc + ps.Cmd.Process.Signal(syscall.SIGTERM) //nolint errcheck } return fmt.Errorf("timeout waiting for process %s to start", path.Base(ps.Path)) } } -func safeMultiWriter(writers ...io.Writer) io.Writer { - safeWriters := []io.Writer{} - for _, w := range writers { - if w != nil { - safeWriters = append(safeWriters, w) - } - } - return io.MultiWriter(safeWriters...) +// Exited returns true if the process exited, and may also +// return an error (as per Cmd.Wait) if the process did not +// exit with error code 0. +func (ps *State) Exited() (bool, error) { + ps.errMu.Lock() + defer ps.errMu.Unlock() + return ps.exited, ps.exitErr } func pollURLUntilOK(url url.URL, interval time.Duration, ready chan bool, stopCh stopChannel) { + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + // there's probably certs *somewhere*, + // but it's fine to just skip validating + // them for health checks during testing + InsecureSkipVerify: true, + }, + }, + } if interval <= 0 { interval = 100 * time.Millisecond } for { - res, err := http.Get(url.String()) + res, err := client.Get(url.String()) if err == nil { res.Body.Close() if res.StatusCode == http.StatusOK { @@ -195,23 +214,21 @@ func pollURLUntilOK(url url.URL, interval time.Duration, ready chan bool, stopCh // Stop stops this process gracefully, waits for its termination, and cleans up // the CertDir if necessary. -func (ps *ProcessState) Stop() error { - if ps.Session == nil { +func (ps *State) Stop() error { + if ps.Cmd == nil { return nil } - - // gexec's Session methods (Signal, Kill, ...) do not check if the Process is - // nil, so we are doing this here for now. - // This should probably be fixed in gexec. - if ps.Session.Command.Process == nil { + if done, _ := ps.Exited(); done { return nil } + if err := ps.Cmd.Process.Signal(syscall.SIGTERM); err != nil { + return fmt.Errorf("unable to signal for process %s to stop: %w", ps.Path, err) + } - detectedStop := ps.Session.Terminate().Exited timedOut := time.After(ps.StopTimeout) select { - case <-detectedStop: + case <-ps.waitDone: break case <-timedOut: return fmt.Errorf("timeout waiting for process %s to stop", path.Base(ps.Path)) diff --git a/pkg/internal/testing/process/internal_suite_test.go b/pkg/internal/testing/process/process_suite_test.go similarity index 100% rename from pkg/internal/testing/process/internal_suite_test.go rename to pkg/internal/testing/process/process_suite_test.go diff --git a/pkg/internal/testing/process/process_test.go b/pkg/internal/testing/process/process_test.go index dd933291db..a486cff554 100644 --- a/pkg/internal/testing/process/process_test.go +++ b/pkg/internal/testing/process/process_test.go @@ -7,13 +7,11 @@ import ( "net/http" "net/url" "os" - "os/exec" "strconv" "time" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/onsi/gomega/gexec" "github.com/onsi/gomega/ghttp" "sigs.k8s.io/controller-runtime/pkg/internal/testing/addr" . "sigs.k8s.io/controller-runtime/pkg/internal/testing/process" @@ -25,159 +23,73 @@ const ( var _ = Describe("Start method", func() { var ( - processState *ProcessState + processState *State + server *ghttp.Server ) BeforeEach(func() { - processState = &ProcessState{} + server = ghttp.NewServer() + + processState = &State{ + Path: "bash", + Args: simpleBashScript, + HealthCheck: HealthCheck{ + URL: getServerURL(server), + }, + } processState.Path = "bash" processState.Args = simpleBashScript - }) - - It("can start a process", func() { - processState.StartTimeout = 10 * time.Second - processState.StartMessage = "loop 5" - - err := processState.Start(nil, nil) - Expect(err).NotTo(HaveOccurred()) - Consistently(processState.Session.ExitCode).Should(BeNumerically("==", -1)) + }) + AfterEach(func() { + server.Close() }) - Context("when a health check endpoint is provided", func() { - var server *ghttp.Server + Context("when process takes too long to start", func() { BeforeEach(func() { - server = ghttp.NewServer() - }) - AfterEach(func() { - server.Close() - }) - - Context("when the healthcheck returns ok", func() { - BeforeEach(func() { - server.RouteToHandler("GET", healthURLPath, ghttp.RespondWith(http.StatusOK, "")) - }) - - It("hits the endpoint, and successfully starts", func() { - processState.HealthCheckEndpoint = healthURLPath - processState.StartTimeout = 100 * time.Millisecond - processState.URL = getServerURL(server) - - err := processState.Start(nil, nil) - Expect(err).NotTo(HaveOccurred()) - Expect(server.ReceivedRequests()).To(HaveLen(1)) - Consistently(processState.Session.ExitCode).Should(BeNumerically("==", -1)) + server.RouteToHandler("GET", healthURLPath, func(resp http.ResponseWriter, _ *http.Request) { + time.Sleep(250 * time.Millisecond) + resp.WriteHeader(http.StatusOK) }) }) + It("returns a timeout error", func() { + processState.StartTimeout = 200 * time.Millisecond - Context("when the healthcheck always returns failure", func() { - BeforeEach(func() { - server.RouteToHandler("GET", healthURLPath, ghttp.RespondWith(http.StatusInternalServerError, "")) - }) - It("returns a timeout error and stops health API checker", func() { - processState.HealthCheckEndpoint = healthURLPath - processState.StartTimeout = 500 * time.Millisecond - processState.URL = getServerURL(server) - - err := processState.Start(nil, nil) - Expect(err).To(MatchError(ContainSubstring("timeout"))) - - nrReceivedRequests := len(server.ReceivedRequests()) - Expect(nrReceivedRequests).To(Equal(5)) - time.Sleep(200 * time.Millisecond) - Expect(nrReceivedRequests).To(Equal(5)) - }) - }) - - Context("when the healthcheck isn't even listening", func() { - BeforeEach(func() { - server.Close() - }) - - It("returns a timeout error", func() { - processState.HealthCheckEndpoint = healthURLPath - processState.StartTimeout = 500 * time.Millisecond - - port, host, err := addr.Suggest("") - Expect(err).NotTo(HaveOccurred()) - - processState.URL = url.URL{ - Scheme: "http", - Host: net.JoinHostPort(host, strconv.Itoa(port)), - } + err := processState.Start(nil, nil) + Expect(err).To(MatchError(ContainSubstring("timeout"))) - err = processState.Start(nil, nil) - Expect(err).To(MatchError(ContainSubstring("timeout"))) - }) + Eventually(func() bool { done, _ := processState.Exited(); return done }).Should(BeTrue()) }) + }) - Context("when the healthcheck fails initially but succeeds eventually", func() { - BeforeEach(func() { - server.AppendHandlers( - ghttp.RespondWith(http.StatusInternalServerError, ""), - ghttp.RespondWith(http.StatusInternalServerError, ""), - ghttp.RespondWith(http.StatusInternalServerError, ""), - ghttp.RespondWith(http.StatusOK, ""), - ) - }) - - It("hits the endpoint repeatedly, and successfully starts", func() { - processState.HealthCheckEndpoint = healthURLPath - processState.StartTimeout = 20 * time.Second - processState.URL = getServerURL(server) - - err := processState.Start(nil, nil) - Expect(err).NotTo(HaveOccurred()) - Expect(server.ReceivedRequests()).To(HaveLen(4)) - Consistently(processState.Session.ExitCode).Should(BeNumerically("==", -1)) - }) - - Context("when the polling interval is not configured", func() { - It("uses the default interval for polling", func() { - processState.HealthCheckEndpoint = "/helathz" - processState.StartTimeout = 300 * time.Millisecond - processState.URL = getServerURL(server) + Context("when the healthcheck returns ok", func() { + BeforeEach(func() { - Expect(processState.Start(nil, nil)).To(MatchError(ContainSubstring("timeout"))) - Expect(server.ReceivedRequests()).To(HaveLen(3)) - }) - }) + server.RouteToHandler("GET", healthURLPath, ghttp.RespondWith(http.StatusOK, "")) + }) - Context("when the polling interval is configured", func() { - BeforeEach(func() { - processState.HealthCheckPollInterval = time.Millisecond * 150 - }) + It("can start a process", func() { + processState.StartTimeout = 10 * time.Second - It("hits the endpoint in the configured interval", func() { - processState.HealthCheckEndpoint = healthURLPath - processState.StartTimeout = 3 * processState.HealthCheckPollInterval - processState.URL = getServerURL(server) + err := processState.Start(nil, nil) + Expect(err).NotTo(HaveOccurred()) - Expect(processState.Start(nil, nil)).To(MatchError(ContainSubstring("timeout"))) - Expect(server.ReceivedRequests()).To(HaveLen(3)) - }) - }) + Consistently(processState.Exited).Should(BeFalse()) }) - }) - - Context("when a health check endpoint is not provided", func() { - - Context("when process takes too long to start", func() { - It("returns a timeout error", func() { - processState.StartTimeout = 200 * time.Millisecond - processState.StartMessage = "loop 5000" - err := processState.Start(nil, nil) - Expect(err).To(MatchError(ContainSubstring("timeout"))) + It("hits the endpoint, and successfully starts", func() { + processState.StartTimeout = 100 * time.Millisecond - Eventually(processState.Session.ExitCode).Should(Equal(143)) - }) + err := processState.Start(nil, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(server.ReceivedRequests()).To(HaveLen(1)) + Consistently(processState.Exited).Should(BeFalse()) }) Context("when the command cannot be started", func() { var err error BeforeEach(func() { - processState = &ProcessState{} + processState = &State{} processState.Path = "/nonexistent" err = processState.Start(nil, nil) @@ -197,43 +109,145 @@ var _ = Describe("Start method", func() { }) }) }) + + Context("when IO is configured", func() { + It("can inspect stdout & stderr", func() { + stdout := &bytes.Buffer{} + stderr := &bytes.Buffer{} + + processState.Args = []string{ + "-c", + ` + echo 'this is stderr' >&2 + echo 'that is stdout' + echo 'i started' >&2 + `, + } + processState.StartTimeout = 1 * time.Second + + Expect(processState.Start(stdout, stderr)).To(Succeed()) + Eventually(processState.Exited).Should(BeTrue()) + + Expect(stdout.String()).To(Equal("that is stdout\n")) + Expect(stderr.String()).To(Equal("this is stderr\ni started\n")) + }) + }) + }) + + Context("when the healthcheck always returns failure", func() { + BeforeEach(func() { + server.RouteToHandler("GET", healthURLPath, ghttp.RespondWith(http.StatusInternalServerError, "")) + }) + It("returns a timeout error and stops health API checker", func() { + processState.HealthCheck.URL = getServerURL(server) + processState.HealthCheck.Path = healthURLPath + processState.StartTimeout = 500 * time.Millisecond + + err := processState.Start(nil, nil) + Expect(err).To(MatchError(ContainSubstring("timeout"))) + + nrReceivedRequests := len(server.ReceivedRequests()) + Expect(nrReceivedRequests).To(Equal(5)) + time.Sleep(200 * time.Millisecond) + Expect(nrReceivedRequests).To(Equal(5)) + }) }) - Context("when IO is configured", func() { - It("can inspect stdout & stderr", func() { - stdout := &bytes.Buffer{} - stderr := &bytes.Buffer{} - - processState.Args = []string{ - "-c", - ` - echo 'this is stderr' >&2 - echo 'that is stdout' - echo 'i started' >&2 - `, + Context("when the healthcheck isn't even listening", func() { + BeforeEach(func() { + server.Close() + }) + + It("returns a timeout error", func() { + processState.HealthCheck.Path = healthURLPath + processState.StartTimeout = 500 * time.Millisecond + + port, host, err := addr.Suggest("") + Expect(err).NotTo(HaveOccurred()) + + processState.HealthCheck.URL = url.URL{ + Scheme: "http", + Host: net.JoinHostPort(host, strconv.Itoa(port)), } - processState.StartMessage = "i started" - processState.StartTimeout = 1 * time.Second - Expect(processState.Start(stdout, stderr)).To(Succeed()) - Eventually(processState.Session).Should(gexec.Exit()) + err = processState.Start(nil, nil) + Expect(err).To(MatchError(ContainSubstring("timeout"))) + }) + }) + + Context("when the healthcheck fails initially but succeeds eventually", func() { + BeforeEach(func() { + server.AppendHandlers( + ghttp.RespondWith(http.StatusInternalServerError, ""), + ghttp.RespondWith(http.StatusInternalServerError, ""), + ghttp.RespondWith(http.StatusInternalServerError, ""), + ghttp.RespondWith(http.StatusOK, ""), + ) + }) + + It("hits the endpoint repeatedly, and successfully starts", func() { + processState.HealthCheck.URL = getServerURL(server) + processState.HealthCheck.Path = healthURLPath + processState.StartTimeout = 20 * time.Second + + err := processState.Start(nil, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(server.ReceivedRequests()).To(HaveLen(4)) + Consistently(processState.Exited).Should(BeFalse()) + }) + + Context("when the polling interval is not configured", func() { + It("uses the default interval for polling", func() { + processState.HealthCheck.URL = getServerURL(server) + processState.HealthCheck.Path = "/helathz" + processState.StartTimeout = 300 * time.Millisecond + + Expect(processState.Start(nil, nil)).To(MatchError(ContainSubstring("timeout"))) + Expect(server.ReceivedRequests()).To(HaveLen(3)) + }) + }) + + Context("when the polling interval is configured", func() { + BeforeEach(func() { + processState.HealthCheck.URL = getServerURL(server) + processState.HealthCheck.Path = healthURLPath + processState.HealthCheck.PollInterval = time.Millisecond * 150 + }) - Expect(stdout.String()).To(Equal("that is stdout\n")) - Expect(stderr.String()).To(Equal("this is stderr\ni started\n")) + It("hits the endpoint in the configured interval", func() { + processState.StartTimeout = 3 * processState.HealthCheck.PollInterval + + Expect(processState.Start(nil, nil)).To(MatchError(ContainSubstring("timeout"))) + Expect(server.ReceivedRequests()).To(HaveLen(3)) + }) }) }) }) var _ = Describe("Stop method", func() { + var ( + server *ghttp.Server + processState *State + ) + BeforeEach(func() { + server = ghttp.NewServer() + server.RouteToHandler("GET", healthURLPath, ghttp.RespondWith(http.StatusOK, "")) + processState = &State{ + Path: "bash", + Args: simpleBashScript, + HealthCheck: HealthCheck{ + URL: getServerURL(server), + }, + } + processState.StartTimeout = 10 * time.Second + }) + + AfterEach(func() { + server.Close() + }) Context("when Stop() is called", func() { - var ( - processState *ProcessState - ) BeforeEach(func() { - var err error - processState = &ProcessState{} - processState.Session, err = gexec.Start(getSimpleCommand(), nil, nil) - Expect(err).NotTo(HaveOccurred()) + Expect(processState.Start(nil, nil)).To(Succeed()) processState.StopTimeout = 10 * time.Second }) @@ -255,13 +269,8 @@ var _ = Describe("Stop method", func() { Context("when the command cannot be stopped", func() { It("returns a timeout error", func() { - var err error - - processState := &ProcessState{} - processState.Session, err = gexec.Start(getSimpleCommand(), nil, nil) - Expect(err).NotTo(HaveOccurred()) - processState.Session.Exited = make(chan struct{}) - processState.StopTimeout = 200 * time.Millisecond + Expect(processState.Start(nil, nil)).To(Succeed()) + processState.StopTimeout = 1 * time.Nanosecond // much shorter than the sleep in the script Expect(processState.Stop()).To(MatchError(ContainSubstring("timeout"))) }) @@ -271,9 +280,7 @@ var _ = Describe("Stop method", func() { It("removes the directory", func() { var err error - processState := &ProcessState{} - processState.Session, err = gexec.Start(getSimpleCommand(), nil, nil) - Expect(err).NotTo(HaveOccurred()) + Expect(processState.Start(nil, nil)).To(Succeed()) processState.Dir, err = ioutil.TempDir("", "k8s_test_framework_") Expect(err).NotTo(HaveOccurred()) processState.DirNeedsCleaning = true @@ -285,67 +292,46 @@ var _ = Describe("Stop method", func() { }) }) -var _ = Describe("DoDefaulting", func() { +var _ = Describe("Init", func() { Context("when all inputs are provided", func() { It("passes them through", func() { - defaults, err := DoDefaulting( - "some name", - &url.URL{Host: "some.host.to.listen.on"}, - "/some/dir", - "/some/path/to/some/bin", - 20*time.Hour, - 65537*time.Millisecond, - ) - Expect(err).NotTo(HaveOccurred()) + ps := &State{ + Dir: "/some/dir", + Path: "/some/path/to/some/bin", + StartTimeout: 20 * time.Hour, + StopTimeout: 65537 * time.Millisecond, + } + + Expect(ps.Init("some name")).To(Succeed()) - Expect(defaults.URL).To(Equal(url.URL{Host: "some.host.to.listen.on"})) - Expect(defaults.Dir).To(Equal("/some/dir")) - Expect(defaults.DirNeedsCleaning).To(BeFalse()) - Expect(defaults.Path).To(Equal("/some/path/to/some/bin")) - Expect(defaults.StartTimeout).To(Equal(20 * time.Hour)) - Expect(defaults.StopTimeout).To(Equal(65537 * time.Millisecond)) + Expect(ps.Dir).To(Equal("/some/dir")) + Expect(ps.DirNeedsCleaning).To(BeFalse()) + Expect(ps.Path).To(Equal("/some/path/to/some/bin")) + Expect(ps.StartTimeout).To(Equal(20 * time.Hour)) + Expect(ps.StopTimeout).To(Equal(65537 * time.Millisecond)) }) }) Context("when inputs are empty", func() { - It("defaults them", func() { - defaults, err := DoDefaulting( - "some name", - nil, - "", - "", - 0, - 0, - ) - Expect(err).NotTo(HaveOccurred()) - - Expect(defaults.Dir).To(BeADirectory()) - Expect(os.RemoveAll(defaults.Dir)).To(Succeed()) - Expect(defaults.DirNeedsCleaning).To(BeTrue()) + It("ps them", func() { + ps := &State{} + Expect(ps.Init("some name")).To(Succeed()) - Expect(defaults.URL).NotTo(BeZero()) - Expect(defaults.URL.Scheme).To(Equal("http")) - Expect(defaults.URL.Hostname()).NotTo(BeEmpty()) - Expect(defaults.URL.Port()).NotTo(BeEmpty()) + Expect(ps.Dir).To(BeADirectory()) + Expect(os.RemoveAll(ps.Dir)).To(Succeed()) + Expect(ps.DirNeedsCleaning).To(BeTrue()) - Expect(defaults.Path).NotTo(BeEmpty()) + Expect(ps.Path).NotTo(BeEmpty()) - Expect(defaults.StartTimeout).NotTo(BeZero()) - Expect(defaults.StopTimeout).NotTo(BeZero()) + Expect(ps.StartTimeout).NotTo(BeZero()) + Expect(ps.StopTimeout).NotTo(BeZero()) }) }) Context("when neither name nor path are provided", func() { It("returns an error", func() { - _, err := DoDefaulting( - "", - nil, - "", - "", - 0, - 0, - ) - Expect(err).To(MatchError("must have at least one of name or path")) + ps := &State{} + Expect(ps.Init("")).To(MatchError("must have at least one of name or path")) }) }) }) @@ -363,12 +349,9 @@ var simpleBashScript = []string{ `, } -func getSimpleCommand() *exec.Cmd { - return exec.Command("bash", simpleBashScript...) -} - func getServerURL(server *ghttp.Server) url.URL { url, err := url.Parse(server.URL()) Expect(err).NotTo(HaveOccurred()) + url.Path = healthURLPath return *url }