Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docker task manager: dont send to closed channels #2461

Merged
merged 1 commit into from
May 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions agent/engine/docker_task_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,8 +659,7 @@ func (engine *DockerTaskEngine) handleDockerEvent(event dockerapi.DockerContaine

engine.tasksLock.RLock()
managedTask, ok := engine.managedTasks[task.Arn]
// hold the lock until the message is sent so we don't send on a closed channel
defer engine.tasksLock.RUnlock()
engine.tasksLock.RUnlock()
if !ok {
seelog.Criticalf("Task engine: could not find managed task [%s] corresponding to a docker event: %s",
task.Arn, event.String())
Expand Down
46 changes: 25 additions & 21 deletions agent/engine/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,11 +498,11 @@ func (mtask *managedTask) handleResourceStateChange(resChange resourceStateChang
}

func (mtask *managedTask) emitResourceChange(change resourceStateChange) {
if mtask.ctx.Err() != nil {
seelog.Infof("Managed task [%s]: unable to emit resource state change due to closed context: %v",
mtask.Arn, mtask.ctx.Err())
select {
case <-mtask.ctx.Done():
seelog.Infof("Managed task [%s]: unable to emit resource state change due to exit", mtask.Arn)
case mtask.resourceStateChangeEvent <- change:
}
mtask.resourceStateChangeEvent <- change
}

func (mtask *managedTask) emitTaskEvent(task *apitask.Task, reason string) {
Expand All @@ -513,7 +513,11 @@ func (mtask *managedTask) emitTaskEvent(task *apitask.Task, reason string) {
return
}
seelog.Infof("Managed task [%s]: sending task change event [%s]", mtask.Arn, event.String())
mtask.stateChangeEvents <- event
select {
case <-mtask.ctx.Done():
seelog.Infof("Managed task [%s]: unable to send task change event [%s] due to exit", mtask.Arn, event.String())
case mtask.stateChangeEvents <- event:
}
seelog.Infof("Managed task [%s]: sent task change event [%s]", mtask.Arn, event.String())
}

Expand All @@ -527,27 +531,32 @@ func (mtask *managedTask) emitContainerEvent(task *apitask.Task, cont *apicontai
return
}

seelog.Infof("Managed task [%s]: sending container change event [%s]: %s",
seelog.Infof("Managed task [%s]: Container [%s]: sending container change event: %s",
mtask.Arn, cont.Name, event.String())
mtask.stateChangeEvents <- event
seelog.Infof("Managed task [%s]: sent container change event [%s]: %s",
select {
case <-mtask.ctx.Done():
seelog.Infof("Managed task [%s]: Container [%s]: unable to send container change event [%s] due to exit",
mtask.Arn, cont.Name, event.String())
case mtask.stateChangeEvents <- event:
}
seelog.Infof("Managed task [%s]: Container [%s]: sent container change event: %s",
mtask.Arn, cont.Name, event.String())
}

func (mtask *managedTask) emitDockerContainerChange(change dockerContainerChange) {
if mtask.ctx.Err() != nil {
seelog.Infof("Managed task [%s]: unable to emit docker container change due to closed context: %v",
mtask.Arn, mtask.ctx.Err())
select {
case <-mtask.ctx.Done():
seelog.Infof("Managed task [%s]: unable to emit docker container change due to exit", mtask.Arn)
case mtask.dockerMessages <- change:
}
mtask.dockerMessages <- change
}

func (mtask *managedTask) emitACSTransition(transition acsTransition) {
if mtask.ctx.Err() != nil {
seelog.Infof("Managed task [%s]: unable to emit acs transition due to closed context: %v",
mtask.Arn, mtask.ctx.Err())
select {
case <-mtask.ctx.Done():
seelog.Infof("Managed task [%s]: unable to emit docker container change due to exit", mtask.Arn)
case mtask.acsMessages <- transition:
}
mtask.acsMessages <- transition
}

func (mtask *managedTask) isContainerFound(container *apicontainer.Container) bool {
Expand Down Expand Up @@ -1195,11 +1204,6 @@ func (mtask *managedTask) discardEvents() {
case <-mtask.acsMessages:
case <-mtask.resourceStateChangeEvent:
case <-mtask.ctx.Done():
// The task has been cancelled. No need to process any more
// events
close(mtask.dockerMessages)
close(mtask.acsMessages)
close(mtask.resourceStateChangeEvent)
return
}
}
Expand Down
8 changes: 8 additions & 0 deletions agent/engine/task_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,7 @@ func TestStartContainerTransitionsInvokesHandleContainerChange(t *testing.T) {
stateChangeEvents: stateChangeEvents,
containerChangeEventStream: containerChangeEventStream,
dockerMessages: make(chan dockerContainerChange),
ctx: context.TODO(),
}

eventsGenerated := sync.WaitGroup{}
Expand Down Expand Up @@ -878,6 +879,7 @@ func TestOnContainersUnableToTransitionStateForDesiredStoppedTask(t *testing.T)
stateChangeEvents: stateChangeEvents,
},
stateChangeEvents: stateChangeEvents,
ctx: context.TODO(),
}
eventsGenerated := sync.WaitGroup{}
eventsGenerated.Add(1)
Expand Down Expand Up @@ -908,6 +910,7 @@ func TestOnContainersUnableToTransitionStateForDesiredRunningTask(t *testing.T)
},
DesiredStatusUnsafe: apitaskstatus.TaskRunning,
},
ctx: context.TODO(),
}

task.handleContainersUnableToTransitionState()
Expand Down Expand Up @@ -1524,6 +1527,7 @@ func TestHandleContainerChangeUpdateContainerHealth(t *testing.T) {
Task: testdata.LoadTask("sleep5TaskCgroup"),
containerChangeEventStream: containerChangeEventStream,
stateChangeEvents: make(chan statechange.Event),
ctx: context.TODO(),
}
// Discard all the statechange events
defer discardEvents(mTask.stateChangeEvents)()
Expand Down Expand Up @@ -1565,6 +1569,7 @@ func TestHandleContainerChangeUpdateMetadataRedundant(t *testing.T) {
Task: testdata.LoadTask("sleep5TaskCgroup"),
containerChangeEventStream: containerChangeEventStream,
stateChangeEvents: make(chan statechange.Event),
ctx: context.TODO(),
}
// Discard all the statechange events
defer discardEvents(mTask.stateChangeEvents)()
Expand Down Expand Up @@ -1635,6 +1640,7 @@ func TestWaitForResourceTransition(t *testing.T) {
Task: &apitask.Task{
ResourcesMapUnsafe: make(map[string][]taskresource.TaskResource),
},
ctx: context.TODO(),
}
transition := make(chan struct{}, 1)
transitionChangeResource := make(chan string, 1)
Expand Down Expand Up @@ -1665,6 +1671,7 @@ func TestApplyResourceStateHappyPath(t *testing.T) {
Arn: "arn",
ResourcesMapUnsafe: make(map[string][]taskresource.TaskResource),
},
ctx: context.TODO(),
}
gomock.InOrder(
mockResource.EXPECT().GetName(),
Expand Down Expand Up @@ -1702,6 +1709,7 @@ func TestApplyResourceStateFailures(t *testing.T) {
Arn: "arn",
ResourcesMapUnsafe: make(map[string][]taskresource.TaskResource),
},
ctx: context.TODO(),
}
gomock.InOrder(
mockResource.EXPECT().GetName(),
Expand Down