Skip to content

Commit

Permalink
[breaking] Add err chan to log producer and don't panic on error (#1971)
Browse files Browse the repository at this point in the history
* added error channel to container, so that log producer errors will be sent there instead of causing a panic

* use mutex to avoid race conditions around stopping log producer

* use bounded start timeout

* remove test that no longer makes sense

* CR changes: functional option for timeout, default timeout to min/max instead of erroring on incorrect values

* Update logconsumer_test.go

fix typo

Co-authored-by: Manuel de la Peña <social.mdelapenya@gmail.com>

* add missing container termination on test end

* update log following logs

* fix docs formatting

* chore: use H2 instead

---------

Co-authored-by: Manuel de la Peña <social.mdelapenya@gmail.com>
Co-authored-by: Manuel de la Peña <mdelapenya@gmail.com>
  • Loading branch information
3 people authored Jan 9, 2024
1 parent 34325b3 commit 47a5bbb
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 18 deletions.
3 changes: 2 additions & 1 deletion container.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Container interface {
Terminate(context.Context) error // terminate the container
Logs(context.Context) (io.ReadCloser, error) // Get logs of the container
FollowOutput(LogConsumer)
StartLogProducer(context.Context) error
StartLogProducer(context.Context, ...LogProducerOption) error
StopLogProducer() error
Name(context.Context) (string, error) // get container name
State(context.Context) (*types.ContainerState, error) // returns container's running state
Expand All @@ -61,6 +61,7 @@ type Container interface {
CopyDirToContainer(ctx context.Context, hostDirPath string, containerParentPath string, fileMode int64) error
CopyFileToContainer(ctx context.Context, hostFilePath string, containerFilePath string, fileMode int64) error
CopyFileFromContainer(ctx context.Context, filePath string) (io.ReadCloser, error)
GetLogProducerErrorChannel() <-chan error
}

// ImageBuildInfo defines what is needed to build an image
Expand Down
89 changes: 73 additions & 16 deletions docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
Expand Down Expand Up @@ -67,6 +68,9 @@ type DockerContainer struct {
raw *types.ContainerJSON
stopProducer chan bool
producerDone chan bool
producerError chan error
producerMutex sync.Mutex
producerTimeout *time.Duration
logger Logging
lifecycleHooks []ContainerLifecycleHooks
}
Expand Down Expand Up @@ -612,19 +616,68 @@ func (c *DockerContainer) CopyToContainer(ctx context.Context, fileContent []byt
return nil
}

type LogProducerOption func(*DockerContainer)

// WithLogProducerTimeout is a functional option that sets the timeout for the log producer.
// If the timeout is lower than 5s or greater than 60s it will be set to 5s or 60s respectively.
func WithLogProducerTimeout(timeout time.Duration) LogProducerOption {
	return func(dc *DockerContainer) {
		// store a copy so the option value is captured at construction time
		t := timeout
		dc.producerTimeout = &t
	}
}

// StartLogProducer will start a concurrent process that will continuously read logs
// from the container and will send them to each added LogConsumer
func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
if c.stopProducer != nil {
return errors.New("log producer already started")
// from the container and will send them to each added LogConsumer.
// Default log producer timeout is 5s. It is used to set the context timeout
// which means that each log-reading loop will last at least the specified timeout
// and that it cannot be cancelled earlier.
// Use functional option WithLogProducerTimeout() to override default timeout. If it's
// lower than 5s or greater than 60s it will be set to 5s or 60s respectively.
func (c *DockerContainer) StartLogProducer(ctx context.Context, opts ...LogProducerOption) error {
{
c.producerMutex.Lock()
defer c.producerMutex.Unlock()

if c.stopProducer != nil {
return errors.New("log producer already started")
}
}

for _, opt := range opts {
opt(c)
}

minProducerTimeout := time.Duration(5 * time.Second)
maxProducerTimeout := time.Duration(60 * time.Second)

if c.producerTimeout == nil {
c.producerTimeout = &minProducerTimeout
}

if *c.producerTimeout < minProducerTimeout {
c.producerTimeout = &minProducerTimeout
}

if *c.producerTimeout > maxProducerTimeout {
c.producerTimeout = &maxProducerTimeout
}

c.stopProducer = make(chan bool)
c.producerDone = make(chan bool)
c.producerError = make(chan error, 1)

go func(stop <-chan bool, done chan<- bool) {
go func(stop <-chan bool, done chan<- bool, errorCh chan error) {
// signal the producer is done once go routine exits, this prevents race conditions around start/stop
defer close(done)
// set c.stopProducer to nil so that it can be started again
defer func() {
defer c.producerMutex.Unlock()
close(done)
close(errorCh)
{
c.producerMutex.Lock()
c.stopProducer = nil
}
}()

since := ""
// if the socket is closed we will make additional logs request with updated Since timestamp
Expand All @@ -636,25 +689,20 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
Since: since,
}

ctx, cancel := context.WithTimeout(ctx, time.Second*5)
ctx, cancel := context.WithTimeout(ctx, *c.producerTimeout)
defer cancel()

r, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options)
if err != nil {
// if we can't get the logs, panic, we can't return an error to anything
// from within this goroutine
panic(err)
errorCh <- err
return
}
defer c.provider.Close()

for {
select {
case <-stop:
err := r.Close()
if err != nil {
// we can't close the read closer, this should never happen
panic(err)
}
errorCh <- r.Close()
return
default:
h := make([]byte, 8)
Expand Down Expand Up @@ -710,24 +758,33 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
}
}
}
}(c.stopProducer, c.producerDone)
}(c.stopProducer, c.producerDone, c.producerError)

return nil
}

// StopLogProducer will stop the concurrent process that is reading logs
// and sending them to each added LogConsumer.
//
// It blocks until the producer goroutine has fully exited and returns the
// final error (if any) the producer reported while shutting down. Calling it
// when no producer is running is a no-op and returns nil.
func (c *DockerContainer) StopLogProducer() error {
	c.producerMutex.Lock()
	defer c.producerMutex.Unlock()
	if c.stopProducer == nil {
		// producer was never started, or has already been stopped
		return nil
	}
	// Signal the producer to stop. stopProducer is unbuffered, so if the
	// producer goroutine already exited on its own (e.g. after reporting a
	// ContainerLogs error) nobody is receiving and a bare send would block
	// forever while we hold producerMutex — deadlocking with the producer's
	// deferred cleanup, which also needs the mutex. Selecting on producerDone
	// (closed by the producer on exit) avoids that deadlock.
	select {
	case c.stopProducer <- true:
		// block until the producer is actually done in order to avoid strange races
		<-c.producerDone
	case <-c.producerDone:
		// producer already finished by itself
	}
	c.stopProducer = nil
	c.producerDone = nil
	// the producer buffers its final error (possibly nil) before closing the channel
	return <-c.producerError
}

// GetLogProducerErrorChannel exposes the only way for the consumer
// to be able to listen to errors and react to them.
//
// The channel is created by StartLogProducer (buffered, size 1) and is closed
// when the producer goroutine exits; errors encountered while reading the
// container logs are sent here instead of panicking.
// NOTE: before StartLogProducer is called this returns a nil channel, on which
// receives block forever — only consume it after starting the producer.
func (c *DockerContainer) GetLogProducerErrorChannel() <-chan error {
return c.producerError
}

// DockerNetwork represents a network started using Docker
type DockerNetwork struct {
ID string // Network ID from Docker
Expand Down
57 changes: 56 additions & 1 deletion docs/features/follow_logs.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,59 @@ if err != nil {
```

`LogProducer` is stopped in `c.Terminate()`. It can be done manually during container lifecycle
using `c.StopLogProducer()`. For a particular container, only one `LogProducer` can be active at time
using `c.StopLogProducer()`. For a particular container, only one `LogProducer` can be active at a time.

`StartLogProducer()` also accepts a functional parameter now used to set log producer timeout:
```golang
type LogProducerOption func(*DockerContainer)

func WithLogProducerTimeout(timeout time.Duration) LogProducerOption {
return func(c *DockerContainer) {
c.producerTimeout = &timeout
}
}

// usage
err := c.StartLogProducer(ctx, WithLogProducerTimeout(10*time.Second))
if err != nil {
// do something with err
}
```

If no parameter is passed a default timeout of 5 seconds will be used. Values below 5 seconds and above 60 seconds will
be coerced to these boundary values.

## Listening to errors

When log producer fails to start within given timeout (causing a context deadline) or there's an error returned while closing the reader it will no longer panic, but instead will return an error over a channel. You can listen to it using `DockerContainer.GetLogProducerErrorChannel()` method:
```golang
func (c *DockerContainer) GetLogProducerErrorChannel() <-chan error {
return c.producerError
}
```

This allows you to, for example, retry restarting log producer if it fails to start the first time. For example:

```golang
// start log producer normally
err = container.StartLogProducer(ctx, WithLogProducerTimeout(10*time.Second))

// listen to errors in a detached goroutine
// listen to errors in a detached goroutine
go func(done chan struct{}, timeout time.Duration) {
	for {
		select {
		case logErr := <-container.GetLogProducerErrorChannel():
			if logErr != nil {
				// do something with the error
				// for example, retry starting the log producer
				// (here we retry it once, in real life you might want to retry more times)
				startErr := container.StartLogProducer(ctx, WithLogProducerTimeout(timeout))
				if startErr != nil {
					return
				}
			}
		case <-done:
			return
		}
	}
}(cons.logListeningDone, 10*time.Second)
```
64 changes: 64 additions & 0 deletions logconsumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,3 +458,67 @@ func TestContainerLogsShouldBeWithoutStreamHeader(t *testing.T) {
}
assert.Equal(t, "0", strings.TrimSpace(string(b)))
}

func Test_StartLogProducerStillStartsWithTooLowTimeout(t *testing.T) {
ctx := context.Background()
req := ContainerRequest{
FromDockerfile: FromDockerfile{
Context: "./testdata/",
Dockerfile: "echoserver.Dockerfile",
},
ExposedPorts: []string{"8080/tcp"},
WaitingFor: wait.ForLog("ready"),
}

gReq := GenericContainerRequest{
ContainerRequest: req,
Started: true,
}

c, err := GenericContainer(ctx, gReq)
require.NoError(t, err)
terminateContainerOnEnd(t, ctx, c)

g := TestLogConsumer{
Msgs: []string{},
Done: make(chan bool),
Accepted: devNullAcceptorChan(),
}

c.FollowOutput(&g)

err = c.StartLogProducer(ctx, WithLogProducerTimeout(4*time.Second))
require.NoError(t, err, "should still start with too low timeout")
}

func Test_StartLogProducerStillStartsWithTooHighTimeout(t *testing.T) {
ctx := context.Background()
req := ContainerRequest{
FromDockerfile: FromDockerfile{
Context: "./testdata/",
Dockerfile: "echoserver.Dockerfile",
},
ExposedPorts: []string{"8080/tcp"},
WaitingFor: wait.ForLog("ready"),
}

gReq := GenericContainerRequest{
ContainerRequest: req,
Started: true,
}

c, err := GenericContainer(ctx, gReq)
require.NoError(t, err)
terminateContainerOnEnd(t, ctx, c)

g := TestLogConsumer{
Msgs: []string{},
Done: make(chan bool),
Accepted: devNullAcceptorChan(),
}

c.FollowOutput(&g)

err = c.StartLogProducer(ctx, WithLogProducerTimeout(61*time.Second))
require.NoError(t, err, "should still start with too high timeout")
}

0 comments on commit 47a5bbb

Please sign in to comment.