Skip to content

Commit

Permalink
docker task manager: dont hang on closed channels
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed May 27, 2020
1 parent d7d87c6 commit ab60658
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 23 deletions.
3 changes: 1 addition & 2 deletions agent/engine/docker_task_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,8 +659,7 @@ func (engine *DockerTaskEngine) handleDockerEvent(event dockerapi.DockerContaine

engine.tasksLock.RLock()
managedTask, ok := engine.managedTasks[task.Arn]
// hold the lock until the message is sent so we don't send on a closed channel
defer engine.tasksLock.RUnlock()
engine.tasksLock.RUnlock()
if !ok {
seelog.Criticalf("Task engine: could not find managed task [%s] corresponding to a docker event: %s",
task.Arn, event.String())
Expand Down
46 changes: 25 additions & 21 deletions agent/engine/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,11 +498,11 @@ func (mtask *managedTask) handleResourceStateChange(resChange resourceStateChang
}

func (mtask *managedTask) emitResourceChange(change resourceStateChange) {
if mtask.ctx.Err() != nil {
seelog.Infof("Managed task [%s]: unable to emit resource state change due to closed context: %v",
mtask.Arn, mtask.ctx.Err())
select {
case <-mtask.ctx.Done():
seelog.Infof("Managed task [%s]: unable to emit resource state change due to exit", mtask.Arn)
case mtask.resourceStateChangeEvent <- change:
}
mtask.resourceStateChangeEvent <- change
}

func (mtask *managedTask) emitTaskEvent(task *apitask.Task, reason string) {
Expand All @@ -513,7 +513,11 @@ func (mtask *managedTask) emitTaskEvent(task *apitask.Task, reason string) {
return
}
seelog.Infof("Managed task [%s]: sending task change event [%s]", mtask.Arn, event.String())
mtask.stateChangeEvents <- event
select {
case <-mtask.ctx.Done():
seelog.Infof("Managed task [%s]: unable to send task change event [%s] due to exit", mtask.Arn, event.String())
case mtask.stateChangeEvents <- event:
}
seelog.Infof("Managed task [%s]: sent task change event [%s]", mtask.Arn, event.String())
}

Expand All @@ -527,27 +531,32 @@ func (mtask *managedTask) emitContainerEvent(task *apitask.Task, cont *apicontai
return
}

seelog.Infof("Managed task [%s]: sending container change event [%s]: %s",
seelog.Infof("Managed task [%s]: Container [%s]: sending container change event: %s",
mtask.Arn, cont.Name, event.String())
mtask.stateChangeEvents <- event
seelog.Infof("Managed task [%s]: sent container change event [%s]: %s",
select {
case <-mtask.ctx.Done():
seelog.Infof("Managed task [%s]: Container [%s]: unable to send container change event [%s] due to exit",
mtask.Arn, cont.Name, event.String())
case mtask.stateChangeEvents <- event:
}
seelog.Infof("Managed task [%s]: Container [%s]: sent container change event: %s",
mtask.Arn, cont.Name, event.String())
}

func (mtask *managedTask) emitDockerContainerChange(change dockerContainerChange) {
if mtask.ctx.Err() != nil {
seelog.Infof("Managed task [%s]: unable to emit docker container change due to closed context: %v",
mtask.Arn, mtask.ctx.Err())
select {
case <-mtask.ctx.Done():
seelog.Infof("Managed task [%s]: unable to emit docker container change due to exit", mtask.Arn)
case mtask.dockerMessages <- change:
}
mtask.dockerMessages <- change
}

func (mtask *managedTask) emitACSTransition(transition acsTransition) {
if mtask.ctx.Err() != nil {
seelog.Infof("Managed task [%s]: unable to emit acs transition due to closed context: %v",
mtask.Arn, mtask.ctx.Err())
select {
case <-mtask.ctx.Done():
seelog.Infof("Managed task [%s]: unable to emit docker container change due to exit", mtask.Arn)
case mtask.acsMessages <- transition:
}
mtask.acsMessages <- transition
}

func (mtask *managedTask) isContainerFound(container *apicontainer.Container) bool {
Expand Down Expand Up @@ -1195,11 +1204,6 @@ func (mtask *managedTask) discardEvents() {
case <-mtask.acsMessages:
case <-mtask.resourceStateChangeEvent:
case <-mtask.ctx.Done():
// The task has been cancelled. No need to process any more
// events
close(mtask.dockerMessages)
close(mtask.acsMessages)
close(mtask.resourceStateChangeEvent)
return
}
}
Expand Down
8 changes: 8 additions & 0 deletions agent/engine/task_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,7 @@ func TestStartContainerTransitionsInvokesHandleContainerChange(t *testing.T) {
stateChangeEvents: stateChangeEvents,
containerChangeEventStream: containerChangeEventStream,
dockerMessages: make(chan dockerContainerChange),
ctx: context.TODO(),
}

eventsGenerated := sync.WaitGroup{}
Expand Down Expand Up @@ -878,6 +879,7 @@ func TestOnContainersUnableToTransitionStateForDesiredStoppedTask(t *testing.T)
stateChangeEvents: stateChangeEvents,
},
stateChangeEvents: stateChangeEvents,
ctx: context.TODO(),
}
eventsGenerated := sync.WaitGroup{}
eventsGenerated.Add(1)
Expand Down Expand Up @@ -908,6 +910,7 @@ func TestOnContainersUnableToTransitionStateForDesiredRunningTask(t *testing.T)
},
DesiredStatusUnsafe: apitaskstatus.TaskRunning,
},
ctx: context.TODO(),
}

task.handleContainersUnableToTransitionState()
Expand Down Expand Up @@ -1524,6 +1527,7 @@ func TestHandleContainerChangeUpdateContainerHealth(t *testing.T) {
Task: testdata.LoadTask("sleep5TaskCgroup"),
containerChangeEventStream: containerChangeEventStream,
stateChangeEvents: make(chan statechange.Event),
ctx: context.TODO(),
}
// Discard all the statechange events
defer discardEvents(mTask.stateChangeEvents)()
Expand Down Expand Up @@ -1565,6 +1569,7 @@ func TestHandleContainerChangeUpdateMetadataRedundant(t *testing.T) {
Task: testdata.LoadTask("sleep5TaskCgroup"),
containerChangeEventStream: containerChangeEventStream,
stateChangeEvents: make(chan statechange.Event),
ctx: context.TODO(),
}
// Discard all the statechange events
defer discardEvents(mTask.stateChangeEvents)()
Expand Down Expand Up @@ -1635,6 +1640,7 @@ func TestWaitForResourceTransition(t *testing.T) {
Task: &apitask.Task{
ResourcesMapUnsafe: make(map[string][]taskresource.TaskResource),
},
ctx: context.TODO(),
}
transition := make(chan struct{}, 1)
transitionChangeResource := make(chan string, 1)
Expand Down Expand Up @@ -1665,6 +1671,7 @@ func TestApplyResourceStateHappyPath(t *testing.T) {
Arn: "arn",
ResourcesMapUnsafe: make(map[string][]taskresource.TaskResource),
},
ctx: context.TODO(),
}
gomock.InOrder(
mockResource.EXPECT().GetName(),
Expand Down Expand Up @@ -1702,6 +1709,7 @@ func TestApplyResourceStateFailures(t *testing.T) {
Arn: "arn",
ResourcesMapUnsafe: make(map[string][]taskresource.TaskResource),
},
ctx: context.TODO(),
}
gomock.InOrder(
mockResource.EXPECT().GetName(),
Expand Down

0 comments on commit ab60658

Please sign in to comment.