Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disconnected clients: Feature branch merge #12476

Merged
merged 17 commits into from
Apr 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,7 @@ type TaskGroupSummary struct {
Running int
Starting int
Lost int
Unknown int
}

// JobListStub is used to return a subset of information about
Expand Down
2 changes: 2 additions & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ type TaskGroup struct {
Services []*Service `hcl:"service,block"`
ShutdownDelay *time.Duration `mapstructure:"shutdown_delay" hcl:"shutdown_delay,optional"`
StopAfterClientDisconnect *time.Duration `mapstructure:"stop_after_client_disconnect" hcl:"stop_after_client_disconnect,optional"`
MaxClientDisconnect *time.Duration `mapstructure:"max_client_disconnect" hcl:"max_client_disconnect,optional"`
Scaling *ScalingPolicy `hcl:"scaling,block"`
Consul *Consul `hcl:"consul,block"`
}
Expand Down Expand Up @@ -971,6 +972,7 @@ const (
TaskRestartSignal = "Restart Signaled"
TaskLeaderDead = "Leader Task Dead"
TaskBuildingTaskDir = "Building Task Directory"
TaskClientReconnected = "Reconnected"
)

// TaskEvent is an event that effects the state of a task and contains meta-data
Expand Down
46 changes: 46 additions & 0 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,16 @@ func (ar *allocRunner) NetworkStatus() *structs.AllocNetworkStatus {
return ar.state.NetworkStatus.Copy()
}

// setIndexes is a helper for forcing alloc state on the alloc runner. This is
// used during reconnect when the task has been marked unknown by the server.
func (ar *allocRunner) setIndexes(update *structs.Allocation) {
ar.allocLock.Lock()
defer ar.allocLock.Unlock()
ar.alloc.AllocModifyIndex = update.AllocModifyIndex
ar.alloc.ModifyIndex = update.ModifyIndex
ar.alloc.ModifyTime = update.ModifyTime
}

// AllocState returns a copy of allocation state including a snapshot of task
// states.
func (ar *allocRunner) AllocState() *state.State {
Expand Down Expand Up @@ -1240,6 +1250,42 @@ func (ar *allocRunner) Signal(taskName, signal string) error {
return err.ErrorOrNil()
}

// Reconnect logs a reconnect event for each task in the allocation and syncs the current alloc state with the server.
func (ar *allocRunner) Reconnect(update *structs.Allocation) (err error) {
event := structs.NewTaskEvent(structs.TaskClientReconnected)
event.Time = time.Now().UnixNano()
for _, tr := range ar.tasks {
tr.AppendEvent(event)
}

// Update the client alloc with the server side indexes.
ar.setIndexes(update)

// Calculate alloc state to get the final state with the new events.
// Cannot rely on AllocStates as it won't recompute TaskStates once they are set.
states := make(map[string]*structs.TaskState, len(ar.tasks))
for name, tr := range ar.tasks {
states[name] = tr.TaskState()
}

// Build the client allocation
alloc := ar.clientAlloc(states)

// Update the client state store.
err = ar.stateUpdater.PutAllocation(alloc)
if err != nil {
return
}

// Update the server.
ar.stateUpdater.AllocStateUpdated(alloc)

// Broadcast client alloc to listeners.
err = ar.allocBroadcaster.Send(alloc)

return
}

func (ar *allocRunner) GetTaskExecHandler(taskName string) drivermanager.TaskExecHandler {
tr, ok := ar.tasks[taskName]
if !ok {
Expand Down
108 changes: 108 additions & 0 deletions client/allocrunner/alloc_runner_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package allocrunner

import (
"errors"
"fmt"
"io/ioutil"
"os"
Expand Down Expand Up @@ -1575,3 +1576,110 @@ func TestAllocRunner_PersistState_Destroyed(t *testing.T) {
require.NoError(t, err)
require.Nil(t, ts)
}

func TestAllocRunner_Reconnect(t *testing.T) {
t.Parallel()

type tcase struct {
clientStatus string
taskState string
taskEvent *structs.TaskEvent
}
tcases := []tcase{
{
structs.AllocClientStatusRunning,
structs.TaskStateRunning,
structs.NewTaskEvent(structs.TaskStarted),
},
{
structs.AllocClientStatusComplete,
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskTerminated),
},
{
structs.AllocClientStatusFailed,
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskDriverFailure).SetFailsTask(),
},
{
structs.AllocClientStatusPending,
structs.TaskStatePending,
structs.NewTaskEvent(structs.TaskReceived),
},
}

for _, tc := range tcases {
t.Run(tc.clientStatus, func(t *testing.T) {
// create a running alloc
alloc := mock.BatchAlloc()
alloc.AllocModifyIndex = 10
alloc.ModifyIndex = 10
alloc.ModifyTime = time.Now().UnixNano()

// Ensure task takes some time
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config["run_for"] = "30s"

original := alloc.Copy()

conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()

ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer destroy(ar)

go ar.Run()

for _, taskRunner := range ar.tasks {
taskRunner.UpdateState(tc.taskState, tc.taskEvent)
}

update := ar.Alloc().Copy()

update.ClientStatus = structs.AllocClientStatusUnknown
update.AllocModifyIndex = original.AllocModifyIndex + 10
update.ModifyIndex = original.ModifyIndex + 10
update.ModifyTime = original.ModifyTime + 10

err = ar.Reconnect(update)
require.NoError(t, err)

require.Equal(t, tc.clientStatus, ar.AllocState().ClientStatus)


// Make sure the runner's alloc indexes match the update.
require.Equal(t, update.AllocModifyIndex, ar.Alloc().AllocModifyIndex)
require.Equal(t, update.ModifyIndex, ar.Alloc().ModifyIndex)
require.Equal(t, update.ModifyTime, ar.Alloc().ModifyTime)

found := false

updater := conf.StateUpdater.(*MockStateUpdater)
var last *structs.Allocation
testutil.WaitForResult(func() (bool, error) {
last = updater.Last()
if last == nil {
return false, errors.New("last update nil")
}

states := last.TaskStates
for _, s := range states {
for _, e := range s.Events {
if e.Type == structs.TaskClientReconnected {
found = true
return true, nil
}
}
}

return false, errors.New("no reconnect event found")
}, func(err error) {
require.NoError(t, err)
})

require.True(t, found, "no reconnect event found")
})
}
}
5 changes: 5 additions & 0 deletions client/allocrunner/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ func (m *MockStateUpdater) AllocStateUpdated(alloc *structs.Allocation) {
m.mu.Unlock()
}

// PutAllocation satisfies the AllocStateHandler interface.
func (m *MockStateUpdater) PutAllocation(alloc *structs.Allocation) (err error) {
return
}

// Last returns a copy of the last alloc (or nil) update. Safe for concurrent
// access with updates.
func (m *MockStateUpdater) Last() *structs.Allocation {
Expand Down
3 changes: 1 addition & 2 deletions client/allocwatcher/alloc_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,7 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error {
p.logger.Debug("blocking alloc was GC'd")
return nil
}
if resp.Alloc.Terminated() {
// Terminated!
if resp.Alloc.Terminated() || resp.Alloc.ClientStatus == structs.AllocClientStatusUnknown {
p.nodeID = resp.Alloc.NodeID
return nil
}
Expand Down
15 changes: 15 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ type AllocRunner interface {

RestartTask(taskName string, taskEvent *structs.TaskEvent) error
RestartAll(taskEvent *structs.TaskEvent) error
Reconnect(update *structs.Allocation) error

GetTaskExecHandler(taskName string) drivermanager.TaskExecHandler
GetTaskDriverCapabilities(taskName string) (*drivers.Capabilities, error)
Expand Down Expand Up @@ -1978,6 +1979,11 @@ func (c *Client) AllocStateUpdated(alloc *structs.Allocation) {
}
}

// PutAllocation stores an allocation or returns an error if it could not be stored.
func (c *Client) PutAllocation(alloc *structs.Allocation) error {
return c.stateDB.PutAllocation(alloc)
}

// allocSync is a long lived function that batches allocation updates to the
// server.
func (c *Client) allocSync() {
Expand Down Expand Up @@ -2420,6 +2426,15 @@ func (c *Client) updateAlloc(update *structs.Allocation) {
return
}

// Reconnect unknown allocations
if update.ClientStatus == structs.AllocClientStatusUnknown && update.AllocModifyIndex > ar.Alloc().AllocModifyIndex {
err = ar.Reconnect(update)
if err != nil {
c.logger.Error("error reconnecting alloc", "alloc_id", update.ID, "alloc_modify_index", update.AllocModifyIndex, "err", err)
}
return
}

// Update local copy of alloc
if err := c.stateDB.PutAllocation(update); err != nil {
c.logger.Error("error persisting updated alloc locally", "error", err, "alloc_id", update.ID)
Expand Down
86 changes: 86 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1713,3 +1713,89 @@ func Test_verifiedTasks(t *testing.T) {
try(t, alloc(tgTasks), tasks, tasks, "")
})
}

func TestClient_ReconnectAllocs(t *testing.T) {
t.Parallel()

s1, _, cleanupS1 := testServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)

c1, cleanupC1 := TestClient(t, func(c *config.Config) {
c.DevMode = false
c.RPCHandler = s1
})
defer cleanupC1()

waitTilNodeReady(c1, t)

job := mock.Job()

runningAlloc := mock.Alloc()
runningAlloc.NodeID = c1.Node().ID
runningAlloc.Job = job
runningAlloc.JobID = job.ID
runningAlloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
runningAlloc.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "10s",
}
runningAlloc.ClientStatus = structs.AllocClientStatusPending

state := s1.State()
err := state.UpsertJob(structs.MsgTypeTestSetup, 100, job)
require.NoError(t, err)

err = state.UpsertJobSummary(101, mock.JobSummary(runningAlloc.JobID))
require.NoError(t, err)

err = state.UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{runningAlloc})
require.NoError(t, err)

// Ensure allocation gets upserted with desired status.
testutil.WaitForResult(func() (bool, error) {
upsertResult, stateErr := state.AllocByID(nil, runningAlloc.ID)
return upsertResult.ClientStatus == structs.AllocClientStatusRunning, stateErr
}, func(err error) {
require.NoError(t, err, "allocation query failed")
})

// Create the unknown version of the alloc from the running one, update state
// to simulate what reconciler would have done, and then send to the client.
unknownAlloc, err := state.AllocByID(nil, runningAlloc.ID)
require.Equal(t, structs.AllocClientStatusRunning, unknownAlloc.ClientStatus)
require.NoError(t, err)
unknownAlloc.ClientStatus = structs.AllocClientStatusUnknown
unknownAlloc.AppendState(structs.AllocStateFieldClientStatus, structs.AllocClientStatusUnknown)
err = state.UpsertAllocs(structs.MsgTypeTestSetup, runningAlloc.AllocModifyIndex+1, []*structs.Allocation{unknownAlloc})
require.NoError(t, err)

updates := &allocUpdates{
pulled: map[string]*structs.Allocation{
unknownAlloc.ID: unknownAlloc,
},
}

c1.runAllocs(updates)

invalid := false
var runner AllocRunner
var finalAlloc *structs.Allocation
// Ensure the allocation is not invalid on the client and has been marked
// running on the server with the new modify index
testutil.WaitForResult(func() (result bool, stateErr error) {
c1.allocLock.RLock()
runner = c1.allocs[unknownAlloc.ID]
_, invalid = c1.invalidAllocs[unknownAlloc.ID]
c1.allocLock.RUnlock()

finalAlloc, stateErr = state.AllocByID(nil, unknownAlloc.ID)
result = structs.AllocClientStatusRunning == finalAlloc.ClientStatus
return
}, func(err error) {
require.NoError(t, err, "allocation server check failed")
})

require.NotNil(t, runner, "expected alloc runner")
require.False(t, invalid, "expected alloc to not be marked invalid")
require.Equal(t, unknownAlloc.AllocModifyIndex, finalAlloc.AllocModifyIndex)
}
3 changes: 3 additions & 0 deletions client/interfaces/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ type AllocStateHandler interface {
// AllocStateUpdated is used to emit an updated allocation. This allocation
// is stripped to only include client settable fields.
AllocStateUpdated(alloc *structs.Allocation)

// PutAllocation is used to persist an updated allocation in the local state store.
PutAllocation(*structs.Allocation) error
}

// DeviceStatsReporter gives access to the latest resource usage
Expand Down
4 changes: 4 additions & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,10 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta
tg.StopAfterClientDisconnect = taskGroup.StopAfterClientDisconnect
}

if taskGroup.MaxClientDisconnect != nil {
tg.MaxClientDisconnect = taskGroup.MaxClientDisconnect
}

if taskGroup.ReschedulePolicy != nil {
tg.ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: *taskGroup.ReschedulePolicy.Attempts,
Expand Down
2 changes: 2 additions & 0 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2558,6 +2558,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
},
},
},
MaxClientDisconnect: helper.TimeToPtr(30 * time.Second),
Tasks: []*api.Task{
{
Name: "task1",
Expand Down Expand Up @@ -2955,6 +2956,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
},
},
},
MaxClientDisconnect: helper.TimeToPtr(30 * time.Second),
Tasks: []*structs.Task{
{
Name: "task1",
Expand Down
2 changes: 2 additions & 0 deletions command/alloc_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,8 @@ func buildDisplayMessage(event *api.TaskEvent) string {
desc = event.DriverMessage
case api.TaskLeaderDead:
desc = "Leader Task in Group dead"
case api.TaskClientReconnected:
desc = "Client reconnected"
default:
desc = event.Message
}
Expand Down
Loading