Commit
Make task wait for resources on agent restart
Normally the agent waits for instance resources (ports, memory) to be
released by waiting for the previous task to be stopped, but this breaks
when a task is waiting and the agent is restarted. This fix adds the
stopping task back into the wait group on restart.
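The change relies on the engine's taskStopGroup, a wait group keyed by stop sequence number: a task that ACS wants stopped is Added under its stop sequence number, and a later task blocks in Wait until every earlier sequence number is Done. The sketch below is a minimal, self-contained illustration of that Add/Done/Wait pattern, not the agent's actual implementation; the seqWaitGroup type and its simplified "wait on all lower sequence numbers" rule are assumptions made only for this example.

package main

import (
	"fmt"
	"sync"
)

// seqWaitGroup is a minimal stand-in for the engine's taskStopGroup: one
// sync.WaitGroup per sequence number. Only the Add/Done/Wait semantics used
// in this commit are modeled here.
type seqWaitGroup struct {
	mu     sync.Mutex
	groups map[int64]*sync.WaitGroup
}

func newSeqWaitGroup() *seqWaitGroup {
	return &seqWaitGroup{groups: make(map[int64]*sync.WaitGroup)}
}

// Add registers delta outstanding stops under the given sequence number.
func (s *seqWaitGroup) Add(seq int64, delta int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.groups[seq]; !ok {
		s.groups[seq] = &sync.WaitGroup{}
	}
	s.groups[seq].Add(delta)
}

// Done marks one stop under the given sequence number as complete.
func (s *seqWaitGroup) Done(seq int64) {
	s.mu.Lock()
	wg, ok := s.groups[seq]
	s.mu.Unlock()
	if ok {
		wg.Done()
	}
}

// Wait blocks until every sequence number below seq has fully drained
// (a simplification of the agent's ordering rule).
func (s *seqWaitGroup) Wait(seq int64) {
	s.mu.Lock()
	var pending []*sync.WaitGroup
	for n, wg := range s.groups {
		if n < seq {
			pending = append(pending, wg)
		}
	}
	s.mu.Unlock()
	for _, wg := range pending {
		wg.Wait()
	}
}

func main() {
	g := newSeqWaitGroup()

	// On restart, synchronizeState re-registers a task that ACS stopped
	// (stop sequence number 1) but that has not yet reached STOPPED.
	g.Add(1, 1)

	go func() {
		fmt.Println("previous task stopped; ports and memory released")
		g.Done(1) // the old task finally reaches STOPPED
	}()

	// A task starting with sequence number 2 waits for everything before it.
	g.Wait(2)
	fmt.Println("new task may now claim host resources")
}

In the test added by this commit, taskEngine.taskStopGroup.Wait(2) plays the role of the final Wait above, and taskStopGroup.Done(1) the Done call.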
Peng Yin committed Nov 8, 2017
1 parent f745c7f commit 73a4ea2
Showing 4 changed files with 149 additions and 37 deletions.
16 changes: 16 additions & 0 deletions agent/api/task.go
@@ -884,6 +884,22 @@ func (task *Task) GetTaskENI() *ENI {
return task.ENI
}

// GetStopSequenceNumber returns the stop sequence number of a task
func (task *Task) GetStopSequenceNumber() int64 {
task.desiredStatusLock.RLock()
defer task.desiredStatusLock.RUnlock()

return task.StopSequenceNumber
}

// SetStopSequenceNumber sets the stop sequence number of a task
func (task *Task) SetStopSequenceNumber(seqnum int64) {
task.desiredStatusLock.Lock()
defer task.desiredStatusLock.Unlock()

task.StopSequenceNumber = seqnum
}

// String returns a human readable string representation of this object
func (task *Task) String() string {
res := fmt.Sprintf("%s:%s %s, TaskStatus: (%s->%s)",
90 changes: 55 additions & 35 deletions agent/engine/docker_task_engine.go
@@ -245,54 +245,74 @@ func (engine *DockerTaskEngine) synchronizeState() {
}

tasks := engine.state.AllTasks()
var tasksToStart []*api.Task
for _, task := range tasks {
conts, ok := engine.state.ContainerMapByArn(task.Arn)
if !ok {
engine.startTask(task)
// task hasn't started processing, no need to check container status
tasksToStart = append(tasksToStart, task)
continue
}

for _, cont := range conts {
if cont.DockerID == "" {
log.Debug("Found container potentially created while we were down", "name", cont.DockerName)
// Figure out the dockerid
describedCont, err := engine.client.InspectContainer(cont.DockerName, inspectContainerTimeout)
if err != nil {
log.Warn("Could not find matching container for expected", "name", cont.DockerName)
} else {
cont.DockerID = describedCont.ID
cont.Container.SetKnownStatus(dockerStateToState(describedCont.State))
// update mappings that need dockerid
engine.state.AddContainer(cont, task)
engine.imageManager.RecordContainerReference(cont.Container)
}
}
if cont.DockerID != "" {
currentState, metadata := engine.client.DescribeContainer(cont.DockerID)
if metadata.Error != nil {
currentState = api.ContainerStopped
if !cont.Container.KnownTerminal() {
cont.Container.ApplyingError = api.NewNamedError(&ContainerVanishedError{})
log.Warn("Could not describe previously known container; assuming dead", "err", metadata.Error, "id", cont.DockerID, "name", cont.DockerName)
engine.imageManager.RemoveContainerReferenceFromImageState(cont.Container)
}
} else {
engine.imageManager.RecordContainerReference(cont.Container)
if engine.cfg.ContainerMetadataEnabled && !cont.Container.IsMetadataFileUpdated() {
go engine.updateMetadataFile(task, cont)
}

}
if currentState > cont.Container.GetKnownStatus() {
cont.Container.SetKnownStatus(currentState)
}
}
engine.synchronizeContainerStatus(cont, task)
}

tasksToStart = append(tasksToStart, task)

// Put tasks that were stopped by ACS but haven't reached stopped into the wait group
if task.GetDesiredStatus().Terminal() && task.GetStopSequenceNumber() != 0 {
engine.taskStopGroup.Add(task.GetStopSequenceNumber(), 1)
}
}

for _, task := range tasksToStart {
engine.startTask(task)
}

engine.saver.Save()
}

// synchronizeContainerStatus checks and updates the container status with docker
func (engine *DockerTaskEngine) synchronizeContainerStatus(container *api.DockerContainer, task *api.Task) {
if container.DockerID == "" {
log.Debug("Found container potentially created while we were down", "name", container.DockerName)
// Figure out the dockerid
describedContainer, err := engine.client.InspectContainer(container.DockerName, inspectContainerTimeout)
if err != nil {
log.Warn("Could not find matching container for expected", "name", container.DockerName)
} else {
container.DockerID = describedContainer.ID
container.Container.SetKnownStatus(dockerStateToState(describedContainer.State))
// update mappings that need dockerid
engine.state.AddContainer(container, task)
engine.imageManager.RecordContainerReference(container.Container)
}
return
}

if container.DockerID != "" {
currentState, metadata := engine.client.DescribeContainer(container.DockerID)
if metadata.Error != nil {
currentState = api.ContainerStopped
if !container.Container.KnownTerminal() {
container.Container.ApplyingError = api.NewNamedError(&ContainerVanishedError{})
log.Warn("Could not describe previously known container; assuming dead", "err", metadata.Error, "id", container.DockerID, "name", container.DockerName)
engine.imageManager.RemoveContainerReferenceFromImageState(container.Container)
}
} else {
engine.imageManager.RecordContainerReference(container.Container)
if engine.cfg.ContainerMetadataEnabled && !container.Container.IsMetadataFileUpdated() {
go engine.updateMetadataFile(task, container)
}
}
if currentState > container.Container.GetKnownStatus() {
// update the container known status
container.Container.SetKnownStatus(currentState)
}
}
}

// CheckTaskState inspects the state of all containers within a task and writes
// their state to the managed task's container channel.
func (engine *DockerTaskEngine) CheckTaskState(task *api.Task) {
76 changes: 76 additions & 0 deletions agent/engine/docker_task_engine_test.go
@@ -2031,3 +2031,79 @@ func TestNewTaskTransitionOnRestart(t *testing.T) {
_, ok := dockerTaskEngine.managedTasks[testTask.Arn]
assert.True(t, ok, "task was not started")
}

// TestTaskWaitForHostResourceOnRestart tests that a task stopped by ACS but
// not yet in the stopped state blocks a later task from starting
func TestTaskWaitForHostResourceOnRestart(t *testing.T) {
// Task stopped by backend
taskStoppedByACS := testdata.LoadTask("sleep5")
taskStoppedByACS.SetDesiredStatus(api.TaskStopped)
taskStoppedByACS.SetStopSequenceNumber(1)
taskStoppedByACS.SetKnownStatus(api.TaskRunning)

// Task has essential container stopped
taskEssentialContainerStopped := testdata.LoadTask("sleep5")
taskEssentialContainerStopped.Arn = "task_Essential_Container_Stopped"
taskEssentialContainerStopped.SetDesiredStatus(api.TaskStopped)
taskEssentialContainerStopped.SetKnownStatus(api.TaskRunning)

// Normal task needs to be started
taskNotStarted := testdata.LoadTask("sleep5")
taskNotStarted.Arn = "task_Not_started"

conf := &defaultConfig
conf.ContainerMetadataEnabled = false
ctrl, client, _, privateTaskEngine, _, imageManager, _ := mocks(t, conf)
saver := mock_statemanager.NewMockStateManager(ctrl)
defer ctrl.Finish()

taskEngine := privateTaskEngine.(*DockerTaskEngine)
taskEngine.saver = saver

taskEngine.State().AddTask(taskStoppedByACS)
taskEngine.State().AddTask(taskNotStarted)
taskEngine.State().AddTask(taskEssentialContainerStopped)

taskEngine.State().AddContainer(&api.DockerContainer{
Container: taskStoppedByACS.Containers[0],
DockerID: containerID + "1",
DockerName: dockerContainerName + "1",
}, taskStoppedByACS)
taskEngine.State().AddContainer(&api.DockerContainer{
Container: taskNotStarted.Containers[0],
DockerID: containerID + "2",
DockerName: dockerContainerName + "2",
}, taskNotStarted)
taskEngine.State().AddContainer(&api.DockerContainer{
Container: taskEssentialContainerStopped.Containers[0],
DockerID: containerID + "3",
DockerName: dockerContainerName + "3",
}, taskEssentialContainerStopped)

// these are performed in synchronizeState on restart
client.EXPECT().DescribeContainer(gomock.Any()).Return(api.ContainerRunning, DockerContainerMetadata{
DockerID: containerID,
}).Times(3)
imageManager.EXPECT().RecordContainerReference(gomock.Any()).Times(3)

saver.EXPECT().Save()
// start the two tasks
taskEngine.synchronizeState()

waitStopDone := make(chan struct{})
go func() {
// This is to confirm the other task is waiting
time.Sleep(1 * time.Second)
// Remove the task sequence number 1 from waitgroup
taskEngine.taskStopGroup.Done(1)
waitStopDone <- struct{}{}
}()

// task with sequence number 2 should wait until 1 is removed from the waitgroup
taskEngine.taskStopGroup.Wait(2)
select {
case <-waitStopDone:
default:
t.Errorf("task should wait for tasks in taskStopGroup")
}
}
4 changes: 2 additions & 2 deletions agent/engine/task_manager.go
@@ -261,9 +261,9 @@ func (mtask *managedTask) handleDesiredStatusChange(desiredStatus api.TaskStatus
llog.Debug("Redundant task transition; ignoring", "old", mtask.GetDesiredStatus().String(), "new", desiredStatus.String())
return
}
if desiredStatus == api.TaskStopped && seqnum != 0 && mtask.StopSequenceNumber == 0 {
if desiredStatus == api.TaskStopped && seqnum != 0 && mtask.GetStopSequenceNumber() == 0 {
llog.Debug("Task moving to stopped, adding to stopgroup", "seqnum", seqnum)
mtask.StopSequenceNumber = seqnum
mtask.SetStopSequenceNumber(seqnum)
mtask.engine.taskStopGroup.Add(seqnum, 1)
}
mtask.SetDesiredStatus(desiredStatus)
