Skip to content

Commit

Permalink
Add allocRunner.Reconnect function
Browse files Browse the repository at this point in the history
  • Loading branch information
DerekStrickland committed Feb 25, 2022
1 parent 72e3175 commit 5a01e9e
Show file tree
Hide file tree
Showing 3 changed files with 298 additions and 3 deletions.
78 changes: 78 additions & 0 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1233,6 +1233,84 @@ func (ar *allocRunner) Signal(taskName, signal string) error {
return err.ErrorOrNil()
}

// Reconnect synchronizes client state between an incoming unknown allocation and
// the current allocRunner and appends a reconnect event to each task if alloc is running.
func (ar *allocRunner) Reconnect(update *structs.Allocation) {
// Guard against the server sending update requests faster than the state
// gets propagated back to them.
if ar.hasPendingReconnect() {
ar.logger.Trace("skipping due to recent reconnect")
return
}

ar.maybeAppendReconnectEvent()

// Populate the alloc with updated events.
clientAlloc := ar.clientAlloc(ar.TaskRunnerTaskStates())

// Sync the incoming server state with the actual client state including the newly minted event.
update.ClientStatus = clientAlloc.ClientStatus
update.ClientDescription = clientAlloc.ClientDescription
update.TaskStates = clientAlloc.TaskStates

// Update the local copy of alloc.
if err := ar.stateDB.PutAllocation(update); err != nil {
ar.logger.Error("error reconnecting alloc when persisting updated alloc locally", "error", err, "alloc_id", update.ID)
return
}

// Sync the alloc runner. Important for ensuring the AllocModifyIndex is propagated.
// Do not call setAlloc on TaskRunners. That will restart the alloc.
ar.setAlloc(update)

// Update the server.
ar.stateUpdater.AllocStateUpdated(update)
}

// hasPendingReconnect checks the current alloc TaskRunnerTaskStates to see if a recent
// reconnect event was appended. This is useful for allowing the client state to
// have time to propagate to the servers before attempting another reconnect.
func (ar *allocRunner) hasPendingReconnect() bool {
now := time.Now()

for _, taskState := range ar.TaskRunnerTaskStates() {
for _, taskEvent := range taskState.Events {
if taskEvent.Type != structs.TaskClientReconnected {
continue
}

if now.Sub(time.Unix(0, taskEvent.Time)) < (5 * time.Second) {
return true
}
}
}

return false
}

// maybeAppendReconnectEvent appends a reconnect event if the computed client status is running.
func (ar *allocRunner) maybeAppendReconnectEvent() {
clientStatus, _ := getClientStatus(ar.TaskRunnerTaskStates())

if clientStatus != structs.AllocClientStatusRunning {
return
}

event := structs.NewTaskEvent(structs.TaskClientReconnected)
for _, tr := range ar.tasks {
tr.AppendEvent(event)
}
}

// TaskRunnerTaskStates creates a map of task states by taskName.
func (ar *allocRunner) TaskRunnerTaskStates() map[string]*structs.TaskState {
states := make(map[string]*structs.TaskState, len(ar.tasks))
for taskName, tr := range ar.tasks {
states[taskName] = tr.TaskState()
}
return states
}

func (ar *allocRunner) GetTaskExecHandler(taskName string) drivermanager.TaskExecHandler {
tr, ok := ar.tasks[taskName]
if !ok {
Expand Down
217 changes: 217 additions & 0 deletions client/allocrunner/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1568,3 +1568,220 @@ func TestAllocRunner_PersistState_Destroyed(t *testing.T) {
require.NoError(t, err)
require.Nil(t, ts)
}

func TestAllocRunner_Reconnect(t *testing.T) {
t.Parallel()

type tcase struct {
clientStatus string
}
tcases := []tcase{
{
structs.AllocClientStatusRunning,
},
{
structs.AllocClientStatusComplete,
},
{
structs.AllocClientStatusFailed,
},
{
structs.AllocClientStatusPending,
},
}

for _, tc := range tcases {
t.Run(tc.clientStatus, func(t *testing.T) {
// create a running alloc
current := mock.BatchAlloc()

// Ensure task takes some time
task := current.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config["run_for"] = "30s"

conf, cleanup := testAllocRunnerConfig(t, current)
defer cleanup()

ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer destroy(ar)

go ar.Run()

updater := conf.StateUpdater.(*MockStateUpdater)
var last *structs.Allocation
testutil.WaitForResult(func() (result bool, err error) {
result = false
last = updater.Last()
if last == nil {
return
}

if last.ClientStatus != structs.AllocClientStatusRunning {
return
}

result = true
return
}, func(err error) {
require.NoError(t, err)
})

// if we are trying to test a status other than running, update the status manually.
if tc.clientStatus != last.ClientStatus {
update := ar.alloc.Copy()
update.ClientStatus = tc.clientStatus

ar.Update(update)

testutil.WaitForResult(func() (result bool, err error) {
result = false
last = updater.Last()
if last == nil {
return
}

// Check the status is what we want.
if last.ClientStatus != tc.clientStatus {
return
}

result = true
return
}, func(e error) {
require.NoError(t, err, "update ClientStatus failed")
})
}

// create a reconnecting alloc from the server.
reconnecting := current.Copy()
reconnecting.ClientStatus = structs.AllocClientStatusUnknown
reconnecting.AllocModifyIndex = current.AllocModifyIndex + 10

// If testing failed we have to create an event that results in the task failing.
if tc.clientStatus == structs.AllocClientStatusFailed {
for _, taskRunner := range ar.tasks {
taskRunner.AppendEvent(structs.NewTaskEvent(structs.TaskStateDead).SetFailsTask())
}
}

// if testing pending we manipulate the state so that it appears the alloc never ran.
if tc.clientStatus == structs.AllocClientStatusPending {
for _, taskRunner := range ar.tasks {
taskRunner.UpdateState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskStatePending))
}
}

ar.Reconnect(reconnecting)

require.Equal(t, tc.clientStatus, ar.alloc.ClientStatus)
require.Equal(t, ar.alloc.AllocModifyIndex, reconnecting.AllocModifyIndex)

// Make sure events were or were not appended
if tc.clientStatus == structs.AllocClientStatusRunning {
found := false
states := ar.TaskRunnerTaskStates()
for _, s := range states {
for _, e := range s.Events {
if e.Type == structs.TaskClientReconnected {
found = true
break
}
}
}
require.True(t, found, "no reconnect event found")
} else {
states := ar.TaskRunnerTaskStates()
for _, s := range states {
for _, e := range s.Events {
require.NotEqual(t, structs.TaskClientReconnected, e.Type, "found invalid reconnect event")
}
}
}
})
}
}

func TestAllocRunner_MaybeHasPendingReconnect(t *testing.T) {
t.Parallel()

type tcase struct {
name string
timestamp int64
expected bool
}
tcases := []tcase{
{
"no event",
0,
false,
},
{
"should guard now",
time.Now().UnixNano(),
true,
},
{
"should guard 3 seconds",
time.Now().Add(-(3 * time.Second)).UnixNano(),
true,
},
{
"should not guard 5 seconds",
time.Now().Add(-(6 * time.Second)).UnixNano(),
false,
},
}

for _, tc := range tcases {
t.Run(tc.name, func(t *testing.T) {
// create a running alloc
current := mock.BatchAlloc()

// Ensure task takes some time
task := current.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config["run_for"] = "30s"

conf, cleanup := testAllocRunnerConfig(t, current)
defer cleanup()

ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer destroy(ar)

go ar.Run()

updater := conf.StateUpdater.(*MockStateUpdater)
var last *structs.Allocation
testutil.WaitForResult(func() (result bool, err error) {
result = false
last = updater.Last()
if last == nil {
return
}

if last.ClientStatus != structs.AllocClientStatusRunning {
return
}

result = true
return
}, func(err error) {
require.NoError(t, err)
})

// Append the reconnect task event with the timestamp
if tc.timestamp > 0 {
event := structs.NewTaskEvent(structs.TaskClientReconnected)
event.Time = tc.timestamp
for _, tr := range ar.tasks {
tr.AppendEvent(event)
}
}

require.Equal(t, tc.expected, ar.hasPendingReconnect())
})
}
}
6 changes: 3 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ type AllocRunner interface {

RestartTask(taskName string, taskEvent *structs.TaskEvent) error
RestartAll(taskEvent *structs.TaskEvent) error
Reconnect(update *structs.Allocation)

GetTaskExecHandler(taskName string) drivermanager.TaskExecHandler
GetTaskDriverCapabilities(taskName string) (*drivers.Capabilities, error)
Expand Down Expand Up @@ -2405,9 +2406,8 @@ func (c *Client) updateAlloc(update *structs.Allocation) {

// Reconnect unknown allocations
if update.ClientStatus == structs.AllocClientStatusUnknown && update.AllocModifyIndex > ar.Alloc().AllocModifyIndex {
update.ClientStatus = ar.AllocState().ClientStatus
update.ClientDescription = ar.AllocState().ClientDescription
c.AllocStateUpdated(update)
ar.Reconnect(update)
return
}

// Update local copy of alloc
Expand Down

0 comments on commit 5a01e9e

Please sign in to comment.