Batch client allocation updates to the server #835

Merged (3 commits) on Feb 22, 2016
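This PR replaces the client's per-allocation, synchronous Node.UpdateAlloc RPC with a buffered allocUpdates channel and a long-lived allocSync goroutine that coalesces updates and flushes them to the server on a 200ms ticker. As orientation before the diff, here is a minimal standalone sketch of that batching pattern, with simplified types and invented names; it is not the code from this change:

package main

import (
	"fmt"
	"time"
)

// update stands in for the stripped-down allocation fields the client sends.
type update struct {
	ID     string
	Status string
}

// batcher coalesces updates by ID and flushes them periodically, mirroring the
// shape of allocSync: producers never block on the RPC, and several updates to
// the same ID inside one interval collapse into the most recent one.
func batcher(updates <-chan update, stop <-chan struct{}, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	pending := make(map[string]update)
	for {
		select {
		case <-stop:
			return
		case u := <-updates:
			pending[u.ID] = u // latest update wins within the batch window
		case <-ticker.C:
			if len(pending) == 0 {
				continue // fast path: nothing to send
			}
			batch := make([]update, 0, len(pending))
			for _, u := range pending {
				batch = append(batch, u)
			}
			fmt.Printf("syncing %d update(s) in one call\n", len(batch)) // stands in for the RPC
			pending = make(map[string]update)
		}
	}
}

func main() {
	updates := make(chan update, 64)
	stop := make(chan struct{})
	go batcher(updates, stop, 200*time.Millisecond)

	updates <- update{ID: "alloc-1", Status: "pending"}
	updates <- update{ID: "alloc-1", Status: "running"} // overwrites the pending update above
	updates <- update{ID: "alloc-2", Status: "running"}

	time.Sleep(300 * time.Millisecond) // let one tick fire
	close(stop)
}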
44 changes: 10 additions & 34 deletions client/alloc_runner.go
@@ -16,10 +16,6 @@ import (
)

const (
// allocSyncRetryIntv is the interval on which we retry updating
// the status of the allocation
allocSyncRetryIntv = 15 * time.Second

// taskReceivedSyncLimit is how long the client will wait before sending
// that a task was received to the server. The client does not immediately
// send that the task was received to the server because another transistion
@@ -30,7 +26,7 @@ const (
)

// AllocStateUpdater is used to update the status of an allocation
type AllocStateUpdater func(alloc *structs.Allocation) error
type AllocStateUpdater func(alloc *structs.Allocation)

// AllocRunner is used to wrap an allocation and provide the execution context.
type AllocRunner struct {
@@ -262,9 +258,12 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
alloc.ClientStatus = structs.AllocClientStatusFailed
} else if running {
alloc.ClientStatus = structs.AllocClientStatusRunning
} else if dead && !pending {
} else if pending {
alloc.ClientStatus = structs.AllocClientStatusPending
} else if dead {
alloc.ClientStatus = structs.AllocClientStatusDead
}

return alloc
}

@@ -273,42 +272,19 @@ func (r *AllocRunner) dirtySyncState() {
for {
select {
case <-r.dirtyCh:
r.retrySyncState(r.destroyCh)
r.syncStatus()
case <-r.destroyCh:
return
}
}
}

// retrySyncState is used to retry the state sync until success
func (r *AllocRunner) retrySyncState(stopCh chan struct{}) {
for {
if err := r.syncStatus(); err == nil {
// The Alloc State might have been re-computed so we are
// snapshoting only the alloc runner
r.saveAllocRunnerState()
return
}
select {
case <-time.After(allocSyncRetryIntv + randomStagger(allocSyncRetryIntv)):
case <-stopCh:
return
}
}
}

// syncStatus is used to run and sync the status when it changes
func (r *AllocRunner) syncStatus() error {
// Get a copy of our alloc.
// Get a copy of our alloc, update status server side and sync to disk
alloc := r.Alloc()

// Attempt to update the status
if err := r.updater(alloc); err != nil {
r.logger.Printf("[ERR] client: failed to update alloc '%s' status to %s: %s",
alloc.ID, alloc.ClientStatus, err)
return err
}
return nil
r.updater(alloc)
return r.saveAllocRunnerState()
}

// setStatus is used to update the allocation status
@@ -475,7 +451,7 @@ OUTER:
r.taskLock.Unlock()

// Final state sync
r.retrySyncState(nil)
r.syncStatus()

// Block until we should destroy the state of the alloc
r.handleDestroy()
4 changes: 1 addition & 3 deletions client/alloc_runner_test.go
@@ -16,13 +16,11 @@ import (
type MockAllocStateUpdater struct {
Count int
Allocs []*structs.Allocation
Err error
}

func (m *MockAllocStateUpdater) Update(alloc *structs.Allocation) error {
func (m *MockAllocStateUpdater) Update(alloc *structs.Allocation) {
m.Count += 1
m.Allocs = append(m.Allocs, alloc)
return m.Err
}

func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) {
96 changes: 79 additions & 17 deletions client/client.go
@@ -58,6 +58,14 @@ const (
// nodeUpdateRetryIntv is how often the client checks for updates to the
// node attributes or meta map.
nodeUpdateRetryIntv = 5 * time.Second

// allocSyncIntv is the batching period of allocation updates before they
// are synced with the server.
allocSyncIntv = 200 * time.Millisecond

// allocSyncRetryIntv is the interval on which we retry updating
// the status of the allocation
allocSyncRetryIntv = 5 * time.Second
)

// DefaultConfig returns the default configuration
@@ -100,6 +108,9 @@ type Client struct {
allocs map[string]*AllocRunner
allocLock sync.RWMutex

// allocUpdates stores allocations that need to be synced to the server.
allocUpdates chan *structs.Allocation

shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
@@ -112,12 +123,13 @@ func NewClient(cfg *config.Config) (*Client, error) {

// Create the client
c := &Client{
config: cfg,
start: time.Now(),
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
logger: logger,
allocs: make(map[string]*AllocRunner),
shutdownCh: make(chan struct{}),
config: cfg,
start: time.Now(),
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
logger: logger,
allocs: make(map[string]*AllocRunner),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
}

// Setup the Consul Service
@@ -166,6 +178,9 @@ func NewClient(cfg *config.Config) (*Client, error) {
// Begin periodic snapshotting of state.
go c.periodicSnapshot()

// Begin syncing allocations to the server
go c.allocSync()

// Start the client!
go c.run()

@@ -816,19 +831,66 @@ func (c *Client) updateNodeStatus() error {
}

// updateAllocStatus is used to update the status of an allocation
func (c *Client) updateAllocStatus(alloc *structs.Allocation) error {
args := structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc},
WriteRequest: structs.WriteRequest{Region: c.config.Region},
}
var resp structs.GenericResponse
err := c.RPC("Node.UpdateAlloc", &args, &resp)
if err != nil {
c.logger.Printf("[ERR] client: failed to update allocation: %v", err)
return err
func (c *Client) updateAllocStatus(alloc *structs.Allocation) {
// Only send the fields that are updatable by the client.
stripped := new(structs.Allocation)
stripped.ID = alloc.ID
stripped.TaskStates = alloc.TaskStates
stripped.ClientStatus = alloc.ClientStatus
stripped.ClientDescription = alloc.ClientDescription
select {
case c.allocUpdates <- stripped:
case <-c.shutdownCh:
}
}

return nil
// allocSync is a long lived function that batches allocation updates to the
// server.
func (c *Client) allocSync() {
staggered := false
syncTicker := time.NewTicker(allocSyncIntv)
updates := make(map[string]*structs.Allocation)
for {
select {
case <-c.shutdownCh:
syncTicker.Stop()
return
case alloc := <-c.allocUpdates:
// Batch the allocation updates until the timer triggers.
updates[alloc.ID] = alloc
case <-syncTicker.C:
// Fast path if there are no updates
if len(updates) == 0 {
continue
}

sync := make([]*structs.Allocation, 0, len(updates))
for _, alloc := range updates {
sync = append(sync, alloc)
}

// Send to server.
args := structs.AllocUpdateRequest{
Alloc: sync,
WriteRequest: structs.WriteRequest{Region: c.config.Region},
}

var resp structs.GenericResponse
if err := c.RPC("Node.UpdateAlloc", &args, &resp); err != nil {
c.logger.Printf("[ERR] client: failed to update allocations: %v", err)
[Review comment] On error, I'd bring back the random stagger logic to avoid thundering herd
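The ticker swap just below addresses this: on RPC failure the loop replaces the 200ms ticker with one built from c.retryIntv(allocSyncRetryIntv). retryIntv itself is not part of this diff; judging from the randomStagger call in the removed retrySyncState code, it presumably adds random jitter to the base interval. A hedged standalone sketch of that idea (not the actual helper):

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randomStagger returns a uniformly random duration in [0, intv), so clients
// whose sync RPC failed spread their retries out instead of stampeding the
// servers in lockstep. The client package has its own helper by this name
// (used by the removed retrySyncState); this copy is illustrative only.
func randomStagger(intv time.Duration) time.Duration {
	if intv <= 0 {
		return 0
	}
	return time.Duration(uint64(rand.Int63()) % uint64(intv))
}

// retryIntvSketch approximates what c.retryIntv(allocSyncRetryIntv) is assumed
// to yield: the 5s base plus up to 5s of jitter, i.e. a retry every 5-10s.
func retryIntvSketch(base time.Duration) time.Duration {
	return base + randomStagger(base)
}

func main() {
	fmt.Println(retryIntvSketch(5 * time.Second))
}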

syncTicker.Stop()
syncTicker = time.NewTicker(c.retryIntv(allocSyncRetryIntv))
staggered = true
} else {
updates = make(map[string]*structs.Allocation)
if staggered {
syncTicker.Stop()
syncTicker = time.NewTicker(allocSyncIntv)
staggered = false
}
}
}
}
}

// allocUpdates holds the results of receiving updated allocations from the
55 changes: 31 additions & 24 deletions client/client_test.go
@@ -324,27 +324,27 @@ func TestClient_UpdateAllocStatus(t *testing.T) {

alloc := mock.Alloc()
alloc.NodeID = c1.Node().ID
originalStatus := "foo"
alloc.ClientStatus = originalStatus

state := s1.State()
state.UpsertAllocs(100, []*structs.Allocation{alloc})

newAlloc := new(structs.Allocation)
*newAlloc = *alloc
newAlloc.ClientStatus = structs.AllocClientStatusRunning

err := c1.updateAllocStatus(newAlloc)
if err != nil {
t.Fatalf("err: %v", err)
}

out, err := state.AllocByID(alloc.ID)
if err != nil {
testutil.WaitForResult(func() (bool, error) {
out, err := state.AllocByID(alloc.ID)
if err != nil {
return false, err
}
if out == nil {
return false, fmt.Errorf("no such alloc")
}
if out.ClientStatus == originalStatus {
return false, fmt.Errorf("Alloc client status not updated; got %v", out.ClientStatus)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
}

if out == nil || out.ClientStatus != structs.AllocClientStatusRunning {
t.Fatalf("bad: %#v", out)
}
})
}

func TestClient_WatchAllocs(t *testing.T) {
@@ -440,8 +440,7 @@ func TestClient_SaveRestoreState(t *testing.T) {
task.Config["args"] = []string{"10"}

state := s1.State()
err := state.UpsertAllocs(100,
[]*structs.Allocation{alloc1})
err := state.UpsertAllocs(100, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -470,12 +469,20 @@ func TestClient_SaveRestoreState(t *testing.T) {
defer c2.Shutdown()

// Ensure the allocation is running
c2.allocLock.RLock()
ar := c2.allocs[alloc1.ID]
c2.allocLock.RUnlock()
if ar.Alloc().ClientStatus != structs.AllocClientStatusRunning {
t.Fatalf("bad: %#v", ar.Alloc())
}
testutil.WaitForResult(func() (bool, error) {
c2.allocLock.RLock()
ar := c2.allocs[alloc1.ID]
c2.allocLock.RUnlock()
status := ar.Alloc().ClientStatus
alive := status == structs.AllocClientStatusRunning ||
	status == structs.AllocClientStatusPending
if !alive {
return false, fmt.Errorf("incorrect client status: %#v", ar.Alloc())
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}

func TestClient_Init(t *testing.T) {