allocrunner: refactor task coordinator #14009

Merged 12 commits on Aug 22, 2022
92 changes: 51 additions & 41 deletions client/allocrunner/alloc_runner.go
@@ -172,7 +172,9 @@ type allocRunner struct {
// restore.
serversContactedCh chan struct{}

taskHookCoordinator *taskHookCoordinator
// taskCoordinator is used to control when tasks are allowed to run
// depending on their lifecycle configuration.
taskCoordinator *taskCoordinator

shutdownDelayCtx context.Context
shutdownDelayCancelFn context.CancelFunc
@@ -247,7 +249,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
// Create alloc dir
ar.allocDir = allocdir.NewAllocDir(ar.logger, config.ClientConfig.AllocDir, alloc.ID)

ar.taskHookCoordinator = newTaskHookCoordinator(ar.logger, tg.Tasks)
ar.taskCoordinator = newTaskCoordinator(ar.logger, tg.Tasks, ar.waitCh)

shutdownDelayCtx, shutdownDelayCancel := context.WithCancel(context.Background())
ar.shutdownDelayCtx = shutdownDelayCtx
@@ -270,27 +272,27 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
for _, task := range tasks {
trConfig := &taskrunner.Config{
Alloc: ar.alloc,
ClientConfig: ar.clientConfig,
Task: task,
TaskDir: ar.allocDir.NewTaskDir(task.Name),
Logger: ar.logger,
StateDB: ar.stateDB,
StateUpdater: ar,
DynamicRegistry: ar.dynamicRegistry,
Consul: ar.consulClient,
ConsulProxies: ar.consulProxiesClient,
ConsulSI: ar.sidsClient,
Vault: ar.vaultClient,
DeviceStatsReporter: ar.deviceStatsReporter,
CSIManager: ar.csiManager,
DeviceManager: ar.devicemanager,
DriverManager: ar.driverManager,
ServersContactedCh: ar.serversContactedCh,
StartConditionMetCtx: ar.taskHookCoordinator.startConditionForTask(task),
ShutdownDelayCtx: ar.shutdownDelayCtx,
ServiceRegWrapper: ar.serviceRegWrapper,
Getter: ar.getter,
Alloc: ar.alloc,
ClientConfig: ar.clientConfig,
Task: task,
TaskDir: ar.allocDir.NewTaskDir(task.Name),
Logger: ar.logger,
StateDB: ar.stateDB,
StateUpdater: ar,
DynamicRegistry: ar.dynamicRegistry,
Consul: ar.consulClient,
ConsulProxies: ar.consulProxiesClient,
ConsulSI: ar.sidsClient,
Vault: ar.vaultClient,
DeviceStatsReporter: ar.deviceStatsReporter,
CSIManager: ar.csiManager,
DeviceManager: ar.devicemanager,
DriverManager: ar.driverManager,
ServersContactedCh: ar.serversContactedCh,
StartConditionMetCh: ar.taskCoordinator.startConditionForTask(task),
ShutdownDelayCtx: ar.shutdownDelayCtx,
ServiceRegWrapper: ar.serviceRegWrapper,
Getter: ar.getter,
}

if ar.cpusetManager != nil {
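The taskrunner side of the renamed StartConditionMetCh field is not part of this file's diff. As a rough, self-contained sketch of the pattern it implies, assuming the coordinator hands each task a receive-only channel that it closes (or sends on) once the task's lifecycle allows it to start; every other name below is invented for illustration and does not come from the Nomad codebase:

package main

import (
	"context"
	"fmt"
	"time"
)

// runTask blocks until the coordinator signals the start condition or the
// context is cancelled, then pretends to launch the task.
func runTask(ctx context.Context, name string, startConditionMet <-chan struct{}) {
	select {
	case <-startConditionMet:
		fmt.Printf("%s: start condition met, launching\n", name)
	case <-ctx.Done():
		fmt.Printf("%s: shut down before start condition was met\n", name)
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	gate := make(chan struct{})
	go runTask(ctx, "web", gate)

	// The coordinator decides when "web" may run, for example after prestart
	// tasks finish, and opens the gate.
	close(gate)
	time.Sleep(100 * time.Millisecond)
}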
@@ -388,26 +390,12 @@ func (ar *allocRunner) shouldRun() bool {

// runTasks is used to run the task runners and block until they exit.
func (ar *allocRunner) runTasks() {
// Start all tasks
// Start and wait for all tasks.
for _, task := range ar.tasks {
go task.Run()
}

// Block on all tasks except poststop tasks
for _, task := range ar.tasks {
if !task.IsPoststopTask() {
<-task.WaitCh()
}
}

// Signal poststop tasks to proceed to main runtime
ar.taskHookCoordinator.StartPoststopTasks()

// Wait for poststop tasks to finish before proceeding
for _, task := range ar.tasks {
if task.IsPoststopTask() {
<-task.WaitCh()
}
<-task.WaitCh()
Comment on lines +394 to +399 (Member):
This section immediately shows the value of this approach 👍

}
}

@@ -461,7 +449,7 @@ func (ar *allocRunner) Restore() error {
states[tr.Task().Name] = tr.TaskState()
}

ar.taskHookCoordinator.taskStateUpdated(states)
ar.taskCoordinator.restore(states)

return nil
}
@@ -596,7 +584,7 @@ func (ar *allocRunner) handleTaskStateUpdates() {
}
}

ar.taskHookCoordinator.taskStateUpdated(states)
ar.taskCoordinator.taskStateUpdated(states)

// Get the client allocation
calloc := ar.clientAlloc(states)
@@ -609,6 +597,28 @@
}
}

// hasNonSidecarTasks returns false if all the passed tasks are sidecar tasks
func hasNonSidecarTasks(tasks []*taskrunner.TaskRunner) bool {
for _, tr := range tasks {
if !tr.IsSidecarTask() {
return true
}
}

return false
}

// hasSidecarTasks returns true if any of the passed tasks are sidecar tasks
func hasSidecarTasks(tasks map[string]*taskrunner.TaskRunner) bool {
for _, tr := range tasks {
if tr.IsSidecarTask() {
return true
}
}

return false
}

// killTasks kills all task runners, leader (if there is one) first. Errors are
// logged except taskrunner.ErrTaskNotRunning which is ignored. Task states
// after Kill has been called are returned.
138 changes: 125 additions & 13 deletions client/allocrunner/alloc_runner_test.go
@@ -12,6 +12,7 @@ import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/allochealth"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
"github.com/hashicorp/nomad/client/allocwatcher"
"github.com/hashicorp/nomad/client/serviceregistration"
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
@@ -803,28 +804,52 @@ func TestAllocRunner_Restore_LifecycleHooks(t *testing.T) {
ar, err := NewAllocRunner(conf)
require.NoError(t, err)

// We should see all tasks with Prestart hooks are not blocked from running:
// i.e. the "init" and "side" task hook coordinator channels are closed
require.Truef(t, isChannelClosed(ar.taskHookCoordinator.startConditionForTask(ar.tasks["init"].Task())), "init channel was open, should be closed")
require.Truef(t, isChannelClosed(ar.taskHookCoordinator.startConditionForTask(ar.tasks["side"].Task())), "side channel was open, should be closed")
go ar.Run()
defer destroy(ar)

isChannelClosed(ar.taskHookCoordinator.startConditionForTask(ar.tasks["side"].Task()))
// Wait for the coordinator to transition from the "init" state.
testutil.WaitForResultUntil(time.Second,
func() (bool, error) {
return ar.taskCoordinator.currentState != taskCoordinatorStateInit, nil
},
func(err error) {
t.Fatalf("task coordinator didn't transition in time")
})

// Mimic client dies while init task running, and client restarts after init task finished
// We should see all tasks with Prestart hooks are not blocked from running.
requireTaskAllowed(t, ar.taskCoordinator, ar.tasks["init"].Task())
requireTaskAllowed(t, ar.taskCoordinator, ar.tasks["side"].Task())
requireTaskBlocked(t, ar.taskCoordinator, ar.tasks["web"].Task())
requireTaskBlocked(t, ar.taskCoordinator, ar.tasks["poststart"].Task())

// Mimic client dies while init task running, and client restarts after
// init task finished and web is running.
ar.tasks["init"].UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskTerminated))
ar.tasks["side"].UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
ar.tasks["web"].UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))

// Create a new AllocRunner to test RestoreState and Run
// Create a new AllocRunner to test Restore and Run.
ar2, err := NewAllocRunner(conf)
require.NoError(t, err)
require.NoError(t, ar2.Restore())

if err := ar2.Restore(); err != nil {
t.Fatalf("error restoring state: %v", err)
}
go ar2.Run()
defer destroy(ar2)

// Wait for the coordinator to transition from the "init" state.
testutil.WaitForResultUntil(time.Second,
func() (bool, error) {
return ar2.taskCoordinator.currentState != taskCoordinatorStateInit, nil
},
func(err error) {
t.Fatalf("task coordinator didn't transition in time")
})

// We want to see Restore resume execution with correct hook ordering:
// i.e. we should see the "web" main task hook coordinator channel is closed
require.Truef(t, isChannelClosed(ar2.taskHookCoordinator.startConditionForTask(ar.tasks["web"].Task())), "web channel was open, should be closed")
// Restore resumes execution with correct lifecycle ordering.
requireTaskBlocked(t, ar2.taskCoordinator, ar2.tasks["init"].Task())
requireTaskAllowed(t, ar2.taskCoordinator, ar2.tasks["side"].Task())
requireTaskAllowed(t, ar2.taskCoordinator, ar2.tasks["web"].Task())
requireTaskAllowed(t, ar2.taskCoordinator, ar2.tasks["poststart"].Task())
}
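
The requireTaskAllowed and requireTaskBlocked helpers used above are added elsewhere in this PR and are not shown in this diff. A plausible sketch of what they assert, assuming the coordinator still exposes a per-task channel via startConditionForTask; the timeouts and exact mechanism are guesses, not the PR's actual implementation:

func requireTaskAllowed(t *testing.T, c *taskCoordinator, task *structs.Task) {
	t.Helper()
	select {
	case <-c.startConditionForTask(task):
		// The channel yielded a value (or was closed): the task may run.
	case <-time.After(time.Second):
		t.Fatalf("expected task %q to be allowed to run", task.Name)
	}
}

func requireTaskBlocked(t *testing.T, c *taskCoordinator, task *structs.Task) {
	t.Helper()
	select {
	case <-c.startConditionForTask(task):
		t.Fatalf("expected task %q to be blocked", task.Name)
	case <-time.After(100 * time.Millisecond):
		// No signal within the grace period: treat the task as blocked.
	}
}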

func TestAllocRunner_Update_Semantics(t *testing.T) {
@@ -1811,3 +1836,90 @@ func TestAllocRunner_Lifecycle_Shutdown_Order(t *testing.T) {
last = upd.Last()
require.Less(t, last.TaskStates[sidecarTask.Name].FinishedAt, last.TaskStates[poststopTask.Name].FinishedAt)
}

func TestHasSidecarTasks(t *testing.T) {
ci.Parallel(t)

falseV, trueV := false, true

cases := []struct {
name string
// nil if main task, false if non-sidecar hook, true if sidecar hook
indicators []*bool

hasSidecars bool
hasNonsidecars bool
}{
{
name: "all sidecar - one",
indicators: []*bool{&trueV},
hasSidecars: true,
hasNonsidecars: false,
},
{
name: "all sidecar - multiple",
indicators: []*bool{&trueV, &trueV, &trueV},
hasSidecars: true,
hasNonsidecars: false,
},
{
name: "some sidecars, some others",
indicators: []*bool{nil, &falseV, &trueV},
hasSidecars: true,
hasNonsidecars: true,
},
{
name: "no sidecars",
indicators: []*bool{nil, &falseV, nil},
hasSidecars: false,
hasNonsidecars: true,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
alloc := allocWithSidecarIndicators(c.indicators)
arConf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()

ar, err := NewAllocRunner(arConf)
require.NoError(t, err)

require.Equal(t, c.hasSidecars, hasSidecarTasks(ar.tasks), "sidecars")

runners := []*taskrunner.TaskRunner{}
for _, r := range ar.tasks {
runners = append(runners, r)
}
require.Equal(t, c.hasNonsidecars, hasNonSidecarTasks(runners), "non-sidecars")

})
}
}

func allocWithSidecarIndicators(indicators []*bool) *structs.Allocation {
alloc := mock.BatchAlloc()

tasks := []*structs.Task{}
resources := map[string]*structs.AllocatedTaskResources{}

tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]

for i, indicator := range indicators {
task := alloc.Job.TaskGroups[0].Tasks[0].Copy()
task.Name = fmt.Sprintf("task%d", i)
if indicator != nil {
task.Lifecycle = &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPrestart,
Sidecar: *indicator,
}
}
tasks = append(tasks, task)
resources[task.Name] = tr
}

alloc.Job.TaskGroups[0].Tasks = tasks

alloc.AllocatedResources.Tasks = resources
return alloc
}