diff --git a/container.go b/container.go
index 4a85dffb8f..0db6dfaa47 100644
--- a/container.go
+++ b/container.go
@@ -48,7 +48,7 @@ type Container interface {
 	Terminate(context.Context) error             // terminate the container
 	Logs(context.Context) (io.ReadCloser, error) // Get logs of the container
 	FollowOutput(LogConsumer)
-	StartLogProducer(context.Context) error
+	StartLogProducer(context.Context, ...LogProducerOption) error
 	StopLogProducer() error
 	Name(context.Context) (string, error)                  // get container name
 	State(context.Context) (*types.ContainerState, error) // returns container's running state
@@ -61,6 +61,7 @@ type Container interface {
 	CopyDirToContainer(ctx context.Context, hostDirPath string, containerParentPath string, fileMode int64) error
 	CopyFileToContainer(ctx context.Context, hostFilePath string, containerFilePath string, fileMode int64) error
 	CopyFileFromContainer(ctx context.Context, filePath string) (io.ReadCloser, error)
+	GetLogProducerErrorChannel() <-chan error
 }
 
 // ImageBuildInfo defines what is needed to build an image
diff --git a/docker.go b/docker.go
index 252d94ec7c..bfa1d1c5ad 100644
--- a/docker.go
+++ b/docker.go
@@ -15,6 +15,7 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/cenkalti/backoff/v4"
@@ -67,6 +68,9 @@ type DockerContainer struct {
 	raw             *types.ContainerJSON
 	stopProducer    chan bool
 	producerDone    chan bool
+	producerError   chan error
+	producerMutex   sync.Mutex
+	producerTimeout *time.Duration
 	logger          Logging
 	lifecycleHooks  []ContainerLifecycleHooks
 }
@@ -612,19 +616,68 @@ func (c *DockerContainer) CopyToContainer(ctx context.Context, fileContent []byt
 	return nil
 }
 
+type LogProducerOption func(*DockerContainer)
+
+// WithLogProducerTimeout is a functional option that sets the timeout for the log producer.
+// If the timeout is lower than 5s or greater than 60s it will be set to 5s or 60s respectively.
+func WithLogProducerTimeout(timeout time.Duration) LogProducerOption {
+	return func(c *DockerContainer) {
+		c.producerTimeout = &timeout
+	}
+}
+
 // StartLogProducer will start a concurrent process that will continuously read logs
-// from the container and will send them to each added LogConsumer
-func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
-	if c.stopProducer != nil {
-		return errors.New("log producer already started")
+// from the container and will send them to each added LogConsumer.
+// Default log producer timeout is 5s. It is used to set the context timeout,
+// which means that each log-reading loop will last at least the specified timeout
+// and that it cannot be cancelled earlier.
+// Use the functional option WithLogProducerTimeout() to override the default timeout. If it's
+// lower than 5s or greater than 60s it will be set to 5s or 60s respectively.
+func (c *DockerContainer) StartLogProducer(ctx context.Context, opts ...LogProducerOption) error {
+	{
+		c.producerMutex.Lock()
+		defer c.producerMutex.Unlock()
+
+		if c.stopProducer != nil {
+			return errors.New("log producer already started")
+		}
+	}
+
+	for _, opt := range opts {
+		opt(c)
+	}
+
+	minProducerTimeout := time.Duration(5 * time.Second)
+	maxProducerTimeout := time.Duration(60 * time.Second)
+
+	if c.producerTimeout == nil {
+		c.producerTimeout = &minProducerTimeout
+	}
+
+	if *c.producerTimeout < minProducerTimeout {
+		c.producerTimeout = &minProducerTimeout
+	}
+
+	if *c.producerTimeout > maxProducerTimeout {
+		c.producerTimeout = &maxProducerTimeout
 	}
 
 	c.stopProducer = make(chan bool)
 	c.producerDone = make(chan bool)
+	c.producerError = make(chan error, 1)
 
-	go func(stop <-chan bool, done chan<- bool) {
+	go func(stop <-chan bool, done chan<- bool, errorCh chan error) {
 		// signal the producer is done once go routine exits, this prevents race conditions around start/stop
-		defer close(done)
+		// set c.stopProducer to nil so that it can be started again
+		defer func() {
+			defer c.producerMutex.Unlock()
+			close(done)
+			close(errorCh)
+			{
+				c.producerMutex.Lock()
+				c.stopProducer = nil
+			}
+		}()
 
 		since := ""
 		// if the socket is closed we will make additional logs request with updated Since timestamp
@@ -636,25 +689,20 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
 			Since: since,
 		}
 
-		ctx, cancel := context.WithTimeout(ctx, time.Second*5)
+		ctx, cancel := context.WithTimeout(ctx, *c.producerTimeout)
 		defer cancel()
 
 		r, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options)
 		if err != nil {
-			// if we can't get the logs, panic, we can't return an error to anything
-			// from within this goroutine
-			panic(err)
+			errorCh <- err
+			return
 		}
 		defer c.provider.Close()
 
 		for {
 			select {
 			case <-stop:
-				err := r.Close()
-				if err != nil {
-					// we can't close the read closer, this should never happen
-					panic(err)
-				}
+				errorCh <- r.Close()
 				return
 			default:
 				h := make([]byte, 8)
@@ -710,7 +758,7 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
 				}
 			}
 		}
-	}(c.stopProducer, c.producerDone)
+	}(c.stopProducer, c.producerDone, c.producerError)
 
 	return nil
 }
@@ -718,16 +766,25 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
 // StopLogProducer will stop the concurrent process that is reading logs
 // and sending them to each added LogConsumer
 func (c *DockerContainer) StopLogProducer() error {
+	c.producerMutex.Lock()
+	defer c.producerMutex.Unlock()
 	if c.stopProducer != nil {
 		c.stopProducer <- true
 		// block until the producer is actually done in order to avoid strange races
 		<-c.producerDone
 		c.stopProducer = nil
 		c.producerDone = nil
+		return <-c.producerError
 	}
 	return nil
 }
 
+// GetLogProducerErrorChannel exposes the only way for the consumer
+// to be able to listen to errors and react to them.
+func (c *DockerContainer) GetLogProducerErrorChannel() <-chan error {
+	return c.producerError
+}
+
 // DockerNetwork represents a network started using Docker
 type DockerNetwork struct {
 	ID string // Network ID from Docker
diff --git a/docs/features/follow_logs.md b/docs/features/follow_logs.md
index 4fa93cd01c..4269e27162 100644
--- a/docs/features/follow_logs.md
+++ b/docs/features/follow_logs.md
@@ -39,4 +39,59 @@ if err != nil {
 ```
 
 `LogProducer` is stopped in `c.Terminate()`. It can be done manually during container lifecycle
-using `c.StopLogProducer()`. For a particular container, only one `LogProducer` can be active at time
+using `c.StopLogProducer()`. For a particular container, only one `LogProducer` can be active at a time.
+
+`StartLogProducer()` now also accepts a functional option, currently used to set the log producer timeout:
+
+```golang
+type LogProducerOption func(*DockerContainer)
+
+func WithLogProducerTimeout(timeout time.Duration) LogProducerOption {
+	return func(c *DockerContainer) {
+		c.producerTimeout = &timeout
+	}
+}
+
+// usage
+err := c.StartLogProducer(ctx, WithLogProducerTimeout(10*time.Second))
+if err != nil {
+	// do something with err
+}
+```
+
+If no option is passed, a default timeout of 5 seconds is used. Values below 5 seconds or above 60 seconds are
+coerced to these boundary values.
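+
+For illustration, a minimal sketch of the coercion (the 1-second value here is arbitrary, chosen
+only to fall below the minimum; see the tests at the end of this diff for both boundaries):
+
+```golang
+// a too-low timeout does not fail; it is silently raised to the 5s minimum
+err := c.StartLogProducer(ctx, WithLogProducerTimeout(1*time.Second))
+if err != nil {
+	// do something with err
+}
+```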
+
+## Listening to errors
+
+When the log producer fails to start within the given timeout (causing the context deadline to be
+exceeded), or an error is returned while closing the reader, it no longer panics; instead, the error
+is sent over a channel. You can listen to it using the `DockerContainer.GetLogProducerErrorChannel()`
+method:
+
+```golang
+func (c *DockerContainer) GetLogProducerErrorChannel() <-chan error {
+	return c.producerError
+}
+```
+
+This allows you, for example, to retry starting the log producer if it fails to start the first time:
+
+```golang
+// start log producer normally
+err = container.StartLogProducer(ctx, WithLogProducerTimeout(10*time.Second))
+
+// listen to errors in a detached goroutine
+go func(done chan struct{}, timeout time.Duration) {
+	for {
+		select {
+		case logErr := <-container.GetLogProducerErrorChannel():
+			if logErr != nil {
+				// do something with the error;
+				// for example, retry starting the log producer
+				// (here we retry once, in real life you might want to retry more times)
+				startErr := container.StartLogProducer(ctx, WithLogProducerTimeout(timeout))
+				if startErr != nil {
+					return
+				}
+			}
+		case <-done:
+			return
+		}
+	}
+}(cons.logListeningDone, 10*time.Second)
+```
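+
+Note that `StopLogProducer()` now also returns the last error sent by the producer (typically the
+result of closing the log reader), so it is worth checking there as well, along these lines:
+
+```golang
+if err := container.StopLogProducer(); err != nil {
+	// do something with err
+}
+```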
\ No newline at end of file
diff --git a/logconsumer_test.go b/logconsumer_test.go
index c77f2602f8..3c0bef4a6c 100644
--- a/logconsumer_test.go
+++ b/logconsumer_test.go
@@ -458,3 +458,67 @@ func TestContainerLogsShouldBeWithoutStreamHeader(t *testing.T) {
 	}
 	assert.Equal(t, "0", strings.TrimSpace(string(b)))
 }
+
+func Test_StartLogProducerStillStartsWithTooLowTimeout(t *testing.T) {
+	ctx := context.Background()
+	req := ContainerRequest{
+		FromDockerfile: FromDockerfile{
+			Context:    "./testdata/",
+			Dockerfile: "echoserver.Dockerfile",
+		},
+		ExposedPorts: []string{"8080/tcp"},
+		WaitingFor:   wait.ForLog("ready"),
+	}
+
+	gReq := GenericContainerRequest{
+		ContainerRequest: req,
+		Started:          true,
+	}
+
+	c, err := GenericContainer(ctx, gReq)
+	require.NoError(t, err)
+	terminateContainerOnEnd(t, ctx, c)
+
+	g := TestLogConsumer{
+		Msgs:     []string{},
+		Done:     make(chan bool),
+		Accepted: devNullAcceptorChan(),
+	}
+
+	c.FollowOutput(&g)
+
+	err = c.StartLogProducer(ctx, WithLogProducerTimeout(4*time.Second))
+	require.NoError(t, err, "should still start with too low timeout")
+}
+
+func Test_StartLogProducerStillStartsWithTooHighTimeout(t *testing.T) {
+	ctx := context.Background()
+	req := ContainerRequest{
+		FromDockerfile: FromDockerfile{
+			Context:    "./testdata/",
+			Dockerfile: "echoserver.Dockerfile",
+		},
+		ExposedPorts: []string{"8080/tcp"},
+		WaitingFor:   wait.ForLog("ready"),
+	}
+
+	gReq := GenericContainerRequest{
+		ContainerRequest: req,
+		Started:          true,
+	}
+
+	c, err := GenericContainer(ctx, gReq)
+	require.NoError(t, err)
+	terminateContainerOnEnd(t, ctx, c)
+
+	g := TestLogConsumer{
+		Msgs:     []string{},
+		Done:     make(chan bool),
+		Accepted: devNullAcceptorChan(),
+	}
+
+	c.FollowOutput(&g)
+
+	err = c.StartLogProducer(ctx, WithLogProducerTimeout(61*time.Second))
+	require.NoError(t, err, "should still start with too high timeout")
+}