[breaking] Add err chan to log producer and don't panic on error #1971

Merged
14 commits merged on Jan 9, 2024
Changes from 4 commits
3 changes: 2 additions & 1 deletion container.go
@@ -48,7 +48,7 @@ type Container interface {
Terminate(context.Context) error // terminate the container
Logs(context.Context) (io.ReadCloser, error) // Get logs of the container
FollowOutput(LogConsumer)
StartLogProducer(context.Context) error
StartLogProducer(context.Context, time.Duration) error
StopLogProducer() error
Name(context.Context) (string, error) // get container name
State(context.Context) (*types.ContainerState, error) // returns container's running state
@@ -61,6 +61,7 @@ type Container interface {
CopyDirToContainer(ctx context.Context, hostDirPath string, containerParentPath string, fileMode int64) error
CopyFileToContainer(ctx context.Context, hostFilePath string, containerFilePath string, fileMode int64) error
CopyFileFromContainer(ctx context.Context, filePath string) (io.ReadCloser, error)
GetLogProducerErrorChannel() <-chan error
}

// ImageBuildInfo defines what is needed to build an image
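
For reference, a minimal caller-side sketch of the changed interface, assuming a started container `c` obtained from `GenericContainer`; the package name, the `stdoutLogConsumer` type, the `followLogs` helper, and the 10-second timeout are illustrative assumptions, not part of this PR:

```go
package logexample

import (
	"context"
	"fmt"
	"time"

	"github.com/testcontainers/testcontainers-go"
)

// stdoutLogConsumer is a hypothetical consumer that prints every log line it receives.
type stdoutLogConsumer struct{}

func (s *stdoutLogConsumer) Accept(l testcontainers.Log) {
	fmt.Print(string(l.Content))
}

// followLogs wires a consumer to a started container and watches the new error channel.
func followLogs(ctx context.Context, c testcontainers.Container) error {
	c.FollowOutput(&stdoutLogConsumer{})

	// the timeout parameter is now mandatory and must be between 5 and 60 seconds
	if err := c.StartLogProducer(ctx, 10*time.Second); err != nil {
		return err
	}

	// errors from the producer goroutine are reported here instead of panicking;
	// the channel is closed when the producer goroutine exits
	go func() {
		for err := range c.GetLogProducerErrorChannel() {
			if err != nil {
				fmt.Println("log producer error:", err)
			}
		}
	}()

	return nil
}
```

Callers that previously relied on the producer panicking on failure now need to watch this channel, or check the error returned by StopLogProducer (see docker.go below).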
57 changes: 42 additions & 15 deletions docker.go
@@ -15,6 +15,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
@@ -70,6 +71,8 @@ type DockerContainer struct {
raw *types.ContainerJSON
stopProducer chan bool
producerDone chan bool
producerError chan error
producerMutex sync.Mutex
logger Logging
lifecycleHooks []ContainerLifecycleHooks
}
@@ -613,17 +616,39 @@ func (c *DockerContainer) CopyToContainer(ctx context.Context, fileContent []byt

// StartLogProducer will start a concurrent process that will continuously read logs
// from the container and will send them to each added LogConsumer
func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
if c.stopProducer != nil {
return errors.New("log producer already started")
// timeout accepts values between 5 and 60s and will be used to set the context timeout
// which means that each log-reading loop will last at least the specified timeout
// and that it cannot be cancelled earlier
func (c *DockerContainer) StartLogProducer(ctx context.Context, timeout time.Duration) error {
{
c.producerMutex.Lock()
defer c.producerMutex.Unlock()

if c.stopProducer != nil {
return errors.New("log producer already started")
}
}

if timeout < time.Duration(5*time.Second) || timeout > time.Duration(60*time.Second) {
return errors.New("timeout must be between 5 and 60 seconds")
}

c.stopProducer = make(chan bool)
c.producerDone = make(chan bool)
c.producerError = make(chan error, 1)

go func(stop <-chan bool, done chan<- bool) {
go func(stop <-chan bool, done chan<- bool, errorCh chan error) {
// signal that the producer is done once the goroutine exits; this prevents race conditions around start/stop
defer close(done)
// set c.stopProducer to nil so that it can be started again
defer func() {
defer c.producerMutex.Unlock()
close(done)
close(errorCh)
{
c.producerMutex.Lock()
c.stopProducer = nil
}
}()

since := ""
// if the socket is closed we will make additional logs request with updated Since timestamp
@@ -635,25 +660,20 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
Since: since,
}

ctx, cancel := context.WithTimeout(ctx, time.Second*5)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

r, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options)
if err != nil {
// if we can't get the logs, panic, we can't return an error to anything
// from within this goroutine
panic(err)
errorCh <- err
return
}
defer c.provider.Close()

for {
select {
case <-stop:
err := r.Close()
if err != nil {
// we can't close the read closer, this should never happen
panic(err)
}
errorCh <- r.Close()
return
default:
h := make([]byte, 8)
@@ -709,24 +729,31 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
}
}
}
}(c.stopProducer, c.producerDone)
}(c.stopProducer, c.producerDone, c.producerError)

return nil
}

// StopLogProducer will stop the concurrent process that is reading logs
// and sending them to each added LogConsumer
func (c *DockerContainer) StopLogProducer() error {
c.producerMutex.Lock()
defer c.producerMutex.Unlock()
if c.stopProducer != nil {
c.stopProducer <- true
// block until the producer is actually done in order to avoid strange races
<-c.producerDone
c.stopProducer = nil
c.producerDone = nil
return <-c.producerError
}
return nil
}

func (c *DockerContainer) GetLogProducerErrorChannel() <-chan error {
return c.producerError
}

// DockerNetwork represents a network started using Docker
type DockerNetwork struct {
ID string // Network ID from Docker
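
Complementing the sketch above, a hedged sketch of the stop path: per this diff, StopLogProducer now returns the value read from the producer's error channel (typically the log reader's Close result), so callers can act on it instead of hitting the old panic. The `logexample` package name, the `stopLogs` helper, and the `log.Printf` handling are illustrative assumptions:

```go
package logexample

import (
	"log"

	"github.com/testcontainers/testcontainers-go"
)

// stopLogs assumes a container `c` whose log producer was started earlier.
func stopLogs(c testcontainers.Container) {
	// StopLogProducer now returns the error the producer goroutine placed on
	// its error channel (for example, the log reader's Close result); before
	// this PR a failing Close caused a panic inside the goroutine.
	if err := c.StopLogProducer(); err != nil {
		log.Printf("stopping log producer: %v", err)
	}
}
```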
78 changes: 71 additions & 7 deletions logconsumer_test.go
@@ -81,7 +81,7 @@ func Test_LogConsumerGetsCalled(t *testing.T) {

c.FollowOutput(&g)

err = c.StartLogProducer(ctx)
err = c.StartLogProducer(ctx, time.Duration(5*time.Second))
require.NoError(t, err)

_, err = http.Get(ep + "/stdout?echo=hello")
@@ -148,7 +148,7 @@ func Test_ShouldRecognizeLogTypes(t *testing.T) {

c.FollowOutput(&g)

err = c.StartLogProducer(ctx)
err = c.StartLogProducer(ctx, time.Duration(5*time.Second))
require.NoError(t, err)

_, err = http.Get(ep + "/stdout?echo=this-is-stdout")
@@ -205,7 +205,7 @@ func Test_MultipleLogConsumers(t *testing.T) {
c.FollowOutput(&first)
c.FollowOutput(&second)

err = c.StartLogProducer(ctx)
err = c.StartLogProducer(ctx, time.Duration(5*time.Second))
require.NoError(t, err)

_, err = http.Get(ep + "/stdout?echo=mlem")
@@ -254,18 +254,18 @@ func Test_StartStop(t *testing.T) {

require.NoError(t, c.StopLogProducer(), "nothing should happen even if the producer is not started")

require.NoError(t, c.StartLogProducer(ctx))
require.NoError(t, c.StartLogProducer(ctx, time.Duration(5*time.Second)))
require.Equal(t, <-g.Accepted, "ready\n")

require.Error(t, c.StartLogProducer(ctx), "log producer is already started")
require.Error(t, c.StartLogProducer(ctx, time.Duration(5*time.Second)), "log producer is already started")

_, err = http.Get(ep + "/stdout?echo=mlem")
require.NoError(t, err)
require.Equal(t, <-g.Accepted, "echo mlem\n")

require.NoError(t, c.StopLogProducer())

require.NoError(t, c.StartLogProducer(ctx))
require.NoError(t, c.StartLogProducer(ctx, time.Duration(5*time.Second)))
require.Equal(t, <-g.Accepted, "ready\n")
require.Equal(t, <-g.Accepted, "echo mlem\n")

@@ -375,7 +375,7 @@ func TestContainerLogWithErrClosed(t *testing.T) {
Accepted: devNullAcceptorChan(),
}

if err = nginx.StartLogProducer(ctx); err != nil {
if err = nginx.StartLogProducer(ctx, time.Duration(5*time.Second)); err != nil {
t.Fatal(err)
}
defer func() {
@@ -453,3 +453,67 @@ func TestContainerLogsShouldBeWithoutStreamHeader(t *testing.T) {
}
assert.Equal(t, "0", strings.TrimSpace(string(b)))
}

func Test_StartLogProducerErrorsWithTooLowTimeout(t *testing.T) {
ctx := context.Background()
req := ContainerRequest{
FromDockerfile: FromDockerfile{
Context: "./testdata/",
Dockerfile: "echoserver.Dockerfile",
},
ExposedPorts: []string{"8080/tcp"},
WaitingFor: wait.ForLog("ready"),
}

gReq := GenericContainerRequest{
ContainerRequest: req,
Started: true,
}

c, err := GenericContainer(ctx, gReq)
require.NoError(t, err)

g := TestLogConsumer{
Msgs: []string{},
Done: make(chan bool),
Accepted: devNullAcceptorChan(),
}

c.FollowOutput(&g)

err = c.StartLogProducer(ctx, time.Duration(4*time.Second))
require.Error(t, err)
require.Equal(t, "timeout must be between 5 and 60 seconds", err.Error())
}

func Test_StartLogProducerErrorsWithTooHighTimeout(t *testing.T) {
ctx := context.Background()
req := ContainerRequest{
FromDockerfile: FromDockerfile{
Context: "./testdata/",
Dockerfile: "echoserver.Dockerfile",
},
ExposedPorts: []string{"8080/tcp"},
WaitingFor: wait.ForLog("ready"),
}

gReq := GenericContainerRequest{
ContainerRequest: req,
Started: true,
}

c, err := GenericContainer(ctx, gReq)
require.NoError(t, err)

g := TestLogConsumer{
Msgs: []string{},
Done: make(chan bool),
Accepted: devNullAcceptorChan(),
}

c.FollowOutput(&g)

err = c.StartLogProducer(ctx, time.Duration(61*time.Second))
require.Error(t, err)
require.Equal(t, "timeout must be between 5 and 60 seconds", err.Error())
}