Skip to content

Commit

Permalink
Fix restart failures (#1317)
Browse files Browse the repository at this point in the history
* add logic to wait for postgres container exit for dev restart command

* fix linting errors

* added new tests
  • Loading branch information
neel-astro authored Jul 24, 2023
1 parent 3ef8acd commit 4568592
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 19 deletions.
2 changes: 1 addition & 1 deletion airflow/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

type ContainerHandler interface {
Start(imageName, settingsFile, composeFile string, noCache, noBrowser bool, waitTime time.Duration) error
Stop() error
Stop(waitForExit bool) error
PS() error
Kill() error
Logs(follow bool, containerNames ...string) error
Expand Down
40 changes: 37 additions & 3 deletions airflow/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"runtime"
"strings"
Expand Down Expand Up @@ -33,12 +32,14 @@ import (
"github.com/docker/docker/api/types/versions"
"github.com/pkg/browser"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

const (
componentName = "airflow"
podman = "podman"
dockerStateUp = "running"
dockerExitState = "exited"
defaultAirflowVersion = uint64(0x2) //nolint:gomnd
triggererAllowedRuntimeVersion = "4.0.0"
triggererAllowedAirflowVersion = "2.2.0"
Expand Down Expand Up @@ -79,6 +80,9 @@ var (
isM1 = util.IsM1

composeOverrideFilename = "docker-compose.override.yml"

stopPostgresWaitTimeout = 10 * time.Second
stopPostgresWaitTicker = 1 * time.Second
)

// ComposeConfig is input data to docker compose yaml template
Expand Down Expand Up @@ -291,7 +295,7 @@ func (d *DockerCompose) ComposeExport(settingsFile, composeFile string) error {
}

// Stop a running docker project
func (d *DockerCompose) Stop() error {
func (d *DockerCompose) Stop(waitForExit bool) error {
imageLabels, err := d.imageHandler.ListLabels()
if err != nil {
return err
Expand All @@ -309,7 +313,37 @@ func (d *DockerCompose) Stop() error {
return errors.Wrap(err, composePauseErrMsg)
}

return nil
if !waitForExit {
return nil
}

// Adding check on wether all containers have exited or not, because in case of restart command with immediate start after stop execution,
// in windows machine it take a fraction of second for container to be in exited state, after docker compose completes the stop command execution
// causing the dev start for airflow to fail
timeout := time.After(stopPostgresWaitTimeout)
ticker := time.NewTicker(stopPostgresWaitTicker)
for {
select {
case <-timeout:
log.Debug("timed out waiting for postgres container to be in exited state")
return nil
case <-ticker.C:
psInfo, _ := d.composeService.Ps(context.Background(), d.projectName, api.PsOptions{
All: true,
})
for i := range psInfo {
// we only need to check for postgres container state, since all other containers depends on postgres container
// so docker compose will ensure that postgres container going in shutting down phase only after all other containers have exited
if strings.Contains(psInfo[i].Name, PostgresDockerContainerName) {
if psInfo[i].State == dockerExitState {
log.Debug("postgres container reached exited state")
return nil
}
log.Debugf("postgres container is still in %s state, waiting for it to be in exited state", psInfo[i].State)
}
}
}
}
}

func (d *DockerCompose) PS() error {
Expand Down
83 changes: 80 additions & 3 deletions airflow/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/astronomer/astro-cli/airflow/mocks"
airflowTypes "github.com/astronomer/astro-cli/airflow/types"
"github.com/astronomer/astro-cli/config"
"github.com/sirupsen/logrus"

testUtils "github.com/astronomer/astro-cli/pkg/testing"
"github.com/compose-spec/compose-go/types"
Expand Down Expand Up @@ -713,20 +714,96 @@ func TestDockerComposeStop(t *testing.T) {
mockDockerCompose.composeService = composeMock
mockDockerCompose.imageHandler = imageHandler

err := mockDockerCompose.Stop()
err := mockDockerCompose.Stop(false)
assert.NoError(t, err)

imageHandler.AssertExpectations(t)
composeMock.AssertExpectations(t)
})

t.Run("success with wait but on first try", func(t *testing.T) {
imageHandler := new(mocks.ImageHandler)
imageHandler.On("ListLabels").Return(map[string]string{airflowVersionLabelName: airflowVersionLabel}, nil).Once()

composeMock := new(mocks.DockerComposeAPI)
composeMock.On("Stop", mock.Anything, mock.Anything, api.StopOptions{}).Return(nil).Once()
composeMock.On("Ps", mock.Anything, mockDockerCompose.projectName, api.PsOptions{All: true}).Return([]api.ContainerSummary{{ID: "test-postgres", Name: "test-postgres", State: "exited"}}, nil).Once()

mockDockerCompose.composeService = composeMock
mockDockerCompose.imageHandler = imageHandler

logrus.SetLevel(5) // debug level
var out bytes.Buffer
logrus.SetOutput(&out)

err := mockDockerCompose.Stop(true)
assert.NoError(t, err)

assert.Contains(t, out.String(), "postgres container reached exited state")
imageHandler.AssertExpectations(t)
composeMock.AssertExpectations(t)
})

t.Run("success after waiting for once", func(t *testing.T) {
imageHandler := new(mocks.ImageHandler)
imageHandler.On("ListLabels").Return(map[string]string{airflowVersionLabelName: airflowVersionLabel}, nil).Once()

composeMock := new(mocks.DockerComposeAPI)
composeMock.On("Stop", mock.Anything, mock.Anything, api.StopOptions{}).Return(nil).Once()
composeMock.On("Ps", mock.Anything, mockDockerCompose.projectName, api.PsOptions{All: true}).Return([]api.ContainerSummary{{ID: "test-postgres", Name: "test-postgres", State: "running"}}, nil).Once()
composeMock.On("Ps", mock.Anything, mockDockerCompose.projectName, api.PsOptions{All: true}).Return([]api.ContainerSummary{{ID: "test-postgres", Name: "test-postgres", State: "exited"}}, nil).Once()

mockDockerCompose.composeService = composeMock
mockDockerCompose.imageHandler = imageHandler

logrus.SetLevel(5) // debug level
var out bytes.Buffer
logrus.SetOutput(&out)

err := mockDockerCompose.Stop(true)
assert.NoError(t, err)

assert.Contains(t, out.String(), "postgres container is still in running state, waiting for it to be in exited state")
assert.Contains(t, out.String(), "postgres container reached exited state")
imageHandler.AssertExpectations(t)
composeMock.AssertExpectations(t)
})

t.Run("time out during the wait for postgres exit", func(t *testing.T) {
imageHandler := new(mocks.ImageHandler)
imageHandler.On("ListLabels").Return(map[string]string{airflowVersionLabelName: airflowVersionLabel}, nil).Once()

composeMock := new(mocks.DockerComposeAPI)
composeMock.On("Stop", mock.Anything, mock.Anything, api.StopOptions{}).Return(nil).Once()
composeMock.On("Ps", mock.Anything, mockDockerCompose.projectName, api.PsOptions{All: true}).Return([]api.ContainerSummary{{ID: "test-postgres", Name: "test-postgres", State: "running"}}, nil)

// reducing timeout
stopPostgresWaitTimeout = 11 * time.Millisecond
stopPostgresWaitTicker = 10 * time.Millisecond

mockDockerCompose.composeService = composeMock
mockDockerCompose.imageHandler = imageHandler

logrus.SetLevel(5) // debug level
var out bytes.Buffer
logrus.SetOutput(&out)

err := mockDockerCompose.Stop(true)
assert.NoError(t, err)

assert.Contains(t, out.String(), "postgres container is still in running state, waiting for it to be in exited state")
assert.Contains(t, out.String(), "timed out waiting for postgres container to be in exited state")
imageHandler.AssertExpectations(t)
composeMock.AssertExpectations(t)
})

t.Run("list label failure", func(t *testing.T) {
imageHandler := new(mocks.ImageHandler)
imageHandler.On("ListLabels").Return(map[string]string{}, errMockDocker).Once()

mockDockerCompose.imageHandler = imageHandler

err := mockDockerCompose.Stop()
err := mockDockerCompose.Stop(false)
assert.ErrorIs(t, err, errMockDocker)

imageHandler.AssertExpectations(t)
Expand All @@ -742,7 +819,7 @@ func TestDockerComposeStop(t *testing.T) {
mockDockerCompose.composeService = composeMock
mockDockerCompose.imageHandler = imageHandler

err := mockDockerCompose.Stop()
err := mockDockerCompose.Stop(false)
assert.ErrorIs(t, err, errMockDocker)

imageHandler.AssertExpectations(t)
Expand Down
10 changes: 5 additions & 5 deletions airflow/mocks/ContainerHandler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions cmd/airflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ func airflowStop(cmd *cobra.Command, args []string) error {
return err
}

return containerHandler.Stop()
return containerHandler.Stop(false)
}

// Stop an airflow cluster
Expand All @@ -614,7 +614,7 @@ func airflowRestart(cmd *cobra.Command, args []string) error {
return err
}

err = containerHandler.Stop()
err = containerHandler.Stop(true)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/airflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ func TestAirflowStop(t *testing.T) {

mockContainerHandler := new(mocks.ContainerHandler)
containerHandlerInit = func(airflowHome, envFile, dockerfile, imageName string) (airflow.ContainerHandler, error) {
mockContainerHandler.On("Stop").Return(nil).Once()
mockContainerHandler.On("Stop", false).Return(nil).Once()
return mockContainerHandler, nil
}

Expand All @@ -662,7 +662,7 @@ func TestAirflowStop(t *testing.T) {

mockContainerHandler := new(mocks.ContainerHandler)
containerHandlerInit = func(airflowHome, envFile, dockerfile, imageName string) (airflow.ContainerHandler, error) {
mockContainerHandler.On("Stop").Return(errMock).Once()
mockContainerHandler.On("Stop", false).Return(errMock).Once()
return mockContainerHandler, nil
}

Expand Down Expand Up @@ -692,7 +692,7 @@ func TestAirflowRestart(t *testing.T) {

mockContainerHandler := new(mocks.ContainerHandler)
containerHandlerInit = func(airflowHome, envFile, dockerfile, imageName string) (airflow.ContainerHandler, error) {
mockContainerHandler.On("Stop").Return(nil).Once()
mockContainerHandler.On("Stop", true).Return(nil).Once()
mockContainerHandler.On("Start", "", "airflow_settings.yaml", "", true, true, 1*time.Minute).Return(nil).Once()
return mockContainerHandler, nil
}
Expand All @@ -709,7 +709,7 @@ func TestAirflowRestart(t *testing.T) {

mockContainerHandler := new(mocks.ContainerHandler)
containerHandlerInit = func(airflowHome, envFile, dockerfile, imageName string) (airflow.ContainerHandler, error) {
mockContainerHandler.On("Stop").Return(errMock).Once()
mockContainerHandler.On("Stop", true).Return(errMock).Once()
return mockContainerHandler, nil
}

Expand All @@ -725,7 +725,7 @@ func TestAirflowRestart(t *testing.T) {

mockContainerHandler := new(mocks.ContainerHandler)
containerHandlerInit = func(airflowHome, envFile, dockerfile, imageName string) (airflow.ContainerHandler, error) {
mockContainerHandler.On("Stop").Return(nil).Once()
mockContainerHandler.On("Stop", true).Return(nil).Once()
mockContainerHandler.On("Start", "", "airflow_settings.yaml", "", true, true, 1*time.Minute).Return(errMock).Once()
return mockContainerHandler, nil
}
Expand Down

0 comments on commit 4568592

Please sign in to comment.