remove support for serial docker pull image #1569

Merged: 1 commit, Sep 12, 2018
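In brief, from the diff: the engine used to choose between concurrentPull and serialPull based on a Docker daemon version probe (isParallelPullCompatible, cached in enableConcurrentPull). This change deletes the serial path and the version gate, so images are always pulled concurrently. The platform-specific compatibility files are removed, and the engine tests drop their client.EXPECT().Version(...) stubs because Init no longer queries the daemon version.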
39 changes: 4 additions & 35 deletions agent/engine/docker_task_engine.go
@@ -110,7 +110,6 @@ type DockerTaskEngine struct {
 	// The write mutex should be taken when adding and removing tasks from managedTasks.
 	tasksLock sync.RWMutex

-	enableConcurrentPull bool
 	credentialsManager credentials.Manager
 	_time              ttime.Time
 	_timeOnce          sync.Once
@@ -152,8 +151,7 @@ func NewDockerTaskEngine(cfg *config.Config,

 		stateChangeEvents: make(chan statechange.Event),

-		enableConcurrentPull: false,
-		credentialsManager:   credentialsManager,
+		credentialsManager: credentialsManager,

 		containerChangeEventStream: containerChangeEventStream,
 		imageManager:               imageManager,
@@ -207,8 +205,6 @@ func (engine *DockerTaskEngine) Init(ctx context.Context) error {
 	engine.stopEngine = cancel

 	engine.ctx = derivedCtx
-	// Determine whether the engine can perform concurrent "docker pull" based on docker version
-	engine.enableConcurrentPull = engine.isParallelPullCompatible()

 	// Open the event stream before we sync state so that e.g. if a container
 	// goes from running to stopped after we sync with it as "running" we still
@@ -678,12 +674,9 @@ func (engine *DockerTaskEngine) pullContainer(task *apitask.Task, container *apicontainer.Container) dockerapi.DockerContainerMetadata {
 		task.SetPullStoppedAt(timestamp)
 	}()

-	if engine.enableConcurrentPull {
-		seelog.Infof("Task engine [%s]: pulling container %s concurrently", task.Arn, container.Name)
-		return engine.concurrentPull(task, container)
-	}
-	seelog.Infof("Task engine [%s]: pulling container %s serially", task.Arn, container.Name)
-	return engine.serialPull(task, container)
+	seelog.Infof("Task engine [%s]: pulling container %s concurrently", task.Arn, container.Name)
+	return engine.concurrentPull(task, container)
+
 }

 // No pull image is required, just update container reference and use cached image.
@@ -751,30 +744,6 @@ func (engine *DockerTaskEngine) concurrentPull(task *apitask.Task, container *apicontainer.Container) dockerapi.DockerContainerMetadata {
 	return engine.pullAndUpdateContainerReference(task, container)
 }

-func (engine *DockerTaskEngine) serialPull(task *apitask.Task, container *apicontainer.Container) dockerapi.DockerContainerMetadata {
-	seelog.Debugf("Task engine [%s]: attempting to obtain ImagePullDeleteLock to pull image - %s",
-		task.Arn, container.Image)
-	ImagePullDeleteLock.Lock()
-	seelog.Debugf("Task engine [%s]: acquired ImagePullDeleteLock, start pulling image - %s",
-		task.Arn, container.Image)
-	defer seelog.Debugf("Task engine [%s]: released ImagePullDeleteLock after pulling image - %s",
-		task.Arn, container.Image)
-	defer ImagePullDeleteLock.Unlock()
-
-	pullStart := engine.time().Now()
-	defer func(startTime time.Time) {
-		seelog.Infof("Task engine [%s]: finished pulling image [%s] in %s",
-			task.Arn, container.Image, time.Since(startTime).String())
-	}(pullStart)
-	ok := task.SetPullStartedAt(pullStart)
-	if ok {
-		seelog.Infof("Task engine [%s]: recording timestamp for starting image pull: %s",
-			task.Arn, pullStart.String())
-	}
-
-	return engine.pullAndUpdateContainerReference(task, container)
-}
-
 func (engine *DockerTaskEngine) pullAndUpdateContainerReference(task *apitask.Task, container *apicontainer.Container) dockerapi.DockerContainerMetadata {
 	// If a task is blocked here for some time, and before it starts pulling image,
 	// the task's desired status is set to stopped, then don't pull the image
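With serialPull gone, every pull goes through concurrentPull, which takes the read side of ImagePullDeleteLock where serialPull took the exclusive write side (the read-lock detail comes from the surrounding source, not from this diff's context lines). A minimal, runnable sketch of that read/write-lock pattern, with illustrative names rather than the agent's API:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// imagePullDeleteLock mirrors the agent's ImagePullDeleteLock: image pulls
// take the read side so any number can run at once, while image deletion
// takes the write side and runs exclusively.
var imagePullDeleteLock sync.RWMutex

func pullImage(name string, wg *sync.WaitGroup) {
	defer wg.Done()
	imagePullDeleteLock.RLock()
	defer imagePullDeleteLock.RUnlock()
	fmt.Println("pulling", name)
	time.Sleep(10 * time.Millisecond) // stand-in for the actual docker pull
}

func deleteImage(name string) {
	imagePullDeleteLock.Lock()
	defer imagePullDeleteLock.Unlock()
	fmt.Println("deleting", name)
}

func main() {
	var wg sync.WaitGroup
	for _, img := range []string{"nginx:latest", "redis:alpine", "busybox"} {
		wg.Add(1)
		go pullImage(img, &wg) // pulls proceed concurrently under the read lock
	}
	wg.Wait()
	deleteImage("busybox") // deletion waits until no pull holds the read lock
}
```

The design point: concurrent pulls only need to exclude image deletion, not one another.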
9 changes: 0 additions & 9 deletions agent/engine/docker_task_engine_linux_test.go
@@ -84,9 +84,6 @@ func TestResourceContainerProgression(t *testing.T) {
 	// containerEventsWG is used to force the test to wait until the container created and started
 	// events are processed
 	containerEventsWG := sync.WaitGroup{}
-	if dockerVersionCheckDuringInit {
-		client.EXPECT().Version(gomock.Any(), gomock.Any())
-	}
 	client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
 	gomock.InOrder(
 		// Ensure that the resource is created first
@@ -190,9 +187,6 @@ func TestResourceContainerProgressionFailure(t *testing.T) {
 	sleepTask.ResourcesMapUnsafe = make(map[string][]taskresource.TaskResource)
 	sleepTask.AddResource("cgroup", cgroupResource)
 	eventStream := make(chan dockerapi.DockerContainerChangeEvent)
-	if dockerVersionCheckDuringInit {
-		client.EXPECT().Version(gomock.Any(), gomock.Any())
-	}
 	client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
 	gomock.InOrder(
 		// resource creation failure
@@ -254,9 +248,6 @@ func TestTaskCPULimitHappyPath(t *testing.T) {
 	// events are processed
 	containerEventsWG := sync.WaitGroup{}

-	if dockerVersionCheckDuringInit {
-		client.EXPECT().Version(gomock.Any(), gomock.Any()).Return("1.12.6", nil)
-	}
 	client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
 	containerName := make(chan string)
 	go func() {
58 changes: 0 additions & 58 deletions agent/engine/docker_task_engine_test.go
@@ -198,9 +198,6 @@ func TestBatchContainerHappyPath(t *testing.T) {
 	// events are processed
 	containerEventsWG := sync.WaitGroup{}

-	if dockerVersionCheckDuringInit {
-		client.EXPECT().Version(gomock.Any(), gomock.Any()).Return("1.12.6", nil)
-	}
 	client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
 	containerName := make(chan string)
 	go func() {
@@ -307,9 +304,6 @@ func TestTaskWithSteadyStateResourcesProvisioned(t *testing.T) {
 	// events are processed
 	containerEventsWG := sync.WaitGroup{}

-	if dockerVersionCheckDuringInit {
-		client.EXPECT().Version(gomock.Any(), gomock.Any())
-	}
 	client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
 	// We cannot rely on the order of pulls between images as they can still be downloaded in
 	// parallel. The dependency graph enforcement comes into effect for CREATED transitions.
@@ -425,9 +419,6 @@ func TestRemoveEvents(t *testing.T) {
 	// containerEventsWG is used to force the test to wait until the container created and started
 	// events are processed
 	containerEventsWG := sync.WaitGroup{}
-	if dockerVersionCheckDuringInit {
-		client.EXPECT().Version(gomock.Any(), gomock.Any())
-	}
 	client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
 	client.EXPECT().StopContainer(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
 	containerName := make(chan string)
@@ -502,9 +493,6 @@ func TestStartTimeoutThenStart(t *testing.T) {
 	eventStream := make(chan dockerapi.DockerContainerChangeEvent)
 	testTime.EXPECT().Now().Return(time.Now()).AnyTimes()
 	testTime.EXPECT().After(gomock.Any())
-	if dockerVersionCheckDuringInit {
-		client.EXPECT().Version(gomock.Any(), gomock.Any())
-	}
 	client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
 	client.EXPECT().APIVersion().Return(defaultDockerClientAPIVersion, nil)
 	for _, container := range sleepTask.Containers {
@@ -560,9 +548,6 @@ func TestSteadyStatePoll(t *testing.T) {
 	sleepTask.Arn = uuid.New()
 	eventStream := make(chan dockerapi.DockerContainerChangeEvent)

-	if dockerVersionCheckDuringInit {
-		client.EXPECT().Version(gomock.Any(), gomock.Any())
-	}
 	client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
 	containerName := make(chan string)
 	go func() {
@@ -641,9 +626,6 @@ func TestStopWithPendingStops(t *testing.T) {
 	sleepTask2.Arn = "arn2"
 	eventStream := make(chan dockerapi.DockerContainerChangeEvent)

-	if dockerVersionCheckDuringInit {
-		client.EXPECT().Version(gomock.Any(), gomock.Any()).Return("1.7.0", nil)
-	}
 	client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
 	err := taskEngine.Init(ctx)
 	assert.NoError(t, err)
@@ -756,9 +738,6 @@ func TestTaskTransitionWhenStopContainerTimesout(t *testing.T) {

 	sleepTask := testdata.LoadTask("sleep5")
 	eventStream := make(chan dockerapi.DockerContainerChangeEvent)
-	if dockerVersionCheckDuringInit {
-		client.EXPECT().Version(gomock.Any(), gomock.Any())
-	}
 	client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
 	mockTime.EXPECT().Now().Return(time.Now()).AnyTimes()
 	mockTime.EXPECT().After(gomock.Any()).AnyTimes()
@@ -854,9 +833,6 @@ func TestTaskTransitionWhenStopContainerReturnsUnretriableError(t *testing.T) {

 	sleepTask := testdata.LoadTask("sleep5")
 	eventStream := make(chan dockerapi.DockerContainerChangeEvent)
-	if dockerVersionCheckDuringInit {
-		client.EXPECT().Version(gomock.Any(), gomock.Any())
-	}
 	client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
 	mockTime.EXPECT().Now().Return(time.Now()).AnyTimes()
 	mockTime.EXPECT().After(gomock.Any()).AnyTimes()
@@ -927,9 +903,6 @@ func TestTaskTransitionWhenStopContainerReturnsTransientErrorBeforeSucceeding(t *testing.T) {

 	sleepTask := testdata.LoadTask("sleep5")
 	eventStream := make(chan dockerapi.DockerContainerChangeEvent)
-	if dockerVersionCheckDuringInit {
-		client.EXPECT().Version(gomock.Any(), gomock.Any())
-	}
 	client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
 	mockTime.EXPECT().Now().Return(time.Now()).AnyTimes()
 	mockTime.EXPECT().After(gomock.Any()).AnyTimes()
@@ -982,9 +955,6 @@ func TestGetTaskByArn(t *testing.T) {
 	defer ctrl.Finish()

 	mockTime.EXPECT().Now().Return(time.Now()).AnyTimes()
-	if dockerVersionCheckDuringInit {
-		client.EXPECT().Version(gomock.Any(), gomock.Any())
-	}
 	eventStream := make(chan dockerapi.DockerContainerChangeEvent)
 	client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
 	imageManager.EXPECT().AddAllImageStates(gomock.Any()).AnyTimes()
@@ -1007,25 +977,6 @@
 	assert.False(t, found, "Task with invalid arn found in the task engine")
 }

-func TestEngineEnableConcurrentPull(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.TODO())
-	defer cancel()
-	ctrl, client, _, taskEngine, _, _, _ := mocks(t, ctx, &defaultConfig)
-	defer ctrl.Finish()
-
-	if dockerVersionCheckDuringInit {
-		client.EXPECT().Version(gomock.Any(), gomock.Any()).Return("1.11.1", nil)
-	}
-	client.EXPECT().ContainerEvents(gomock.Any())
-
-	err := taskEngine.Init(ctx)
-	assert.NoError(t, err)
-
-	dockerTaskEngine, _ := taskEngine.(*DockerTaskEngine)
-	assert.True(t, dockerTaskEngine.enableConcurrentPull,
-		"Task engine should be able to perform concurrent pulling for docker version >= 1.11.1")
-}
-
 func TestPauseContainerHappyPath(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.TODO())
 	defer cancel()
@@ -1052,9 +1003,6 @@ func TestPauseContainerHappyPath(t *testing.T) {
 		},
 	})

-	if dockerVersionCheckDuringInit {
-		dockerClient.EXPECT().Version(gomock.Any(), gomock.Any())
-	}
 	dockerClient.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)

 	pauseContainerID := "pauseContainerID"
@@ -1277,9 +1225,6 @@ func TestTaskWithCircularDependency(t *testing.T) {
 	ctrl, client, _, taskEngine, _, _, _ := mocks(t, ctx, &defaultConfig)
 	defer ctrl.Finish()

-	if dockerVersionCheckDuringInit {
-		client.EXPECT().Version(gomock.Any(), gomock.Any()).Return("1.12.6", nil)
-	}
 	client.EXPECT().ContainerEvents(gomock.Any())

 	task := testdata.LoadTask("circular_dependency")
@@ -1539,9 +1484,6 @@ func TestMetadataFileUpdatedAgentRestart(t *testing.T) {

 	state.AddTask(task)
 	state.AddContainer(dockerContainer, task)
-	if dockerVersionCheckDuringInit {
-		client.EXPECT().Version(gomock.Any(), gomock.Any())
-	}
 	eventStream := make(chan dockerapi.DockerContainerChangeEvent)
 	client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
 	client.EXPECT().DescribeContainer(gomock.Any(), gomock.Any())
46 changes: 0 additions & 46 deletions agent/engine/docker_task_engine_unix.go

This file was deleted.
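For context, this deleted Unix file held isParallelPullCompatible, the daemon-version gate removed from Init above. The sketch below is a reconstruction from the call sites and the ">= 1.11.1" assertion in the deleted test, not the verbatim file; the seelog message text and the version-matching helper are assumptions:

```go
// isParallelPullCompatible reports whether the Docker daemon is new enough
// for concurrent "docker pull". Sketch only: engine.Version() is the probe
// the deleted tests mocked, while the comparison helper is assumed.
func (engine *DockerTaskEngine) isParallelPullCompatible() bool {
	version, err := engine.Version()
	if err != nil {
		seelog.Warnf("Task engine: failed to get docker version, disabling concurrent pull: %v", err)
		return false
	}
	// Per the deleted test, concurrent pull was considered safe on
	// Docker >= 1.11.1.
	match, err := utils.Version(version).Matches(">=1.11.1")
	if err != nil {
		return false
	}
	return match
}
```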

52 changes: 0 additions & 52 deletions agent/engine/docker_task_engine_unix_test.go

This file was deleted.

22 changes: 0 additions & 22 deletions agent/engine/docker_task_engine_windows.go

This file was deleted.
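The deleted Windows counterpart sat on the trivial side of the gate: the comment removed from the Windows tests below says the version probe never needed a Version() mock there, so the file presumably reduced to a hard-coded answer, along the lines of:

```go
// Sketch; assumed from the removed Windows test comment stating that
// isParallelPullCompatible required no Docker Version() mock on Windows.
func (engine *DockerTaskEngine) isParallelPullCompatible() bool {
	return true
}
```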

9 changes: 0 additions & 9 deletions agent/engine/docker_task_engine_windows_test.go
@@ -24,15 +24,6 @@ import (
 	"github.com/golang/mock/gomock"
 )

-const (
-	// dockerVersionCheckDuringInit specifies if Docker client's Version()
-	// API needs to be mocked in engine tests
-	//
-	// isParallelPullCompatible is not invoked during engine initialization
-	// on windows. No need to mock the Docker client's Version() call
-	dockerVersionCheckDuringInit = false
-)
-
 func TestDeleteTask(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()