From 0b62224f15307d432910ce60db4d1ac14d0e8127 Mon Sep 17 00:00:00 2001 From: Steven Hartland Date: Wed, 24 Jul 2024 09:51:25 +0100 Subject: [PATCH] fix(reaper): refactor to allow retries and fix races Refactor how the reaper is created to allow for proper retries of temporary errors such as container not found issues during startup or shutdown races. This eliminates the use of sync.Once which wasn't solving the problem at hand and replaces it with a singleton spawner with locked access. Wrap reaper errors so we can determine the cause of failures more easily. Fix race condition in port wait when loading from container by always waiting for the port first. Remove unnecessary use of buffering and invalid retry logic in reaper connection handling which could never recover correctly from a partial read. Move the reaper creation just before connections are established in compose to ensure its still running when the Connect calls are made. Previously the reaper was requested in NewDockerComposeWith which means it could have already shutdown before connections are made during the later sections of Up if the startup took over 1 minute. This was causing consistent failures for: TestDockerComposeAPIWithWaitLogStrategy Ensure that resource labels are correct so that resources aren't reaped when the reaper is disabled by excluding session id when reaper is disabled. Error when creating a reaper when the config says it's disabled so that we avoid hard to debug issues because a reaper is running when it shouldn't be. --- container.go | 2 +- docker.go | 102 +++--- docker_mounts.go | 4 +- generic.go | 12 +- internal/core/labels.go | 25 +- modules/compose/compose.go | 19 -- modules/compose/compose_api.go | 110 ++++--- reaper.go | 564 +++++++++++++++++++++------------ reaper_test.go | 240 +++++++------- 9 files changed, 644 insertions(+), 434 deletions(-) diff --git a/container.go b/container.go index 6f17a7e3ff7..9b0ca2d7a9e 100644 --- a/container.go +++ b/container.go @@ -395,7 +395,7 @@ func (c *ContainerRequest) BuildOptions() (types.ImageBuildOptions, error) { } if !c.ShouldKeepBuiltImage() { - buildOptions.Labels = core.DefaultLabels(core.SessionID()) + buildOptions.Labels = GenericLabels() } // Do this as late as possible to ensure we don't leak the context on error/panic. diff --git a/docker.go b/docker.go index 6bd80c39c42..532460a5623 100644 --- a/docker.go +++ b/docker.go @@ -178,7 +178,7 @@ func (c *DockerContainer) Inspect(ctx context.Context) (*types.ContainerJSON, er func (c *DockerContainer) MappedPort(ctx context.Context, port nat.Port) (nat.Port, error) { inspect, err := c.Inspect(ctx) if err != nil { - return "", err + return "", fmt.Errorf("inspect: %w", err) } if inspect.ContainerJSONBase.HostConfig.NetworkMode == "host" { return port, nil @@ -199,7 +199,7 @@ func (c *DockerContainer) MappedPort(ctx context.Context, port nat.Port) (nat.Po return nat.NewPort(k.Proto(), p[0].HostPort) } - return "", errors.New("port not found") + return "", errdefs.NotFound(fmt.Errorf("port %q not found", port)) } // Deprecated: use c.Inspect(ctx).NetworkSettings.Ports instead. @@ -944,9 +944,7 @@ func (p *DockerProvider) BuildImage(ctx context.Context, img ImageBuildInfo) (st } // CreateContainer fulfils a request for a container without starting it -func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerRequest) (Container, error) { - var err error - +func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerRequest) (con Container, err error) { //nolint:nonamedreturns // Needed for error checking. // defer the close of the Docker client connection the soonest defer p.Close() @@ -991,22 +989,23 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque // the reaper does not need to start a reaper for itself isReaperContainer := strings.HasSuffix(imageName, config.ReaperDefaultImage) if !p.config.RyukDisabled && !isReaperContainer { - r, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), core.SessionID(), p) + r, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), core.SessionID(), p) if err != nil { - return nil, fmt.Errorf("%w: creating reaper failed", err) + return nil, fmt.Errorf("reaper: %w", err) } - termSignal, err = r.Connect() + + termSignal, err := r.Connect() if err != nil { - return nil, fmt.Errorf("%w: connecting to reaper failed", err) + return nil, fmt.Errorf("reaper connect: %w", err) } - } - // Cleanup on error, otherwise set termSignal to nil before successful return. - defer func() { - if termSignal != nil { - termSignal <- true - } - }() + // Cleanup on error. + defer func() { + if err != nil { + termSignal <- true + } + }() + } if err = req.Validate(); err != nil { return nil, err @@ -1072,10 +1071,9 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque } if !isReaperContainer { - // add the labels that the reaper will use to terminate the container to the request - for k, v := range core.DefaultLabels(core.SessionID()) { - req.Labels[k] = v - } + // Add the labels that identify this as a testcontainers container and + // allow the reaper to terminate it if requested. + AddGenericLabels(req.Labels) } dockerInput := &container.Config{ @@ -1169,9 +1167,6 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque return nil, err } - // Disable cleanup on success - termSignal = nil - return c, nil } @@ -1220,7 +1215,7 @@ func (p *DockerProvider) waitContainerCreation(ctx context.Context, name string) ) } -func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req ContainerRequest) (Container, error) { +func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req ContainerRequest) (con Container, err error) { //nolint:nonamedreturns // Needed for error check. c, err := p.findContainerByName(ctx, req.Name) if err != nil { return nil, err @@ -1243,14 +1238,22 @@ func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req Contain var termSignal chan bool if !p.config.RyukDisabled { - r, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p) + r, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p) if err != nil { return nil, fmt.Errorf("%w: creating reaper failed", err) } - termSignal, err = r.Connect() + + termSignal, err := r.Connect() if err != nil { - return nil, fmt.Errorf("%w: connecting to reaper failed", err) + return nil, fmt.Errorf("reaper connect: %w", err) } + + // Cleanup on error. + defer func() { + if err != nil { + termSignal <- true + } + }() } // default hooks include logger hook and pre-create hook @@ -1418,9 +1421,7 @@ func daemonHost(ctx context.Context, p *DockerProvider) (string, error) { // Deprecated: use network.New instead // CreateNetwork returns the object representing a new network identified by its name -func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest) (Network, error) { - var err error - +func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest) (net Network, err error) { //nolint:nonamedreturns // Needed for error check. // defer the close of the Docker client connection the soonest defer p.Close() @@ -1449,31 +1450,30 @@ func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest) var termSignal chan bool if !p.config.RyukDisabled { - r, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p) + r, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p) if err != nil { - return nil, fmt.Errorf("%w: creating network reaper failed", err) + return nil, fmt.Errorf("reaper: %w", err) } - termSignal, err = r.Connect() + + termSignal, err := r.Connect() if err != nil { - return nil, fmt.Errorf("%w: connecting to network reaper failed", err) + return nil, fmt.Errorf("reaper connect: %w", err) } - } - // add the labels that the reaper will use to terminate the network to the request - for k, v := range core.DefaultLabels(sessionID) { - req.Labels[k] = v + // Cleanup on error. + defer func() { + if err != nil { + termSignal <- true + } + }() } - // Cleanup on error, otherwise set termSignal to nil before successful return. - defer func() { - if termSignal != nil { - termSignal <- true - } - }() + // add the labels that the reaper will use to terminate the network to the request + core.AddDefaultLabels(sessionID, req.Labels) response, err := p.client.NetworkCreate(ctx, req.Name, nc) if err != nil { - return &DockerNetwork{}, err + return &DockerNetwork{}, fmt.Errorf("create network: %w", err) } n := &DockerNetwork{ @@ -1484,9 +1484,6 @@ func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest) provider: p, } - // Disable cleanup on success - termSignal = nil - return n, nil } @@ -1556,9 +1553,12 @@ func (p *DockerProvider) getDefaultNetwork(ctx context.Context, cli client.APICl _, err = cli.NetworkCreate(ctx, reaperNetwork, network.CreateOptions{ Driver: Bridge, Attachable: true, - Labels: core.DefaultLabels(core.SessionID()), + Labels: GenericLabels(), }) - if err != nil { + // If the network already exists, we can ignore the error as that can + // happen if we are running multiple tests in parallel and we only + // need to ensure that the network exists. + if err != nil && !errdefs.IsConflict(err) { return "", err } } @@ -1596,7 +1596,7 @@ func containerFromDockerResponse(ctx context.Context, response types.Container) // populate the raw representation of the container jsonRaw, err := ctr.inspectRawContainer(ctx) if err != nil { - return nil, err + return nil, fmt.Errorf("inspect raw container: %w", err) } // the health status of the container, if any diff --git a/docker_mounts.go b/docker_mounts.go index aed30103618..d8af3fae3ef 100644 --- a/docker_mounts.go +++ b/docker_mounts.go @@ -126,9 +126,7 @@ func mapToDockerMounts(containerMounts ContainerMounts) []mount.Mount { Labels: make(map[string]string), } } - for k, v := range GenericLabels() { - containerMount.VolumeOptions.Labels[k] = v - } + AddGenericLabels(containerMount.VolumeOptions.Labels) } mounts = append(mounts, containerMount) diff --git a/generic.go b/generic.go index f0cda134075..debea75677e 100644 --- a/generic.go +++ b/generic.go @@ -92,7 +92,17 @@ type GenericProvider interface { ImageProvider } -// GenericLabels returns a map of labels that can be used to identify containers created by this library +// GenericLabels returns a map of labels that can be used to identify resources +// created by this library. This includes the standard LabelSessionID if the +// reaper is enabled, otherwise this is excluded to prevent resources being +// incorrectly reaped. func GenericLabels() map[string]string { return core.DefaultLabels(core.SessionID()) } + +// AddGenericLabels adds the generic labels to target. +func AddGenericLabels(target map[string]string) { + for k, v := range GenericLabels() { + target[k] = v + } +} diff --git a/internal/core/labels.go b/internal/core/labels.go index 58b054ab952..485770f99cb 100644 --- a/internal/core/labels.go +++ b/internal/core/labels.go @@ -2,6 +2,7 @@ package core import ( "github.com/testcontainers/testcontainers-go/internal" + "github.com/testcontainers/testcontainers-go/internal/config" ) const ( @@ -13,11 +14,25 @@ const ( LabelVersion = LabelBase + ".version" ) +// DefaultLabels returns the standard set of labels which +// includes LabelSessionID if the reaper is enabled. func DefaultLabels(sessionID string) map[string]string { - return map[string]string{ - LabelBase: "true", - LabelLang: "go", - LabelSessionID: sessionID, - LabelVersion: internal.Version, + labels := map[string]string{ + LabelBase: "true", + LabelLang: "go", + LabelVersion: internal.Version, + } + + if !config.Read().RyukDisabled { + labels[LabelSessionID] = sessionID + } + + return labels +} + +// AddDefaultLabels adds the default labels for sessionID to target. +func AddDefaultLabels(sessionID string, target map[string]string) { + for k, v := range DefaultLabels(sessionID) { + target[k] = v } } diff --git a/modules/compose/compose.go b/modules/compose/compose.go index 38adfafd2f5..44da068e9b3 100644 --- a/modules/compose/compose.go +++ b/modules/compose/compose.go @@ -3,7 +3,6 @@ package compose import ( "context" "errors" - "fmt" "io" "path/filepath" "runtime" @@ -146,23 +145,6 @@ func NewDockerComposeWith(opts ...ComposeStackOption) (*dockerCompose, error) { return nil, err } - reaperProvider, err := testcontainers.NewDockerProvider() - if err != nil { - return nil, fmt.Errorf("failed to create reaper provider for compose: %w", err) - } - - var composeReaper *testcontainers.Reaper - if !reaperProvider.Config().Config.RyukDisabled { - // NewReaper is deprecated: we need to find a way to create the reaper for compose - // bypassing the deprecation. - r, err := testcontainers.NewReaper(context.Background(), testcontainers.SessionID(), reaperProvider, "") - if err != nil { - return nil, fmt.Errorf("failed to create reaper for compose: %w", err) - } - - composeReaper = r - } - composeAPI := &dockerCompose{ name: composeOptions.Identifier, configs: composeOptions.Paths, @@ -174,7 +156,6 @@ func NewDockerComposeWith(opts ...ComposeStackOption) (*dockerCompose, error) { containers: make(map[string]*testcontainers.DockerContainer), networks: make(map[string]*testcontainers.DockerNetwork), sessionID: testcontainers.SessionID(), - reaper: composeReaper, } return composeAPI, nil diff --git a/modules/compose/compose_api.go b/modules/compose/compose_api.go index ba29c070b13..de1cd2eb866 100644 --- a/modules/compose/compose_api.go +++ b/modules/compose/compose_api.go @@ -2,6 +2,7 @@ package compose import ( "context" + "errors" "fmt" "io" "os" @@ -218,9 +219,6 @@ type dockerCompose struct { // sessionID is used to identify the reaper session sessionID string - - // reaper is used to clean up containers after the stack is stopped - reaper *testcontainers.Reaper } func (d *dockerCompose) ServiceContainer(ctx context.Context, svcName string) (*testcontainers.DockerContainer, error) { @@ -259,12 +257,10 @@ func (d *dockerCompose) Down(ctx context.Context, opts ...StackDownOption) error return d.composeService.Down(ctx, d.name, options.DownOptions) } -func (d *dockerCompose) Up(ctx context.Context, opts ...StackUpOption) error { +func (d *dockerCompose) Up(ctx context.Context, opts ...StackUpOption) (err error) { d.lock.Lock() defer d.lock.Unlock() - var err error - d.project, err = d.compileProject(ctx) if err != nil { return err @@ -319,27 +315,61 @@ func (d *dockerCompose) Up(ctx context.Context, opts ...StackUpOption) error { return err } - if d.reaper != nil { + provider, err := testcontainers.NewDockerProvider(testcontainers.WithLogger(d.logger)) + if err != nil { + return fmt.Errorf("new docker provider: %w", err) + } + + var termSignals []chan bool + var reaper *testcontainers.Reaper + if !provider.Config().Config.RyukDisabled { + // NewReaper is deprecated: we need to find a way to create the reaper for compose + // bypassing the deprecation. + reaper, err = testcontainers.NewReaper(ctx, testcontainers.SessionID(), provider, "") + if err != nil { + return fmt.Errorf("create reaper: %w", err) + } + + // Cleanup on error, otherwise set termSignal to nil before successful return. + defer func() { + if len(termSignals) == 0 { + // Need to call Connect at least once to ensure the initial + // connection is cleaned up. + termSignal, errc := reaper.Connect() + if errc != nil { + err = errors.Join(err, fmt.Errorf("reaper connect: %w", errc)) + } else { + termSignal <- true + } + } + + if err == nil { + // No need to cleanup. + return + } + + for _, ts := range termSignals { + ts <- true + } + }() + + // Connect to the reaper and set the termination signal for each network. for _, n := range d.networks { - termSignal, err := d.reaper.Connect() + termSignal, err := reaper.Connect() if err != nil { - return fmt.Errorf("failed to connect to reaper: %w", err) + return fmt.Errorf("reaper connect: %w", err) } - n.SetTerminationSignal(termSignal) - // Cleanup on error, otherwise set termSignal to nil before successful return. - defer func() { - if termSignal != nil { - termSignal <- true - } - }() + n.SetTerminationSignal(termSignal) + termSignals = append(termSignals, termSignal) } } errGrpContainers, errGrpCtx := errgroup.WithContext(ctx) + // Lookup the containers for each service and connect them + // to the reaper if needed. for _, srv := range d.project.Services { - // we are going to connect each container to the reaper srv := srv errGrpContainers.Go(func() error { dc, err := d.lookupContainer(errGrpCtx, srv.Name) @@ -347,19 +377,14 @@ func (d *dockerCompose) Up(ctx context.Context, opts ...StackUpOption) error { return err } - if d.reaper != nil { - termSignal, err := d.reaper.Connect() + if reaper != nil { + termSignal, err := reaper.Connect() if err != nil { - return fmt.Errorf("failed to connect to reaper: %w", err) + return fmt.Errorf("reaper connect: %w", err) } - dc.SetTerminationSignal(termSignal) - // Cleanup on error, otherwise set termSignal to nil before successful return. - defer func() { - if termSignal != nil { - termSignal <- true - } - }() + dc.SetTerminationSignal(termSignal) + termSignals = append(termSignals, termSignal) } d.containersLock.Lock() @@ -400,7 +425,11 @@ func (d *dockerCompose) Up(ctx context.Context, opts ...StackUpOption) error { }) } - return errGrpWait.Wait() + if err := errGrpWait.Wait(); err != nil { + return err + } + + return nil } func (d *dockerCompose) WaitForService(s string, strategy wait.Strategy) ComposeStack { @@ -427,12 +456,20 @@ func (d *dockerCompose) WithOsEnv() ComposeStack { return d } -func (d *dockerCompose) lookupContainer(ctx context.Context, svcName string) (*testcontainers.DockerContainer, error) { +// cachedContainer returns the cached container for svcName or nil if it doesn't exist. +func (d *dockerCompose) cachedContainer(svcName string) *testcontainers.DockerContainer { d.containersLock.Lock() defer d.containersLock.Unlock() - if ctr, ok := d.containers[svcName]; ok { - return ctr, nil + return d.containers[svcName] +} + +// lookupContainer is used to retrieve the container instance from the cache or the Docker API. +// +// Safe for concurrent calls. +func (d *dockerCompose) lookupContainer(ctx context.Context, svcName string) (*testcontainers.DockerContainer, error) { + if c := d.cachedContainer(svcName); c != nil { + return c, nil } containers, err := d.dockerClient.ContainerList(ctx, container.ListOptions{ @@ -471,6 +508,9 @@ func (d *dockerCompose) lookupContainer(ctx context.Context, svcName string) (*t return ctr, nil } +// lookupNetworks is used to retrieve the networks that are part of the compose stack. +// +// Safe for concurrent calls. func (d *dockerCompose) lookupNetworks(ctx context.Context) error { d.containersLock.Lock() defer d.containersLock.Unlock() @@ -524,9 +564,7 @@ func (d *dockerCompose) compileProject(ctx context.Context) (*types.Project, err api.OneoffLabel: "False", // default, will be overridden by `run` command } - for k, label := range testcontainers.GenericLabels() { - s.CustomLabels[k] = label - } + testcontainers.AddGenericLabels(s.CustomLabels) for i, envFile := range compiledOptions.EnvFiles { // add a label for each env file, indexed by its position @@ -543,9 +581,7 @@ func (d *dockerCompose) compileProject(ctx context.Context) (*types.Project, err api.VersionLabel: api.ComposeVersion, } - for k, label := range testcontainers.GenericLabels() { - n.Labels[k] = label - } + testcontainers.AddGenericLabels(n.Labels) proj.Networks[key] = n } diff --git a/reaper.go b/reaper.go index c17b4f329e9..77b6cb79f4f 100644 --- a/reaper.go +++ b/reaper.go @@ -1,13 +1,16 @@ package testcontainers import ( - "bufio" + "bytes" "context" + "errors" "fmt" - "math/rand" + "io" "net" + "os" "strings" "sync" + "syscall" "time" "github.com/cenkalti/backoff/v4" @@ -34,9 +37,22 @@ const ( var ( // Deprecated: it has been replaced by an internal value ReaperDefaultImage = config.ReaperDefaultImage - reaperInstance *Reaper // We would like to create reaper only once - reaperMutex sync.Mutex - reaperOnce sync.Once + + // reaperPort is the port that the reaper listens on. + reaperPort = nat.Port("8080/tcp") + + // errReaperNotFound is returned when no reaper container is found. + errReaperNotFound = errors.New("reaper not found") + + // errReaperDisabled is returned if a reaper is requested but the + // config has it disabled. + errReaperDisabled = errors.New("reaper disabled") + + // spawner is the singleton instance of reaperSpawner. + spawner = &reaperSpawner{} + + // reaperAck is the expected response from the reaper container. + reaperAck = []byte("ACK\n") ) // ReaperProvider represents a provider for the reaper to run itself with @@ -47,10 +63,18 @@ type ReaperProvider interface { } // NewReaper creates a Reaper with a sessionID to identify containers and a provider to use -// Deprecated: it's not possible to create a reaper anymore. Compose module uses this method +// Deprecated: it's not possible to create a reaper any more. Compose module uses this method // to create a reaper for the compose stack. +// +// The caller must call Connect at least once on the returned Reaper and use the returned +// result otherwise the reaper will be kept open until the process exits. func NewReaper(ctx context.Context, sessionID string, provider ReaperProvider, reaperImageName string) (*Reaper, error) { - return reuseOrCreateReaper(ctx, sessionID, provider) + reaper, err := spawner.reaper(ctx, sessionID, provider) + if err != nil { + return nil, fmt.Errorf("reaper: %w", err) + } + + return reaper, nil } // reaperContainerNameFromSessionID returns the container name that uniquely @@ -61,32 +85,68 @@ func reaperContainerNameFromSessionID(sessionID string) string { return fmt.Sprintf("reaper_%s", sessionID) } -// lookUpReaperContainer returns a DockerContainer type with the reaper container in the case +// reaperSpawner is a singleton that manages the reaper container. +type reaperSpawner struct { + instance *Reaper + mtx sync.Mutex +} + +// backoff returns a backoff policy for the reaper spawner. +// It will take at most 20 seconds, doing each attempt every 100ms - 250ms. +func (r *reaperSpawner) backoff() *backoff.ExponentialBackOff { + // We want random intervals between 100ms and 250ms for concurrent executions + // to not be synchronized: it could be the case that multiple executions of this + // function happen at the same time (specifically when called from a different test + // process execution), and we want to avoid that they all try to find the reaper + // container at the same time. + b := &backoff.ExponentialBackOff{ + InitialInterval: time.Millisecond * 100, + RandomizationFactor: backoff.DefaultRandomizationFactor, + Multiplier: backoff.DefaultMultiplier, + // Adjust MaxInterval to compensate for randomization factor which can be added to + // returned interval so we have a maximum of 250ms. + MaxInterval: time.Duration(float64(time.Millisecond*250) * backoff.DefaultRandomizationFactor), + MaxElapsedTime: time.Second * 20, + Stop: backoff.Stop, + Clock: backoff.SystemClock, + } + b.Reset() + + return b +} + +// cleanupLocked terminates the reaper container if set. +func (r *reaperSpawner) cleanup() error { + r.mtx.Lock() + defer r.mtx.Unlock() + + return r.cleanupLocked() +} + +// cleanupLocked terminates the reaper container if set. +// It must be called with the lock held. +func (r *reaperSpawner) cleanupLocked() error { + if r.instance == nil { + return nil + } + + err := TerminateContainer(r.instance.container) + r.instance = nil + + return err +} + +// lookupContainer returns a DockerContainer type with the reaper container in the case // it's found in the running state, and including the labels for sessionID, reaper, and ryuk. // It will perform a retry with exponential backoff to allow for the container to be started and // avoid potential false negatives. -func lookUpReaperContainer(ctx context.Context, sessionID string) (*DockerContainer, error) { +func (r *reaperSpawner) lookupContainer(ctx context.Context, sessionID string) (*DockerContainer, error) { dockerClient, err := NewDockerClientWithOpts(ctx) if err != nil { - return nil, err + return nil, fmt.Errorf("new client: %w", err) } defer dockerClient.Close() - // the backoff will take at most 5 seconds to find the reaper container - // doing each attempt every 100ms - exp := backoff.NewExponentialBackOff() - - // we want random intervals between 100ms and 500ms for concurrent executions - // to not be synchronized: it could be the case that multiple executions of this - // function happen at the same time (specifically when called from a different test - // process execution), and we want to avoid that they all try to find the reaper - // container at the same time. - exp.InitialInterval = time.Duration(rand.Intn(5)*100) * time.Millisecond - exp.RandomizationFactor = rand.Float64() * 0.5 - exp.Multiplier = rand.Float64() * 2.0 - exp.MaxInterval = 5.0 * time.Second // max interval between attempts - exp.MaxElapsedTime = 1 * time.Minute // max time to keep trying - opts := container.ListOptions{ All: true, Filters: filters.NewArgs( @@ -97,159 +157,212 @@ func lookUpReaperContainer(ctx context.Context, sessionID string) (*DockerContai ), } - return backoff.RetryNotifyWithData( + return backoff.RetryWithData( func() (*DockerContainer, error) { resp, err := dockerClient.ContainerList(ctx, opts) if err != nil { - return nil, err + return nil, fmt.Errorf("container list: %w", err) } if len(resp) == 0 { - // reaper container not found in the running state: do not look for it again - return nil, nil + // No reaper container not found. + return nil, backoff.Permanent(errReaperNotFound) } if len(resp) > 1 { - return nil, fmt.Errorf("not possible to have multiple reaper containers found for session ID %s", sessionID) + return nil, fmt.Errorf("multiple reaper containers found for session ID %s", sessionID) } - r, err := containerFromDockerResponse(ctx, resp[0]) + container := resp[0] + r, err := containerFromDockerResponse(ctx, container) if err != nil { - return nil, err + return nil, fmt.Errorf("from docker: %w", err) } - if r.healthStatus == types.Healthy || r.healthStatus == types.NoHealthcheck { + switch { + case r.healthStatus == types.Healthy, + r.healthStatus == types.NoHealthcheck: return r, nil - } - - // if a health status is present on the container, and the container is healthy, error - if r.healthStatus != "" { - return nil, fmt.Errorf("container %s is not healthy, wanted status=%s, got status=%s", resp[0].ID[:8], types.Healthy, r.healthStatus) + case r.healthStatus != "": + return nil, fmt.Errorf("container not healthy: %s", r.healthStatus) } return r, nil }, - backoff.WithContext(exp, ctx), - func(err error, duration time.Duration) { - Logger.Printf("Error looking up reaper container, will retry: %v", err) - }, + backoff.WithContext(r.backoff(), ctx), ) } -// reuseOrCreateReaper returns an existing Reaper instance if it exists and is running. Otherwise, a new Reaper instance +// isRunning returns an error if the container is not running. +func (r *reaperSpawner) isRunning(ctx context.Context, container Container) error { + state, err := container.State(ctx) + if err != nil { + return fmt.Errorf("container state: %w", err) + } + + if !state.Running { + // Use NotFound error to indicate the container is not running + // and should be recreated. + return errdefs.NotFound(fmt.Errorf("container state: %s", state.Status)) + } + + return nil +} + +// retryError returns a permanent error if the error is not considered retryable. +func (r *reaperSpawner) retryError(err error) error { + var timeout interface { + Timeout() bool + } + switch { + case isCleanupSafe(err), + createContainerFailDueToNameConflictRegex.MatchString(err.Error()), + errors.Is(err, syscall.ECONNREFUSED), + errors.Is(err, syscall.ECONNRESET), + errors.Is(err, syscall.ECONNABORTED), + errors.Is(err, syscall.ETIMEDOUT), + errors.Is(err, os.ErrDeadlineExceeded), + errors.As(err, &timeout) && timeout.Timeout(), + errors.Is(err, context.DeadlineExceeded), + errors.Is(err, context.Canceled): + // Retryable error. + return err + default: + return backoff.Permanent(err) + } +} + +// reaper returns an existing Reaper instance if it exists and is running. Otherwise, a new Reaper instance // will be created with a sessionID to identify containers in the same test session/program. -func reuseOrCreateReaper(ctx context.Context, sessionID string, provider ReaperProvider) (*Reaper, error) { - reaperMutex.Lock() - defer reaperMutex.Unlock() - - // 1. if the reaper instance has been already created, return it - if reaperInstance != nil { - // Verify this instance is still running by checking state. - // Can't use Container.IsRunning because the bool is not updated when Reaper is terminated - state, err := reaperInstance.container.State(ctx) - if err != nil { - if !errdefs.IsNotFound(err) { - return nil, err +// If connect is true, the reaper will be connected to the reaper container. +// Returns an error if config.RyukDisabled is true. +// +// Safe for concurrent calls. +func (r *reaperSpawner) reaper(ctx context.Context, sessionID string, provider ReaperProvider) (*Reaper, error) { + if config.Read().RyukDisabled { + return nil, errReaperDisabled + } + + r.mtx.Lock() + defer r.mtx.Unlock() + + return backoff.RetryWithData( + r.retryLocked(ctx, sessionID, provider), + backoff.WithContext(r.backoff(), ctx), + ) +} + +// retryLocked returns a function that can be used to create or reuse a reaper container. +// If connect is true, the reaper will be connected to the reaper container. +// It must be called with the lock held. +func (r *reaperSpawner) retryLocked(ctx context.Context, sessionID string, provider ReaperProvider) func() (*Reaper, error) { + return func() (reaper *Reaper, err error) { //nolint:nonamedreturns // Needed for deferred error check. + reaper, err = r.reuseOrCreate(ctx, sessionID, provider) + // Ensure that the reaper is terminated if an error occurred. + defer func() { + if err != nil { + if reaper != nil { + err = errors.Join(err, TerminateContainer(reaper.container)) + } + err = r.retryError(errors.Join(err, r.cleanupLocked())) } - } else if state.Running { - return reaperInstance, nil - } - // else: the reaper instance has been terminated, so we need to create a new one - reaperOnce = sync.Once{} - } - - // 2. because the reaper instance has not been created yet, look for it in the Docker daemon, which - // will happen if the reaper container has been created in the same test session but in a different - // test process execution (e.g. when running tests in parallel), not having initialized the reaper - // instance yet. - reaperContainer, err := lookUpReaperContainer(context.Background(), sessionID) - if err == nil && reaperContainer != nil { - // The reaper container exists as a Docker container: re-use it - Logger.Printf("🔥 Reaper obtained from Docker for this test session %s", reaperContainer.ID) - reaperInstance, err = reuseReaperContainer(ctx, sessionID, provider, reaperContainer) + }() if err != nil { return nil, err } - return reaperInstance, nil - } + if err = r.isRunning(ctx, reaper.container); err != nil { + return nil, err + } - // 3. the reaper container does not exist in the Docker daemon: create it, and do it using the - // synchronization primitive to avoid multiple executions of this function to create the reaper - var reaperErr error - reaperOnce.Do(func() { - r, err := newReaper(ctx, sessionID, provider) + // Check we can still connect. + termSignal, err := reaper.connect(ctx) if err != nil { - reaperErr = err - return + return nil, fmt.Errorf("connect: %w", err) } - reaperInstance, reaperErr = r, nil - }) - if reaperErr != nil { - reaperOnce = sync.Once{} - return nil, reaperErr - } + reaper.setOrSignal(termSignal) - return reaperInstance, nil + r.instance = reaper + + return reaper, nil + } } -// reuseReaperContainer constructs a Reaper from an already running reaper -// DockerContainer. -func reuseReaperContainer(ctx context.Context, sessionID string, provider ReaperProvider, reaperContainer *DockerContainer) (*Reaper, error) { - endpoint, err := reaperContainer.PortEndpoint(ctx, "8080", "") +// reuseOrCreate returns an existing Reaper instance if it exists, otherwise a new Reaper instance. +func (r *reaperSpawner) reuseOrCreate(ctx context.Context, sessionID string, provider ReaperProvider) (*Reaper, error) { + if r.instance != nil { + // We already have an associated reaper. + return r.instance, nil + } + + // Look for an existing reaper created in the same test session but in a + // different test process execution e.g. when running tests in parallel. + container, err := r.lookupContainer(context.Background(), sessionID) + if err != nil { + if !errors.Is(err, errReaperNotFound) { + return nil, fmt.Errorf("look up container: %w", err) + } + + // The reaper container was not found, continue to create a new one. + reaper, err := r.newReaper(ctx, sessionID, provider) + if err != nil { + return nil, fmt.Errorf("new reaper: %w", err) + } + + return reaper, nil + } + + // A reaper container exists re-use it. + reaper, err := r.fromContainer(ctx, sessionID, provider, container) if err != nil { - return nil, err + return nil, fmt.Errorf("from container %q: %w", container.ID[:8], err) } - Logger.Printf("⏳ Waiting for Reaper port to be ready") + return reaper, nil +} - var containerJson *types.ContainerJSON +// fromContainer constructs a Reaper from an already running reaper DockerContainer. +func (r *reaperSpawner) fromContainer(ctx context.Context, sessionID string, provider ReaperProvider, container *DockerContainer) (*Reaper, error) { + Logger.Printf("⏳ Waiting for Reaper port %s to be ready", reaperPort) - if containerJson, err = reaperContainer.Inspect(ctx); err != nil { - return nil, fmt.Errorf("failed to inspect reaper container %s: %w", reaperContainer.ID[:8], err) + err := wait.ForListeningPort(reaperPort). + WithPollInterval(100*time.Millisecond). + SkipInternalCheck(). + WaitUntilReady(ctx, container) + if err != nil { + return nil, fmt.Errorf("wait for port %s: %w", reaperPort, err) } - if containerJson != nil && containerJson.NetworkSettings != nil { - for port := range containerJson.NetworkSettings.Ports { - err := wait.ForListeningPort(port). - WithPollInterval(100*time.Millisecond). - WaitUntilReady(ctx, reaperContainer) - if err != nil { - return nil, fmt.Errorf("failed waiting for reaper container %s port %s/%s to be ready: %w", - reaperContainer.ID[:8], port.Proto(), port.Port(), err) - } - } + endpoint, err := container.PortEndpoint(ctx, reaperPort, "") + if err != nil { + return nil, fmt.Errorf("port endpoint: %w", err) } + Logger.Printf("🔥 Reaper obtained from Docker for this test session %s", container.ID[:8]) + return &Reaper{ Provider: provider, SessionID: sessionID, Endpoint: endpoint, - container: reaperContainer, + container: container, }, nil } -// newReaper creates a Reaper with a sessionID to identify containers and a -// provider to use. Do not call this directly, use reuseOrCreateReaper instead. -func newReaper(ctx context.Context, sessionID string, provider ReaperProvider) (*Reaper, error) { +// newReaper creates a connected Reaper with a sessionID to identify containers +// and a provider to use. +// +// Do not call this directly, use reuseOrCreateReaper instead. +func (r *reaperSpawner) newReaper(ctx context.Context, sessionID string, provider ReaperProvider) (reaper *Reaper, err error) { //nolint:nonamedreturns // Needed for deferred error check. dockerHostMount := core.ExtractDockerSocket(ctx) - reaper := &Reaper{ - Provider: provider, - SessionID: sessionID, - } - - listeningPort := nat.Port("8080/tcp") - tcConfig := provider.Config().Config - req := ContainerRequest{ Image: config.ReaperDefaultImage, - ExposedPorts: []string{string(listeningPort)}, + ExposedPorts: []string{string(reaperPort)}, Labels: core.DefaultLabels(sessionID), Privileged: tcConfig.RyukPrivileged, - WaitingFor: wait.ForListeningPort(listeningPort), + WaitingFor: wait.ForListeningPort(reaperPort), Name: reaperContainerNameFromSessionID(sessionID), HostConfigModifier: func(hc *container.HostConfig) { hc.AutoRemove = true @@ -278,123 +391,158 @@ func newReaper(ctx context.Context, sessionID string, provider ReaperProvider) ( } c, err := provider.RunContainer(ctx, req) - if err != nil { - // We need to check whether the error is caused by a container with the same name - // already existing due to race conditions. We manually match the error message - // as we do not have any error types to check against. - if createContainerFailDueToNameConflictRegex.MatchString(err.Error()) { - // Manually retrieve the already running reaper container. However, we need to - // use retries here as there are two possible race conditions that might lead to - // errors: In most cases, there is a small delay between container creation and - // actually being visible in list-requests. This means that creation might fail - // due to name conflicts, but when we list containers with this name, we do not - // get any results. In another case, the container might have simply died in the - // meantime and therefore cannot be found. - const timeout = 5 * time.Second - const cooldown = 100 * time.Millisecond - start := time.Now() - var reaperContainer *DockerContainer - for time.Since(start) < timeout { - reaperContainer, err = lookUpReaperContainer(ctx, sessionID) - if err == nil && reaperContainer != nil { - break - } - select { - case <-ctx.Done(): - case <-time.After(cooldown): - } - } - if err != nil { - return nil, fmt.Errorf("look up reaper container due to name conflict failed: %w", err) - } - // If the reaper container was not found, it is most likely to have died in - // between as we can exclude any client errors because of the previous error - // check. Because the reaper should only die if it performed clean-ups, we can - // fail here as the reaper timeout needs to be increased, anyway. - if reaperContainer == nil { - return nil, fmt.Errorf("look up reaper container returned nil although creation failed due to name conflict") - } - Logger.Printf("🔥 Reaper obtained from Docker for this test session %s", reaperContainer.ID) - reaper, err := reuseReaperContainer(ctx, sessionID, provider, reaperContainer) - if err != nil { - return nil, err - } - return reaper, nil + defer func() { + if err != nil { + err = errors.Join(err, TerminateContainer(c)) } - return nil, err + }() + if err != nil { + return nil, fmt.Errorf("run container: %w", err) } - reaper.container = c endpoint, err := c.PortEndpoint(ctx, "8080", "") if err != nil { - return nil, err + return nil, fmt.Errorf("endpoint: %w", err) } - reaper.Endpoint = endpoint - return reaper, nil + return &Reaper{ + Provider: provider, + SessionID: sessionID, + Endpoint: endpoint, + container: c, + }, nil } // Reaper is used to start a sidecar container that cleans up resources type Reaper struct { - Provider ReaperProvider - SessionID string - Endpoint string - container Container + Provider ReaperProvider + SessionID string + Endpoint string + container Container + mtx sync.Mutex // Protects termSignal. + termSignal chan bool } -// Connect runs a goroutine which can be terminated by sending true into the returned channel +// Connect connects to the reaper container and sends the labels to it +// so that it can clean up the containers with the same labels. +// +// It returns a channel that can be closed to terminate the connection. +// Returns an error if config.RyukDisabled is true. func (r *Reaper) Connect() (chan bool, error) { - conn, err := net.DialTimeout("tcp", r.Endpoint, 10*time.Second) - if err != nil { - return nil, fmt.Errorf("%w: Connecting to Ryuk on %s failed", err, r.Endpoint) + if config.Read().RyukDisabled { + return nil, errReaperDisabled } - terminationSignal := make(chan bool) - go func(conn net.Conn) { - sock := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) - defer conn.Close() + if termSignal := r.useTermSignal(); termSignal != nil { + return termSignal, nil + } - labelFilters := []string{} - for l, v := range core.DefaultLabels(r.SessionID) { - labelFilters = append(labelFilters, fmt.Sprintf("label=%s=%s", l, v)) - } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() - retryLimit := 3 - for retryLimit > 0 { - retryLimit-- + return r.connect(ctx) +} - if _, err := sock.WriteString(strings.Join(labelFilters, "&")); err != nil { - continue - } +// close signals the connection to close if needed. +// Safe for concurrent calls. +func (r *Reaper) close() { + r.mtx.Lock() + defer r.mtx.Unlock() - if _, err := sock.WriteString("\n"); err != nil { - continue - } + if r.termSignal != nil { + r.termSignal <- true + r.termSignal = nil + } +} - if err := sock.Flush(); err != nil { - continue - } +// setOrSignal sets the reapers termSignal field if nil +// otherwise consumes by sending true to it. +// Safe for concurrent calls. +func (r *Reaper) setOrSignal(termSignal chan bool) { + r.mtx.Lock() + defer r.mtx.Unlock() + + if r.termSignal != nil { + // Already have an existing connection, close the new one. + termSignal <- true + return + } - resp, err := sock.ReadString('\n') - if err != nil { - continue - } + // First or new unused termSignal, assign for caller to reuse. + r.termSignal = termSignal +} - if resp == "ACK\n" { - break - } - } +// useTermSignal if termSignal is not nil returns it +// and sets it to nil, otherwise returns nil. +// +// Safe for concurrent calls. +func (r *Reaper) useTermSignal() chan bool { + r.mtx.Lock() + defer r.mtx.Unlock() + + if r.termSignal == nil { + return nil + } + + // Use existing connection. + term := r.termSignal + r.termSignal = nil + + return term +} +// connect connects to the reaper container and sends the labels to it +// so that it can clean up the containers with the same labels. +// +// It returns a channel that can be closed to terminate the connection. +// Returns an error if config.RyukDisabled is true. +func (r *Reaper) connect(ctx context.Context) (chan bool, error) { + var d net.Dialer + conn, err := d.DialContext(ctx, "tcp", r.Endpoint) + if err != nil { + return nil, fmt.Errorf("dial reaper %s: %w", r.Endpoint, err) + } + + terminationSignal := make(chan bool) + go func() { + defer conn.Close() + if err := r.handshake(conn); err != nil { + Logger.Printf("Reaper handshake failed: %s", err) + } <-terminationSignal - }(conn) + }() return terminationSignal, nil } +// handshake sends the labels to the reaper container and reads the ACK. +func (r *Reaper) handshake(conn net.Conn) error { + labels := core.DefaultLabels(r.SessionID) + labelFilters := make([]string, 0, len(labels)) + for l, v := range labels { + labelFilters = append(labelFilters, fmt.Sprintf("label=%s=%s", l, v)) + } + + filters := []byte(strings.Join(labelFilters, "&") + "\n") + buf := make([]byte, 4) + if _, err := conn.Write(filters); err != nil { + return fmt.Errorf("writing filters: %w", err) + } + + n, err := io.ReadFull(conn, buf) + if err != nil { + return fmt.Errorf("read ack: %w", err) + } + + if !bytes.Equal(reaperAck, buf[:n]) { + // We have received the ACK so all done. + return fmt.Errorf("unexpected reaper response: %s", buf[:n]) + } + + return nil +} + // Labels returns the container labels to use so that this Reaper cleans them up // Deprecated: internally replaced by core.DefaultLabels(sessionID) func (r *Reaper) Labels() map[string]string { - return map[string]string{ - core.LabelLang: "go", - core.LabelSessionID: r.SessionID, - } + return GenericLabels() } diff --git a/reaper_test.go b/reaper_test.go index c757fbb3ea9..0b53c1d525a 100644 --- a/reaper_test.go +++ b/reaper_test.go @@ -10,6 +10,7 @@ import ( "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/network" + "github.com/docker/docker/errdefs" "github.com/docker/go-connections/nat" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -23,43 +24,24 @@ import ( const testSessionID = "this-is-a-different-session-id" type mockReaperProvider struct { - req ContainerRequest - hostConfig *container.HostConfig - enpointSettings map[string]*network.EndpointSettings - config TestcontainersConfig - initialReaper *Reaper - initialReaperOnce sync.Once - t *testing.T + req ContainerRequest + hostConfig *container.HostConfig + enpointSettings map[string]*network.EndpointSettings + config TestcontainersConfig } -func newMockReaperProvider(t *testing.T) *mockReaperProvider { +func newMockReaperProvider() *mockReaperProvider { m := &mockReaperProvider{ config: TestcontainersConfig{ Config: config.Config{}, }, - t: t, - initialReaper: reaperInstance, - //nolint:govet - initialReaperOnce: reaperOnce, } - // explicitly reset the reaperInstance to nil to start from a fresh state - reaperInstance = nil - reaperOnce = sync.Once{} - return m } var errExpected = errors.New("expected") -func (m *mockReaperProvider) RestoreReaperState() { - m.t.Cleanup(func() { - reaperInstance = m.initialReaper - //nolint:govet - reaperOnce = m.initialReaperOnce - }) -} - func (m *mockReaperProvider) RunContainer(ctx context.Context, req ContainerRequest) (Container, error) { m.req = req @@ -135,13 +117,9 @@ func TestContainerStartsWithoutTheReaper(t *testing.T) { sessionID := core.SessionID() - reaperContainer, err := lookUpReaperContainer(ctx, sessionID) - if err != nil { - t.Fatal(err, "expected reaper container not found.") - } - if reaperContainer != nil { - t.Fatal("expected zero reaper running.") - } + reaperContainer, err := spawner.lookupContainer(ctx, sessionID) + require.EqualError(t, err, errReaperNotFound.Error()) + require.Nil(t, reaperContainer) } func TestContainerStartsWithTheReaper(t *testing.T) { @@ -170,7 +148,7 @@ func TestContainerStartsWithTheReaper(t *testing.T) { sessionID := core.SessionID() - reaperContainer, err := lookUpReaperContainer(ctx, sessionID) + reaperContainer, err := spawner.lookupContainer(ctx, sessionID) if err != nil { t.Fatal(err, "expected reaper container running.") } @@ -420,28 +398,30 @@ func Test_NewReaper(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - if test.env != nil { - config.Reset() // reset the config using the internal method to avoid the sync.Once - for k, v := range test.env { - t.Setenv(k, v) - } + config.Reset() // reset the config using the internal method to avoid the sync.Once + for k, v := range test.env { + t.Setenv(k, v) } if prefix := os.Getenv("TESTCONTAINERS_HUB_IMAGE_NAME_PREFIX"); prefix != "" { test.config.Config.HubImageNamePrefix = prefix } - provider := newMockReaperProvider(t) + provider := newMockReaperProvider() provider.config = test.config - t.Cleanup(provider.RestoreReaperState) if test.ctx == nil { test.ctx = context.TODO() } - _, err := reuseOrCreateReaper(test.ctx, testSessionID, provider) + // We need a new reaperSpawner for each test case to avoid reusing + // an existing reaper instance. + spawner := &reaperSpawner{} + reaper, err := spawner.reaper(test.ctx, testSessionID, provider) + cleanupReaper(t, reaper, spawner) // we should have errored out see mockReaperProvider.RunContainer - require.EqualError(t, err, "expected") + require.Error(t, err) + require.Contains(t, err.Error(), "expected") assert.Equal(t, test.req.Image, provider.req.Image, "expected image doesn't match the submitted request") assert.Equal(t, test.req.ExposedPorts, provider.req.ExposedPorts, "expected exposed ports don't match the submitted request") @@ -464,21 +444,27 @@ func Test_ReaperReusedIfHealthy(t *testing.T) { t.Skip("Ryuk is disabled, skipping test") } - testProvider := newMockReaperProvider(t) - t.Cleanup(testProvider.RestoreReaperState) - SkipIfProviderIsNotHealthy(t) ctx := context.Background() // As other integration tests run with the (shared) Reaper as well, re-use the instance to not interrupt other tests - wasReaperRunning := reaperInstance != nil + if spawner.instance != nil { + t.Cleanup(func() { + require.NoError(t, spawner.cleanup()) + }) + } - provider, _ := ProviderDocker.GetProvider() - reaper, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + provider, err := ProviderDocker.GetProvider() + require.NoError(t, err) + + reaper, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + cleanupReaper(t, reaper, spawner) require.NoError(t, err, "creating the Reaper should not error") - reaperReused, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + reaperReused, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + cleanupReaper(t, reaper, spawner) require.NoError(t, err, "reusing the Reaper should not error") + // assert that the internal state of both reaper instances is the same assert.Equal(t, reaper.SessionID, reaperReused.SessionID, "expecting the same SessionID") assert.Equal(t, reaper.Endpoint, reaperReused.Endpoint, "expecting the same reaper endpoint") @@ -486,15 +472,9 @@ func Test_ReaperReusedIfHealthy(t *testing.T) { assert.Equal(t, reaper.container.GetContainerID(), reaperReused.container.GetContainerID(), "expecting the same container ID") assert.Equal(t, reaper.container.SessionID(), reaperReused.container.SessionID(), "expecting the same session ID") - terminate, err := reaper.Connect() - defer func(term chan bool) { - term <- true - }(terminate) + termSignal, err := reaper.Connect() + cleanupTermSignal(t, termSignal) require.NoError(t, err, "connecting to Reaper should be successful") - - if !wasReaperRunning { - terminateContainerOnEnd(t, ctx, reaper.container) - } } func Test_RecreateReaperIfTerminated(t *testing.T) { @@ -504,33 +484,55 @@ func Test_RecreateReaperIfTerminated(t *testing.T) { t.Skip("Ryuk is disabled, skipping test") } - mockProvider := newMockReaperProvider(t) - t.Cleanup(mockProvider.RestoreReaperState) - SkipIfProviderIsNotHealthy(t) provider, _ := ProviderDocker.GetProvider() ctx := context.Background() - reaper, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + reaper, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + cleanupReaper(t, reaper, spawner) require.NoError(t, err, "creating the Reaper should not error") - terminate, err := reaper.Connect() - require.NoError(t, err, "connecting to Reaper should be successful") - terminate <- true + termSignal, err := reaper.Connect() + if termSignal != nil { + termSignal <- true + } + require.NoError(t, err) + + // Wait for up to ryuk's default reconnect timeout + 1s to allow for a graceful shutdown/cleanup of the container. + timeout := time.NewTimer(time.Second * 11) + t.Cleanup(func() { + timeout.Stop() + }) + for { + state, err := reaper.container.State(ctx) + if err != nil { + if errdefs.IsNotFound(err) { + break + } + require.NoError(t, err) + } + + if !state.Running { + break + } + + select { + case <-timeout.C: + t.Fatal("reaper container should have been terminated") + default: + } - // Wait for ryuk's default timeout (10s) + 1s to allow for a graceful shutdown/cleanup of the container. - time.Sleep(11 * time.Second) + time.Sleep(time.Millisecond * 100) + } - recreatedReaper, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + recreatedReaper, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + cleanupReaper(t, recreatedReaper, spawner) require.NoError(t, err, "creating the Reaper should not error") assert.NotEqual(t, reaper.container.GetContainerID(), recreatedReaper.container.GetContainerID(), "expected different container ID") - terminate, err = recreatedReaper.Connect() - defer func(term chan bool) { - term <- true - }(terminate) + recreatedTermSignal, err := recreatedReaper.Connect() + cleanupTermSignal(t, recreatedTermSignal) require.NoError(t, err, "connecting to Reaper should be successful") - terminateContainerOnEnd(t, ctx, recreatedReaper.container) } func TestReaper_reuseItFromOtherTestProgramUsingDocker(t *testing.T) { @@ -540,34 +542,33 @@ func TestReaper_reuseItFromOtherTestProgramUsingDocker(t *testing.T) { t.Skip("Ryuk is disabled, skipping test") } - mockProvider := &mockReaperProvider{ - initialReaper: reaperInstance, - //nolint:govet - initialReaperOnce: reaperOnce, - t: t, - } - t.Cleanup(mockProvider.RestoreReaperState) - // explicitly set the reaperInstance to nil to simulate another test program in the same session accessing the same reaper - reaperInstance = nil - reaperOnce = sync.Once{} + spawner.instance = nil SkipIfProviderIsNotHealthy(t) ctx := context.Background() // As other integration tests run with the (shared) Reaper as well, re-use the instance to not interrupt other tests - wasReaperRunning := reaperInstance != nil + if spawner.instance != nil { + t.Cleanup(func() { + require.NoError(t, spawner.cleanup()) + }) + } - provider, _ := ProviderDocker.GetProvider() - reaper, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + provider, err := ProviderDocker.GetProvider() + require.NoError(t, err) + + reaper, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + cleanupReaper(t, reaper, spawner) require.NoError(t, err, "creating the Reaper should not error") // explicitly reset the reaperInstance to nil to simulate another test program in the same session accessing the same reaper - reaperInstance = nil - reaperOnce = sync.Once{} + spawner.instance = nil - reaperReused, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + reaperReused, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + cleanupReaper(t, reaper, spawner) require.NoError(t, err, "reusing the Reaper should not error") + // assert that the internal state of both reaper instances is the same assert.Equal(t, reaper.SessionID, reaperReused.SessionID, "expecting the same SessionID") assert.Equal(t, reaper.Endpoint, reaperReused.Endpoint, "expecting the same reaper endpoint") @@ -575,15 +576,9 @@ func TestReaper_reuseItFromOtherTestProgramUsingDocker(t *testing.T) { assert.Equal(t, reaper.container.GetContainerID(), reaperReused.container.GetContainerID(), "expecting the same container ID") assert.Equal(t, reaper.container.SessionID(), reaperReused.container.SessionID(), "expecting the same session ID") - terminate, err := reaper.Connect() - defer func(term chan bool) { - term <- true - }(terminate) + termSignal, err := reaper.Connect() + cleanupTermSignal(t, termSignal) require.NoError(t, err, "connecting to Reaper should be successful") - - if !wasReaperRunning { - terminateContainerOnEnd(t, ctx, reaper.container) - } } // TestReaper_ReuseRunning tests whether reusing the reaper if using @@ -602,7 +597,7 @@ func TestReaper_ReuseRunning(t *testing.T) { const concurrency = 64 - timeout, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + timeout, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() sessionID := SessionID() @@ -613,21 +608,16 @@ func TestReaper_ReuseRunning(t *testing.T) { obtainedReaperContainerIDs := make([]string, concurrency) var wg sync.WaitGroup for i := 0; i < concurrency; i++ { - i := i wg.Add(1) - go func() { + go func(i int) { defer wg.Done() - reaperContainer, err := lookUpReaperContainer(timeout, sessionID) - if err == nil && reaperContainer != nil { - // Found. - obtainedReaperContainerIDs[i] = reaperContainer.GetContainerID() - return - } - // Not found -> create. - createdReaper, err := newReaper(timeout, sessionID, dockerProvider) - require.NoError(t, err, "new reaper should not fail") - obtainedReaperContainerIDs[i] = createdReaper.container.GetContainerID() - }() + spawner := &reaperSpawner{} + reaper, err := spawner.reaper(timeout, sessionID, dockerProvider) + cleanupReaper(t, reaper, spawner) + require.NoError(t, err) + + obtainedReaperContainerIDs[i] = reaper.container.GetContainerID() + }(i) } wg.Wait() @@ -637,3 +627,35 @@ func TestReaper_ReuseRunning(t *testing.T) { assert.Equal(t, firstContainerID, containerID, "call %d should have returned same container id", i) } } + +func TestSpawnerBackoff(t *testing.T) { + b := spawner.backoff() + for i := 0; i < 100; i++ { + require.LessOrEqual(t, b.NextBackOff(), time.Millisecond*250, "backoff should not exceed max interval") + } +} + +// cleanupReaper schedules reaper for cleanup if it's not nil. +func cleanupReaper(t *testing.T, reaper *Reaper, spawner *reaperSpawner) { + t.Helper() + + if reaper == nil { + return + } + + t.Cleanup(func() { + reaper.close() + require.NoError(t, spawner.cleanup()) + }) +} + +// cleanupTermSignal ensures that termSignal +func cleanupTermSignal(t *testing.T, termSignal chan bool) { + t.Helper() + + t.Cleanup(func() { + if termSignal != nil { + termSignal <- true + } + }) +}