Skip to content

Commit

Permalink
engine: remove support for serial docker pull image
Browse files Browse the repository at this point in the history
  • Loading branch information
sharanyad committed Sep 12, 2018
1 parent eca5e95 commit 67ff85a
Show file tree
Hide file tree
Showing 8 changed files with 4 additions and 259 deletions.
39 changes: 4 additions & 35 deletions agent/engine/docker_task_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ type DockerTaskEngine struct {
// The write mutex should be taken when adding and removing tasks from managedTasks.
tasksLock sync.RWMutex

enableConcurrentPull bool
credentialsManager credentials.Manager
_time ttime.Time
_timeOnce sync.Once
Expand Down Expand Up @@ -152,8 +151,7 @@ func NewDockerTaskEngine(cfg *config.Config,

stateChangeEvents: make(chan statechange.Event),

enableConcurrentPull: false,
credentialsManager: credentialsManager,
credentialsManager: credentialsManager,

containerChangeEventStream: containerChangeEventStream,
imageManager: imageManager,
Expand Down Expand Up @@ -207,8 +205,6 @@ func (engine *DockerTaskEngine) Init(ctx context.Context) error {
engine.stopEngine = cancel

engine.ctx = derivedCtx
// Determine whether the engine can perform concurrent "docker pull" based on docker version
engine.enableConcurrentPull = engine.isParallelPullCompatible()

// Open the event stream before we sync state so that e.g. if a container
// goes from running to stopped after we sync with it as "running" we still
Expand Down Expand Up @@ -678,12 +674,9 @@ func (engine *DockerTaskEngine) pullContainer(task *apitask.Task, container *api
task.SetPullStoppedAt(timestamp)
}()

if engine.enableConcurrentPull {
seelog.Infof("Task engine [%s]: pulling container %s concurrently", task.Arn, container.Name)
return engine.concurrentPull(task, container)
}
seelog.Infof("Task engine [%s]: pulling container %s serially", task.Arn, container.Name)
return engine.serialPull(task, container)
seelog.Infof("Task engine [%s]: pulling container %s concurrently", task.Arn, container.Name)
return engine.concurrentPull(task, container)

}

// No pull image is required, just update container reference and use cached image.
Expand Down Expand Up @@ -751,30 +744,6 @@ func (engine *DockerTaskEngine) concurrentPull(task *apitask.Task, container *ap
return engine.pullAndUpdateContainerReference(task, container)
}

func (engine *DockerTaskEngine) serialPull(task *apitask.Task, container *apicontainer.Container) dockerapi.DockerContainerMetadata {
seelog.Debugf("Task engine [%s]: attempting to obtain ImagePullDeleteLock to pull image - %s",
task.Arn, container.Image)
ImagePullDeleteLock.Lock()
seelog.Debugf("Task engine [%s]: acquired ImagePullDeleteLock, start pulling image - %s",
task.Arn, container.Image)
defer seelog.Debugf("Task engine [%s]: released ImagePullDeleteLock after pulling image - %s",
task.Arn, container.Image)
defer ImagePullDeleteLock.Unlock()

pullStart := engine.time().Now()
defer func(startTime time.Time) {
seelog.Infof("Task engine [%s]: finished pulling image [%s] in %s",
task.Arn, container.Image, time.Since(startTime).String())
}(pullStart)
ok := task.SetPullStartedAt(pullStart)
if ok {
seelog.Infof("Task engine [%s]: recording timestamp for starting image pull: %s",
task.Arn, pullStart.String())
}

return engine.pullAndUpdateContainerReference(task, container)
}

func (engine *DockerTaskEngine) pullAndUpdateContainerReference(task *apitask.Task, container *apicontainer.Container) dockerapi.DockerContainerMetadata {
// If a task is blocked here for some time, and before it starts pulling image,
// the task's desired status is set to stopped, then don't pull the image
Expand Down
9 changes: 0 additions & 9 deletions agent/engine/docker_task_engine_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ func TestResourceContainerProgression(t *testing.T) {
// containerEventsWG is used to force the test to wait until the container created and started
// events are processed
containerEventsWG := sync.WaitGroup{}
if dockerVersionCheckDuringInit {
client.EXPECT().Version(gomock.Any(), gomock.Any())
}
client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
gomock.InOrder(
// Ensure that the resource is created first
Expand Down Expand Up @@ -190,9 +187,6 @@ func TestResourceContainerProgressionFailure(t *testing.T) {
sleepTask.ResourcesMapUnsafe = make(map[string][]taskresource.TaskResource)
sleepTask.AddResource("cgroup", cgroupResource)
eventStream := make(chan dockerapi.DockerContainerChangeEvent)
if dockerVersionCheckDuringInit {
client.EXPECT().Version(gomock.Any(), gomock.Any())
}
client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
gomock.InOrder(
// resource creation failure
Expand Down Expand Up @@ -254,9 +248,6 @@ func TestTaskCPULimitHappyPath(t *testing.T) {
// events are processed
containerEventsWG := sync.WaitGroup{}

if dockerVersionCheckDuringInit {
client.EXPECT().Version(gomock.Any(), gomock.Any()).Return("1.12.6", nil)
}
client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
containerName := make(chan string)
go func() {
Expand Down
58 changes: 0 additions & 58 deletions agent/engine/docker_task_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,6 @@ func TestBatchContainerHappyPath(t *testing.T) {
// events are processed
containerEventsWG := sync.WaitGroup{}

if dockerVersionCheckDuringInit {
client.EXPECT().Version(gomock.Any(), gomock.Any()).Return("1.12.6", nil)
}
client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
containerName := make(chan string)
go func() {
Expand Down Expand Up @@ -307,9 +304,6 @@ func TestTaskWithSteadyStateResourcesProvisioned(t *testing.T) {
// events are processed
containerEventsWG := sync.WaitGroup{}

if dockerVersionCheckDuringInit {
client.EXPECT().Version(gomock.Any(), gomock.Any())
}
client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
// We cannot rely on the order of pulls between images as they can still be downloaded in
// parallel. The dependency graph enforcement comes into effect for CREATED transitions.
Expand Down Expand Up @@ -425,9 +419,6 @@ func TestRemoveEvents(t *testing.T) {
// containerEventsWG is used to force the test to wait until the container created and started
// events are processed
containerEventsWG := sync.WaitGroup{}
if dockerVersionCheckDuringInit {
client.EXPECT().Version(gomock.Any(), gomock.Any())
}
client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
client.EXPECT().StopContainer(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
containerName := make(chan string)
Expand Down Expand Up @@ -502,9 +493,6 @@ func TestStartTimeoutThenStart(t *testing.T) {
eventStream := make(chan dockerapi.DockerContainerChangeEvent)
testTime.EXPECT().Now().Return(time.Now()).AnyTimes()
testTime.EXPECT().After(gomock.Any())
if dockerVersionCheckDuringInit {
client.EXPECT().Version(gomock.Any(), gomock.Any())
}
client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
client.EXPECT().APIVersion().Return(defaultDockerClientAPIVersion, nil)
for _, container := range sleepTask.Containers {
Expand Down Expand Up @@ -560,9 +548,6 @@ func TestSteadyStatePoll(t *testing.T) {
sleepTask.Arn = uuid.New()
eventStream := make(chan dockerapi.DockerContainerChangeEvent)

if dockerVersionCheckDuringInit {
client.EXPECT().Version(gomock.Any(), gomock.Any())
}
client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
containerName := make(chan string)
go func() {
Expand Down Expand Up @@ -641,9 +626,6 @@ func TestStopWithPendingStops(t *testing.T) {
sleepTask2.Arn = "arn2"
eventStream := make(chan dockerapi.DockerContainerChangeEvent)

if dockerVersionCheckDuringInit {
client.EXPECT().Version(gomock.Any(), gomock.Any()).Return("1.7.0", nil)
}
client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
err := taskEngine.Init(ctx)
assert.NoError(t, err)
Expand Down Expand Up @@ -756,9 +738,6 @@ func TestTaskTransitionWhenStopContainerTimesout(t *testing.T) {

sleepTask := testdata.LoadTask("sleep5")
eventStream := make(chan dockerapi.DockerContainerChangeEvent)
if dockerVersionCheckDuringInit {
client.EXPECT().Version(gomock.Any(), gomock.Any())
}
client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
mockTime.EXPECT().Now().Return(time.Now()).AnyTimes()
mockTime.EXPECT().After(gomock.Any()).AnyTimes()
Expand Down Expand Up @@ -854,9 +833,6 @@ func TestTaskTransitionWhenStopContainerReturnsUnretriableError(t *testing.T) {

sleepTask := testdata.LoadTask("sleep5")
eventStream := make(chan dockerapi.DockerContainerChangeEvent)
if dockerVersionCheckDuringInit {
client.EXPECT().Version(gomock.Any(), gomock.Any())
}
client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
mockTime.EXPECT().Now().Return(time.Now()).AnyTimes()
mockTime.EXPECT().After(gomock.Any()).AnyTimes()
Expand Down Expand Up @@ -927,9 +903,6 @@ func TestTaskTransitionWhenStopContainerReturnsTransientErrorBeforeSucceeding(t

sleepTask := testdata.LoadTask("sleep5")
eventStream := make(chan dockerapi.DockerContainerChangeEvent)
if dockerVersionCheckDuringInit {
client.EXPECT().Version(gomock.Any(), gomock.Any())
}
client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
mockTime.EXPECT().Now().Return(time.Now()).AnyTimes()
mockTime.EXPECT().After(gomock.Any()).AnyTimes()
Expand Down Expand Up @@ -982,9 +955,6 @@ func TestGetTaskByArn(t *testing.T) {
defer ctrl.Finish()

mockTime.EXPECT().Now().Return(time.Now()).AnyTimes()
if dockerVersionCheckDuringInit {
client.EXPECT().Version(gomock.Any(), gomock.Any())
}
eventStream := make(chan dockerapi.DockerContainerChangeEvent)
client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
imageManager.EXPECT().AddAllImageStates(gomock.Any()).AnyTimes()
Expand All @@ -1007,25 +977,6 @@ func TestGetTaskByArn(t *testing.T) {
assert.False(t, found, "Task with invalid arn found in the task engine")
}

func TestEngineEnableConcurrentPull(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
ctrl, client, _, taskEngine, _, _, _ := mocks(t, ctx, &defaultConfig)
defer ctrl.Finish()

if dockerVersionCheckDuringInit {
client.EXPECT().Version(gomock.Any(), gomock.Any()).Return("1.11.1", nil)
}
client.EXPECT().ContainerEvents(gomock.Any())

err := taskEngine.Init(ctx)
assert.NoError(t, err)

dockerTaskEngine, _ := taskEngine.(*DockerTaskEngine)
assert.True(t, dockerTaskEngine.enableConcurrentPull,
"Task engine should be able to perform concurrent pulling for docker version >= 1.11.1")
}

func TestPauseContainerHappyPath(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
Expand All @@ -1052,9 +1003,6 @@ func TestPauseContainerHappyPath(t *testing.T) {
},
})

if dockerVersionCheckDuringInit {
dockerClient.EXPECT().Version(gomock.Any(), gomock.Any())
}
dockerClient.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)

pauseContainerID := "pauseContainerID"
Expand Down Expand Up @@ -1277,9 +1225,6 @@ func TestTaskWithCircularDependency(t *testing.T) {
ctrl, client, _, taskEngine, _, _, _ := mocks(t, ctx, &defaultConfig)
defer ctrl.Finish()

if dockerVersionCheckDuringInit {
client.EXPECT().Version(gomock.Any(), gomock.Any()).Return("1.12.6", nil)
}
client.EXPECT().ContainerEvents(gomock.Any())

task := testdata.LoadTask("circular_dependency")
Expand Down Expand Up @@ -1539,9 +1484,6 @@ func TestMetadataFileUpdatedAgentRestart(t *testing.T) {

state.AddTask(task)
state.AddContainer(dockerContainer, task)
if dockerVersionCheckDuringInit {
client.EXPECT().Version(gomock.Any(), gomock.Any())
}
eventStream := make(chan dockerapi.DockerContainerChangeEvent)
client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
client.EXPECT().DescribeContainer(gomock.Any(), gomock.Any())
Expand Down
46 changes: 0 additions & 46 deletions agent/engine/docker_task_engine_unix.go

This file was deleted.

52 changes: 0 additions & 52 deletions agent/engine/docker_task_engine_unix_test.go

This file was deleted.

22 changes: 0 additions & 22 deletions agent/engine/docker_task_engine_windows.go

This file was deleted.

9 changes: 0 additions & 9 deletions agent/engine/docker_task_engine_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@ import (
"github.com/golang/mock/gomock"
)

const (
// dockerVersionCheckDuringInit specifies if Docker client's Version()
// API needs to be mocked in engine tests
//
// isParallelPullCompatible is not invoked during engin intialization
// on windows. No need for mock Docker client's Version() call
dockerVersionCheckDuringInit = false
)

func TestDeleteTask(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
Loading

0 comments on commit 67ff85a

Please sign in to comment.