Skip to content

Commit

Permalink
prioritized client updates
Browse files Browse the repository at this point in the history
The allocrunner sends several updates to the server during the early lifecycle
of an allocation and its tasks. Clients batch-up allocation updates every 200ms,
but experiments like the C2M challenge has shown that even with this batching,
servers can be overwhelmed with client updates during high volume
deployments. Benchmarking done in #9451 has shown that client updates can easily
represent ~70% of all Nomad Raft traffic.

Each allocation sends many updates during its lifetime, but only those that
change the `ClientStatus` field are critical for progressing a deployment or
kicking off a reschedule to recover from failures.

Add a priority to the client allocation sync and update the `syncTicker`
receiver so that we only send an update if there's a high priority update
waiting, or on every 5th tick. This means when there are no high priority
updates, the client will send updates at most every 1s instead of
200ms. Benchmarks have shown this can reduce overall Raft traffic by 10%, as
well as reduce client-to-server RPC traffic.

Fixes: #9451
  • Loading branch information
tgross committed May 30, 2023
1 parent 60e0404 commit 97878f4
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 53 deletions.
3 changes: 3 additions & 0 deletions .changelog/17354.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
client: prioritize allocation updates to reduce Raft and RPC load
```
28 changes: 24 additions & 4 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,11 +623,12 @@ func (ar *allocRunner) handleTaskStateUpdates() {

ar.taskCoordinator.TaskStateUpdated(states)

// Get the client allocation
// Get the client allocation and priority
calloc := ar.clientAlloc(states)
priority := ar.getUpdatePriority(calloc)

// Update the server
ar.stateUpdater.AllocStateUpdated(calloc)
ar.stateUpdater.AllocStateUpdated(calloc, priority)

// Broadcast client alloc to listeners
ar.allocBroadcaster.Send(calloc)
Expand Down Expand Up @@ -990,7 +991,8 @@ func (ar *allocRunner) destroyImpl() {
// shutdown before Destroy finishes.
states := ar.killTasks()
calloc := ar.clientAlloc(states)
ar.stateUpdater.AllocStateUpdated(calloc)

ar.stateUpdater.AllocStateUpdated(calloc, cstructs.AllocUpdatePriorityTypical)

// Wait for tasks to exit and postrun hooks to finish
<-ar.waitCh
Expand Down Expand Up @@ -1375,7 +1377,7 @@ func (ar *allocRunner) Reconnect(update *structs.Allocation) (err error) {
}

// Update the server.
ar.stateUpdater.AllocStateUpdated(alloc)
ar.stateUpdater.AllocStateUpdated(alloc, cstructs.AllocUpdatePriorityUrgent)

// Broadcast client alloc to listeners.
err = ar.allocBroadcaster.Send(alloc)
Expand Down Expand Up @@ -1420,6 +1422,24 @@ func (ar *allocRunner) persistLastAcknowledgedState(a *state.State) {
}
}

// getUpdatePriority returns a priority based on the difference between the
// allocation and the last update acknowledged by the server.
func (ar *allocRunner) getUpdatePriority(a *structs.Allocation) cstructs.AllocUpdatePriority {
ar.stateLock.RLock()
defer ar.stateLock.RUnlock()

last := ar.lastAcknowledgedState
if last == nil {
return cstructs.AllocUpdatePriorityTypical
}

if last.ClientStatus != a.ClientStatus {
return cstructs.AllocUpdatePriorityUrgent
}

return cstructs.AllocUpdatePriorityTypical
}

// LastAcknowledgedStateIsCurrent returns true if the current state matches the
// state that was last acknowledged from a server update. This is called from
// the client in the same goroutine that called AcknowledgeState so that we
Expand Down
5 changes: 3 additions & 2 deletions client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,12 @@ func (a *allocHealthSetter) SetHealth(healthy, isDeploy bool, trackerTaskEvents
states[name] = tr.TaskState()
}

// Build the client allocation
// Build the client allocation and get the priority
calloc := a.ar.clientAlloc(states)
priority := a.ar.getUpdatePriority(calloc)

// Update the server
a.ar.stateUpdater.AllocStateUpdated(calloc)
a.ar.stateUpdater.AllocStateUpdated(calloc, priority)

// Broadcast client alloc to listeners
a.ar.allocBroadcaster.Send(calloc)
Expand Down
54 changes: 54 additions & 0 deletions client/allocrunner/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/hashicorp/nomad/client/serviceregistration"
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
"github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -2514,3 +2515,56 @@ func TestAllocRunner_LastAcknowledgedStateIsCurrent(t *testing.T) {
calloc = ar.clientAlloc(map[string]*structs.TaskState{})
must.False(t, ar.LastAcknowledgedStateIsCurrent(calloc))
}

func TestAllocRunner_GetUpdatePriority(t *testing.T) {
ci.Parallel(t)

alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{"run_for": "2ms"}
alloc.DesiredStatus = "stop"

conf, cleanup := testAllocRunnerConfig(t, alloc.Copy())
t.Cleanup(cleanup)

arIface, err := NewAllocRunner(conf)
must.NoError(t, err)
ar := arIface.(*allocRunner)

ar.SetNetworkStatus(&structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "192.168.1.1",
DNS: &structs.DNSConfig{},
})

calloc := ar.clientAlloc(map[string]*structs.TaskState{})
ar.AcknowledgeState(&arstate.State{
ClientStatus: calloc.ClientStatus,
ClientDescription: calloc.ClientDescription,
DeploymentStatus: calloc.DeploymentStatus,
TaskStates: calloc.TaskStates,
NetworkStatus: calloc.NetworkStatus,
})

must.Eq(t, cstructs.AllocUpdatePriorityTypical, ar.getUpdatePriority(calloc))

// clientAlloc mutates the state, so verify this doesn't break the check
// without state having been updated
calloc = ar.clientAlloc(map[string]*structs.TaskState{})
must.Eq(t, cstructs.AllocUpdatePriorityTypical, ar.getUpdatePriority(calloc))

// make a no-op state update
ar.SetNetworkStatus(&structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "192.168.1.1",
DNS: &structs.DNSConfig{},
})
calloc = ar.clientAlloc(map[string]*structs.TaskState{})
must.Eq(t, cstructs.AllocUpdatePriorityTypical, ar.getUpdatePriority(calloc))

// make a state update that should be detected as high priority
ar.SetClientStatus(structs.AllocClientStatusFailed)
calloc = ar.clientAlloc(map[string]*structs.TaskState{})
must.Eq(t, cstructs.AllocUpdatePriorityUrgent, ar.getUpdatePriority(calloc))
}
3 changes: 2 additions & 1 deletion client/allocrunner/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/hashicorp/nomad/client/serviceregistration/mock"
"github.com/hashicorp/nomad/client/serviceregistration/wrapper"
"github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
Expand All @@ -39,7 +40,7 @@ type MockStateUpdater struct {

// AllocStateUpdated implements the AllocStateHandler interface and records an
// alloc update.
func (m *MockStateUpdater) AllocStateUpdated(alloc *structs.Allocation) {
func (m *MockStateUpdater) AllocStateUpdated(alloc *structs.Allocation, priority cstructs.AllocUpdatePriority) {
m.mu.Lock()
m.Updates = append(m.Updates, alloc)
m.mu.Unlock()
Expand Down
Loading

0 comments on commit 97878f4

Please sign in to comment.