From 6b3c22e91a6fd09c6d0cdb60909b436a2033faba Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sun, 21 Feb 2016 19:20:50 -0800 Subject: [PATCH 1/3] Batch client allocation updates to the server --- client/alloc_runner.go | 40 +++++------------ client/alloc_runner_test.go | 4 +- client/client.go | 85 +++++++++++++++++++++++++++++-------- client/client_test.go | 32 +++++++------- 4 files changed, 94 insertions(+), 67 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index a8bb0186d97e..5ec5873dfff4 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -30,7 +30,7 @@ const ( ) // AllocStateUpdater is used to update the status of an allocation -type AllocStateUpdater func(alloc *structs.Allocation) error +type AllocStateUpdater func(alloc *structs.Allocation) // AllocRunner is used to wrap an allocation and provide the execution context. type AllocRunner struct { @@ -262,9 +262,12 @@ func (r *AllocRunner) Alloc() *structs.Allocation { alloc.ClientStatus = structs.AllocClientStatusFailed } else if running { alloc.ClientStatus = structs.AllocClientStatusRunning - } else if dead && !pending { + } else if pending { + alloc.ClientStatus = structs.AllocClientStatusPending + } else if dead { alloc.ClientStatus = structs.AllocClientStatusDead } + return alloc } @@ -273,42 +276,19 @@ func (r *AllocRunner) dirtySyncState() { for { select { case <-r.dirtyCh: - r.retrySyncState(r.destroyCh) + r.syncStatus() case <-r.destroyCh: return } } } -// retrySyncState is used to retry the state sync until success -func (r *AllocRunner) retrySyncState(stopCh chan struct{}) { - for { - if err := r.syncStatus(); err == nil { - // The Alloc State might have been re-computed so we are - // snapshoting only the alloc runner - r.saveAllocRunnerState() - return - } - select { - case <-time.After(allocSyncRetryIntv + randomStagger(allocSyncRetryIntv)): - case <-stopCh: - return - } - } -} - // syncStatus is used to run and sync the status when it changes func (r *AllocRunner) syncStatus() error { - // Get a copy of our alloc. + // Get a copy of our alloc, update status server side and sync to disk alloc := r.Alloc() - - // Attempt to update the status - if err := r.updater(alloc); err != nil { - r.logger.Printf("[ERR] client: failed to update alloc '%s' status to %s: %s", - alloc.ID, alloc.ClientStatus, err) - return err - } - return nil + r.updater(alloc) + return r.saveAllocRunnerState() } // setStatus is used to update the allocation status @@ -475,7 +455,7 @@ OUTER: r.taskLock.Unlock() // Final state sync - r.retrySyncState(nil) + r.syncStatus() // Block until we should destroy the state of the alloc r.handleDestroy() diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 6b526b846dbe..5d31e97e9f61 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -16,13 +16,11 @@ import ( type MockAllocStateUpdater struct { Count int Allocs []*structs.Allocation - Err error } -func (m *MockAllocStateUpdater) Update(alloc *structs.Allocation) error { +func (m *MockAllocStateUpdater) Update(alloc *structs.Allocation) { m.Count += 1 m.Allocs = append(m.Allocs, alloc) - return m.Err } func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) { diff --git a/client/client.go b/client/client.go index 8fb1cf0d8799..c5879796369c 100644 --- a/client/client.go +++ b/client/client.go @@ -58,6 +58,10 @@ const ( // nodeUpdateRetryIntv is how often the client checks for updates to the // node attributes or meta map. nodeUpdateRetryIntv = 5 * time.Second + + // allocSyncIntv is the batching period of allocation updates before they + // are synced with the server. + allocSyncIntv = 200 * time.Millisecond ) // DefaultConfig returns the default configuration @@ -100,6 +104,9 @@ type Client struct { allocs map[string]*AllocRunner allocLock sync.RWMutex + // allocUpdates stores allocations that need to be synced to the server. + allocUpdates chan *structs.Allocation + shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex @@ -112,12 +119,13 @@ func NewClient(cfg *config.Config) (*Client, error) { // Create the client c := &Client{ - config: cfg, - start: time.Now(), - connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil), - logger: logger, - allocs: make(map[string]*AllocRunner), - shutdownCh: make(chan struct{}), + config: cfg, + start: time.Now(), + connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil), + logger: logger, + allocs: make(map[string]*AllocRunner), + allocUpdates: make(chan *structs.Allocation, 64), + shutdownCh: make(chan struct{}), } // Setup the Consul Service @@ -166,6 +174,9 @@ func NewClient(cfg *config.Config) (*Client, error) { // Begin periodic snapshotting of state. go c.periodicSnapshot() + // Begin syncing allocations to the server + go c.allocSync() + // Start the client! go c.run() @@ -816,19 +827,57 @@ func (c *Client) updateNodeStatus() error { } // updateAllocStatus is used to update the status of an allocation -func (c *Client) updateAllocStatus(alloc *structs.Allocation) error { - args := structs.AllocUpdateRequest{ - Alloc: []*structs.Allocation{alloc}, - WriteRequest: structs.WriteRequest{Region: c.config.Region}, - } - var resp structs.GenericResponse - err := c.RPC("Node.UpdateAlloc", &args, &resp) - if err != nil { - c.logger.Printf("[ERR] client: failed to update allocation: %v", err) - return err - } +func (c *Client) updateAllocStatus(alloc *structs.Allocation) { + // Only send the fields that are updatable by the client. + stripped := new(structs.Allocation) + stripped.ID = alloc.ID + stripped.TaskStates = alloc.TaskStates + stripped.ClientStatus = alloc.ClientStatus + stripped.ClientDescription = alloc.ClientDescription + c.allocUpdates <- stripped +} - return nil +// allocSync is a long lived function that batches allocation updates to the +// server. +func (c *Client) allocSync() { + timeoutTimer := time.NewTimer(allocSyncIntv) + timeoutCh := timeoutTimer.C + updates := make(map[string]*structs.Allocation) + for { + select { + case <-c.shutdownCh: + return + case alloc := <-c.allocUpdates: + // Batch the allocation updates until the timer triggers. + updates[alloc.ID] = alloc + case <-timeoutCh: + // Reset the timer + timeoutTimer.Reset(allocSyncIntv) + + // Fast path if there are no updates + if len(updates) == 0 { + continue + } + + sync := make([]*structs.Allocation, 0, len(updates)) + for _, alloc := range updates { + sync = append(sync, alloc) + } + + // Send to server. + args := structs.AllocUpdateRequest{ + Alloc: sync, + WriteRequest: structs.WriteRequest{Region: c.config.Region}, + } + + var resp structs.GenericResponse + if err := c.RPC("Node.UpdateAlloc", &args, &resp); err != nil { + c.logger.Printf("[ERR] client: failed to update allocations: %v", err) + } else { + updates = make(map[string]*structs.Allocation) + } + } + } } // allocUpdates holds the results of receiving updated allocations from the diff --git a/client/client_test.go b/client/client_test.go index 6b6c2d27702f..66c59be87908 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -324,27 +324,27 @@ func TestClient_UpdateAllocStatus(t *testing.T) { alloc := mock.Alloc() alloc.NodeID = c1.Node().ID + originalStatus := "foo" + alloc.ClientStatus = originalStatus state := s1.State() state.UpsertAllocs(100, []*structs.Allocation{alloc}) - newAlloc := new(structs.Allocation) - *newAlloc = *alloc - newAlloc.ClientStatus = structs.AllocClientStatusRunning - - err := c1.updateAllocStatus(newAlloc) - if err != nil { - t.Fatalf("err: %v", err) - } - - out, err := state.AllocByID(alloc.ID) - if err != nil { + testutil.WaitForResult(func() (bool, error) { + out, err := state.AllocByID(alloc.ID) + if err != nil { + return false, err + } + if out == nil { + return false, fmt.Errorf("no such alloc") + } + if out.ClientStatus == originalStatus { + return false, fmt.Errorf("Alloc client status not updated; got %v", out.ClientStatus) + } + return true, nil + }, func(err error) { t.Fatalf("err: %v", err) - } - - if out == nil || out.ClientStatus != structs.AllocClientStatusRunning { - t.Fatalf("bad: %#v", out) - } + }) } func TestClient_WatchAllocs(t *testing.T) { From a6eb1ec98a648f023edf8edcf440650e2c2ff6cb Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sun, 21 Feb 2016 21:12:58 -0800 Subject: [PATCH 2/3] Fix test --- client/client_test.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index 66c59be87908..462a600fbeb7 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -440,8 +440,7 @@ func TestClient_SaveRestoreState(t *testing.T) { task.Config["args"] = []string{"10"} state := s1.State() - err := state.UpsertAllocs(100, - []*structs.Allocation{alloc1}) + err := state.UpsertAllocs(100, []*structs.Allocation{alloc1}) if err != nil { t.Fatalf("err: %v", err) } @@ -470,12 +469,20 @@ func TestClient_SaveRestoreState(t *testing.T) { defer c2.Shutdown() // Ensure the allocation is running - c2.allocLock.RLock() - ar := c2.allocs[alloc1.ID] - c2.allocLock.RUnlock() - if ar.Alloc().ClientStatus != structs.AllocClientStatusRunning { - t.Fatalf("bad: %#v", ar.Alloc()) - } + testutil.WaitForResult(func() (bool, error) { + c2.allocLock.RLock() + ar := c2.allocs[alloc1.ID] + c2.allocLock.RUnlock() + status := ar.Alloc().ClientStatus + alive := status != structs.AllocClientStatusRunning || + status != structs.AllocClientStatusPending + if !alive { + return false, fmt.Errorf("incorrect client status: %#v", ar.Alloc()) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) } func TestClient_Init(t *testing.T) { From 48eff00427c438329810c41e3f300f01317be9d8 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sun, 21 Feb 2016 21:32:32 -0800 Subject: [PATCH 3/3] address feedback --- client/alloc_runner.go | 4 ---- client/client.go | 27 ++++++++++++++++++++------- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 5ec5873dfff4..dd0f1f8db933 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -16,10 +16,6 @@ import ( ) const ( - // allocSyncRetryIntv is the interval on which we retry updating - // the status of the allocation - allocSyncRetryIntv = 15 * time.Second - // taskReceivedSyncLimit is how long the client will wait before sending // that a task was received to the server. The client does not immediately // send that the task was received to the server because another transistion diff --git a/client/client.go b/client/client.go index c5879796369c..1295ed062409 100644 --- a/client/client.go +++ b/client/client.go @@ -62,6 +62,10 @@ const ( // allocSyncIntv is the batching period of allocation updates before they // are synced with the server. allocSyncIntv = 200 * time.Millisecond + + // allocSyncRetryIntv is the interval on which we retry updating + // the status of the allocation + allocSyncRetryIntv = 5 * time.Second ) // DefaultConfig returns the default configuration @@ -834,26 +838,27 @@ func (c *Client) updateAllocStatus(alloc *structs.Allocation) { stripped.TaskStates = alloc.TaskStates stripped.ClientStatus = alloc.ClientStatus stripped.ClientDescription = alloc.ClientDescription - c.allocUpdates <- stripped + select { + case c.allocUpdates <- stripped: + case <-c.shutdownCh: + } } // allocSync is a long lived function that batches allocation updates to the // server. func (c *Client) allocSync() { - timeoutTimer := time.NewTimer(allocSyncIntv) - timeoutCh := timeoutTimer.C + staggered := false + syncTicker := time.NewTicker(allocSyncIntv) updates := make(map[string]*structs.Allocation) for { select { case <-c.shutdownCh: + syncTicker.Stop() return case alloc := <-c.allocUpdates: // Batch the allocation updates until the timer triggers. updates[alloc.ID] = alloc - case <-timeoutCh: - // Reset the timer - timeoutTimer.Reset(allocSyncIntv) - + case <-syncTicker.C: // Fast path if there are no updates if len(updates) == 0 { continue @@ -873,8 +878,16 @@ func (c *Client) allocSync() { var resp structs.GenericResponse if err := c.RPC("Node.UpdateAlloc", &args, &resp); err != nil { c.logger.Printf("[ERR] client: failed to update allocations: %v", err) + syncTicker.Stop() + syncTicker = time.NewTicker(c.retryIntv(allocSyncRetryIntv)) + staggered = true } else { updates = make(map[string]*structs.Allocation) + if staggered { + syncTicker.Stop() + syncTicker = time.NewTicker(allocSyncIntv) + staggered = false + } } } }