Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disconnected clients: Add reconnect task event #12133

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,7 @@ const (
TaskRestartSignal = "Restart Signaled"
TaskLeaderDead = "Leader Task Dead"
TaskBuildingTaskDir = "Building Task Directory"
TaskClientReconnected = "Reconnected"
)

// TaskEvent is an event that effects the state of a task and contains meta-data
Expand Down
48 changes: 48 additions & 0 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,17 @@ func (ar *allocRunner) NetworkStatus() *structs.AllocNetworkStatus {
return ar.state.NetworkStatus.Copy()
}

// setIndexes is a helper for forcing a set of server side indexes
// on the alloc runner. This is used during reconnect when the task
// has been marked unknown by the server.
func (ar *allocRunner) setIndexes(update *structs.Allocation) {
ar.allocLock.Lock()
defer ar.allocLock.Unlock()
ar.alloc.AllocModifyIndex = update.AllocModifyIndex
ar.alloc.ModifyIndex = update.ModifyIndex
ar.alloc.ModifyTime = update.ModifyTime
}

// AllocState returns a copy of allocation state including a snapshot of task
// states.
func (ar *allocRunner) AllocState() *state.State {
Expand Down Expand Up @@ -1233,6 +1244,43 @@ func (ar *allocRunner) Signal(taskName, signal string) error {
return err.ErrorOrNil()
}

// Reconnect logs a reconnect event for each task in the allocation and syncs the current alloc state with the server.
func (ar *allocRunner) Reconnect(update *structs.Allocation) (err error) {
ar.logger.Trace("reconnecting alloc", "alloc_id", update.ID, "alloc_modify_index", update.AllocModifyIndex)

event := structs.NewTaskEvent(structs.TaskClientReconnected)
for _, tr := range ar.tasks {
tr.AppendEvent(event)
}

// Update the client alloc with the server client side indexes.
ar.setIndexes(update)

// Calculate alloc state to get the final state with the new events.
// Cannot rely on AllocStates as it won't recompute TaskStates once they are set.
states := make(map[string]*structs.TaskState, len(ar.tasks))
for name, tr := range ar.tasks {
states[name] = tr.TaskState()
}

// Build the client allocation
alloc := ar.clientAlloc(states)

// Update the client state store.
err = ar.stateUpdater.PutAllocation(alloc)
if err != nil {
return
}
tgross marked this conversation as resolved.
Show resolved Hide resolved

// Update the server.
ar.stateUpdater.AllocStateUpdated(alloc)

// Broadcast client alloc to listeners.
err = ar.allocBroadcaster.Send(alloc)

return
}

func (ar *allocRunner) GetTaskExecHandler(taskName string) drivermanager.TaskExecHandler {
tr, ok := ar.tasks[taskName]
if !ok {
Expand Down
184 changes: 184 additions & 0 deletions client/allocrunner/alloc_runner_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package allocrunner

import (
"errors"
"fmt"
"io/ioutil"
"os"
Expand Down Expand Up @@ -1568,3 +1569,186 @@ func TestAllocRunner_PersistState_Destroyed(t *testing.T) {
require.NoError(t, err)
require.Nil(t, ts)
}

func TestAllocRunner_Reconnect(t *testing.T) {
t.Parallel()

type tcase struct {
clientStatus string
taskState string
taskEvent *structs.TaskEvent
}
tcases := []tcase{
{
structs.AllocClientStatusRunning,
structs.TaskStateRunning,
structs.NewTaskEvent(structs.TaskStarted),
},
{
structs.AllocClientStatusComplete,
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskTerminated),
},
{
structs.AllocClientStatusFailed,
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskDriverFailure).SetFailsTask(),
},
{
structs.AllocClientStatusPending,
structs.TaskStatePending,
structs.NewTaskEvent(structs.TaskReceived),
},
}

for _, tc := range tcases {
t.Run(tc.clientStatus, func(t *testing.T) {
// create a running alloc
alloc := mock.BatchAlloc()

// Ensure task takes some time
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config["run_for"] = "30s"

conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()

ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer destroy(ar)

go ar.Run()

for _, taskRunner := range ar.tasks {
taskRunner.UpdateState(tc.taskState, tc.taskEvent)
}

ar.Reconnect()

require.Equal(t, tc.clientStatus, ar.AllocState().ClientStatus)

found := false

updater := conf.StateUpdater.(*MockStateUpdater)
var last *structs.Allocation
testutil.WaitForResult(func() (bool, error) {
last = updater.Last()
if last == nil {
return false, errors.New("last update nil")
}

states := last.TaskStates
for _, s := range states {
for _, e := range s.Events {
if e.Type == structs.TaskClientReconnected {
found = true
return true, nil
}
}
}

return false, errors.New("no reconnect event found")
}, func(err error) {
require.NoError(t, err)
})

require.True(t, found, "no reconnect event found")
})
}
}

func TestAllocRunner_MaybeHasPendingReconnect(t *testing.T) {
t.Parallel()

type tcase struct {
name string
timestamp int64
expectedDiff int
}
tcases := []tcase{
{
"should guard now",
time.Now().UnixNano(),
1,
},
{
"should guard 3 seconds",
time.Now().Add(-(3 * time.Second)).UnixNano(),
1,
},
{
"should not guard 6 seconds",
time.Now().Add(-(6 * time.Second)).UnixNano(),
2,
},
}

for _, tc := range tcases {
t.Run(tc.name, func(t *testing.T) {
alloc := mock.BatchAlloc()

task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config["run_for"] = "30s"

conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()

ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer destroy(ar)

go ar.Run()

reconnectEvent := structs.NewTaskEvent(structs.TaskClientReconnected)
reconnectEvent.Time = tc.timestamp
for _, tr := range ar.tasks {
tr.EmitEvent(reconnectEvent)
}

updater := conf.StateUpdater.(*MockStateUpdater)
// get a copy of the first states so that we can compare lengths to
// determine how many events were appended.
var firstStates map[string]*structs.TaskState
testutil.WaitForResult(func() (bool, error) {
last := updater.Last()
if last == nil {
return false, errors.New("last update nil")
}
states := last.TaskStates
for _, s := range states {
for _, e := range s.Events {
if e.Type == structs.TaskClientReconnected {
firstStates = states
return true, nil
}
}
}

return false, errors.New("no reconnect event found")
}, func(err error) {
require.NoError(t, err)
})

ar.Reconnect()

testutil.WaitForResult(func() (bool, error) {
last := updater.Last()
if last == nil {
return false, errors.New("last update nil")
}

for k, taskState := range last.TaskStates {
if len(taskState.Events) != len(firstStates[k].Events)+tc.expectedDiff {
return false, fmt.Errorf("expected %d reconnect events", tc.expectedDiff)
}
}

return true, nil
}, func(err error) {
require.NoError(t, err)
})
})
}
}
5 changes: 5 additions & 0 deletions client/allocrunner/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ func (m *MockStateUpdater) AllocStateUpdated(alloc *structs.Allocation) {
m.mu.Unlock()
}

// PutAllocation satisfies the AllocStateHandler interface.
func (m *MockStateUpdater) PutAllocation(alloc *structs.Allocation) (err error) {
return
}

// Last returns a copy of the last alloc (or nil) update. Safe for concurrent
// access with updates.
func (m *MockStateUpdater) Last() *structs.Allocation {
Expand Down
14 changes: 11 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ type AllocRunner interface {

RestartTask(taskName string, taskEvent *structs.TaskEvent) error
RestartAll(taskEvent *structs.TaskEvent) error
Reconnect(update *structs.Allocation) error

GetTaskExecHandler(taskName string) drivermanager.TaskExecHandler
GetTaskDriverCapabilities(taskName string) (*drivers.Capabilities, error)
Expand Down Expand Up @@ -1961,6 +1962,11 @@ func (c *Client) AllocStateUpdated(alloc *structs.Allocation) {
}
}

// PutAllocation stores an allocation or returns an error if it could not be stored.
func (c *Client) PutAllocation(alloc *structs.Allocation) error {
return c.stateDB.PutAllocation(alloc)
}

// allocSync is a long lived function that batches allocation updates to the
// server.
func (c *Client) allocSync() {
Expand Down Expand Up @@ -2405,9 +2411,11 @@ func (c *Client) updateAlloc(update *structs.Allocation) {

// Reconnect unknown allocations
if update.ClientStatus == structs.AllocClientStatusUnknown && update.AllocModifyIndex > ar.Alloc().AllocModifyIndex {
update.ClientStatus = ar.AllocState().ClientStatus
update.ClientDescription = ar.AllocState().ClientDescription
c.AllocStateUpdated(update)
tgross marked this conversation as resolved.
Show resolved Hide resolved
err = ar.Reconnect(update)
if err != nil {
c.logger.Error("error reconnecting alloc", "alloc_id", update.ID, "alloc_modify_index", update.AllocModifyIndex, "err", err)
}
return
}

// Update local copy of alloc
Expand Down
3 changes: 3 additions & 0 deletions client/interfaces/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ type AllocStateHandler interface {
// AllocStateUpdated is used to emit an updated allocation. This allocation
// is stripped to only include client settable fields.
AllocStateUpdated(alloc *structs.Allocation)

// PutAllocation is used to persist an updated allocation in the local state store.
PutAllocation(*structs.Allocation) error
}

// DeviceStatsReporter gives access to the latest resource usage
Expand Down
2 changes: 2 additions & 0 deletions command/alloc_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,8 @@ func buildDisplayMessage(event *api.TaskEvent) string {
desc = event.DriverMessage
case api.TaskLeaderDead:
desc = "Leader Task in Group dead"
case api.TaskClientReconnected:
desc = "Client reconnected"
default:
desc = event.Message
}
Expand Down
5 changes: 5 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7871,6 +7871,9 @@ const (

// TaskPluginHealthy indicates that a plugin managed by Nomad became healthy
TaskPluginHealthy = "Plugin became healthy"

// TaskClientReconnected indicates that the client running the task disconnected.
TaskClientReconnected = "Reconnected"
)

// TaskEvent is an event that effects the state of a task and contains meta-data
Expand Down Expand Up @@ -8082,6 +8085,8 @@ func (e *TaskEvent) PopulateEventDisplayMessage() {
desc = "Leader Task in Group dead"
case TaskMainDead:
desc = "Main tasks in the group died"
case TaskClientReconnected:
desc = "Client reconnected"
default:
desc = e.Message
}
Expand Down
3 changes: 2 additions & 1 deletion nomad/structs/structs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5569,7 +5569,8 @@ func TestTaskEventPopulate(t *testing.T) {
{NewTaskEvent(TaskSignaling).SetTaskSignal(os.Interrupt).SetTaskSignalReason("process interrupted"), "Task being sent signal interrupt: process interrupted"},
{NewTaskEvent(TaskRestartSignal), "Task signaled to restart"},
{NewTaskEvent(TaskRestartSignal).SetRestartReason("Chaos Monkey restarted it"), "Chaos Monkey restarted it"},
{NewTaskEvent(TaskDriverMessage).SetDriverMessage("YOLO"), "YOLO"},
{NewTaskEvent(TaskClientReconnected), "Client reconnected"},
{NewTaskEvent(TaskLeaderDead), "Leader Task in Group dead"},
{NewTaskEvent("Unknown Type, No message"), ""},
{NewTaskEvent("Unknown Type").SetMessage("Hello world"), "Hello world"},
}
Expand Down
4 changes: 2 additions & 2 deletions scheduler/generic_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,9 +419,9 @@ func (s *GenericScheduler) computeJobAllocs() error {
s.ctx.Plan().AppendAlloc(update, nil)
}

// Handle reconnect updates
// Log reconnect updates. They will be pulled by the client when it reconnects.
for _, update := range results.reconnectUpdates {
s.ctx.Plan().AppendAlloc(update, nil)
s.logger.Trace("reconnecting alloc", "alloc_id", update.ID, "alloc_modify_index", update.AllocModifyIndex)
}

// Nothing remaining to do if placement is not required
Expand Down