Skip to content

Commit

Permalink
Merge pull request #835 from hashicorp/f-client-alloc-update
Browse files Browse the repository at this point in the history
Batch client allocation updates to the server
  • Loading branch information
dadgar committed Feb 22, 2016
2 parents 19c8b8b + 48eff00 commit daf10d8
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 78 deletions.
44 changes: 10 additions & 34 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ import (
)

const (
// allocSyncRetryIntv is the interval on which we retry updating
// the status of the allocation
allocSyncRetryIntv = 15 * time.Second

// taskReceivedSyncLimit is how long the client will wait before sending
// that a task was received to the server. The client does not immediately
// send that the task was received to the server because another transition
Expand All @@ -30,7 +26,7 @@ const (
)

// AllocStateUpdater is used to update the status of an allocation
type AllocStateUpdater func(alloc *structs.Allocation) error
type AllocStateUpdater func(alloc *structs.Allocation)

// AllocRunner is used to wrap an allocation and provide the execution context.
type AllocRunner struct {
Expand Down Expand Up @@ -262,9 +258,12 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
alloc.ClientStatus = structs.AllocClientStatusFailed
} else if running {
alloc.ClientStatus = structs.AllocClientStatusRunning
} else if dead && !pending {
} else if pending {
alloc.ClientStatus = structs.AllocClientStatusPending
} else if dead {
alloc.ClientStatus = structs.AllocClientStatusDead
}

return alloc
}

Expand All @@ -273,42 +272,19 @@ func (r *AllocRunner) dirtySyncState() {
for {
select {
case <-r.dirtyCh:
r.retrySyncState(r.destroyCh)
r.syncStatus()
case <-r.destroyCh:
return
}
}
}

// retrySyncState is used to retry the state sync until success
func (r *AllocRunner) retrySyncState(stopCh chan struct{}) {
for {
if err := r.syncStatus(); err == nil {
// The Alloc State might have been re-computed so we are
// snapshotting only the alloc runner
r.saveAllocRunnerState()
return
}
select {
case <-time.After(allocSyncRetryIntv + randomStagger(allocSyncRetryIntv)):
case <-stopCh:
return
}
}
}

// syncStatus is used to run and sync the status when it changes
func (r *AllocRunner) syncStatus() error {
// Get a copy of our alloc.
// Get a copy of our alloc, update status server side and sync to disk
alloc := r.Alloc()

// Attempt to update the status
if err := r.updater(alloc); err != nil {
r.logger.Printf("[ERR] client: failed to update alloc '%s' status to %s: %s",
alloc.ID, alloc.ClientStatus, err)
return err
}
return nil
r.updater(alloc)
return r.saveAllocRunnerState()
}

// setStatus is used to update the allocation status
Expand Down Expand Up @@ -475,7 +451,7 @@ OUTER:
r.taskLock.Unlock()

// Final state sync
r.retrySyncState(nil)
r.syncStatus()

// Block until we should destroy the state of the alloc
r.handleDestroy()
Expand Down
4 changes: 1 addition & 3 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@ import (
type MockAllocStateUpdater struct {
Count int
Allocs []*structs.Allocation
Err error
}

func (m *MockAllocStateUpdater) Update(alloc *structs.Allocation) error {
func (m *MockAllocStateUpdater) Update(alloc *structs.Allocation) {
m.Count += 1
m.Allocs = append(m.Allocs, alloc)
return m.Err
}

func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) {
Expand Down
96 changes: 79 additions & 17 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ const (
// nodeUpdateRetryIntv is how often the client checks for updates to the
// node attributes or meta map.
nodeUpdateRetryIntv = 5 * time.Second

// allocSyncIntv is the batching period of allocation updates before they
// are synced with the server.
allocSyncIntv = 200 * time.Millisecond

// allocSyncRetryIntv is the interval on which we retry updating
// the status of the allocation
allocSyncRetryIntv = 5 * time.Second
)

// DefaultConfig returns the default configuration
Expand Down Expand Up @@ -100,6 +108,9 @@ type Client struct {
allocs map[string]*AllocRunner
allocLock sync.RWMutex

// allocUpdates stores allocations that need to be synced to the server.
allocUpdates chan *structs.Allocation

shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
Expand All @@ -112,12 +123,13 @@ func NewClient(cfg *config.Config) (*Client, error) {

// Create the client
c := &Client{
config: cfg,
start: time.Now(),
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
logger: logger,
allocs: make(map[string]*AllocRunner),
shutdownCh: make(chan struct{}),
config: cfg,
start: time.Now(),
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
logger: logger,
allocs: make(map[string]*AllocRunner),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
}

// Setup the Consul Service
Expand Down Expand Up @@ -166,6 +178,9 @@ func NewClient(cfg *config.Config) (*Client, error) {
// Begin periodic snapshotting of state.
go c.periodicSnapshot()

// Begin syncing allocations to the server
go c.allocSync()

// Start the client!
go c.run()

Expand Down Expand Up @@ -816,19 +831,66 @@ func (c *Client) updateNodeStatus() error {
}

// updateAllocStatus is used to update the status of an allocation
func (c *Client) updateAllocStatus(alloc *structs.Allocation) error {
args := structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc},
WriteRequest: structs.WriteRequest{Region: c.config.Region},
}
var resp structs.GenericResponse
err := c.RPC("Node.UpdateAlloc", &args, &resp)
if err != nil {
c.logger.Printf("[ERR] client: failed to update allocation: %v", err)
return err
func (c *Client) updateAllocStatus(alloc *structs.Allocation) {
// Only send the fields that are updatable by the client.
stripped := new(structs.Allocation)
stripped.ID = alloc.ID
stripped.TaskStates = alloc.TaskStates
stripped.ClientStatus = alloc.ClientStatus
stripped.ClientDescription = alloc.ClientDescription
select {
case c.allocUpdates <- stripped:
case <-c.shutdownCh:
}
}

return nil
// allocSync runs until the client shuts down. It collects allocation updates
// arriving on c.allocUpdates and flushes them to the server in a single
// Node.UpdateAlloc RPC each time the sync ticker fires.
func (c *Client) allocSync() {
	// pending keeps only the latest update per allocation ID received since
	// the last successful flush.
	pending := make(map[string]*structs.Allocation)

	// backingOff is true while the ticker runs on the retry interval because
	// the previous flush failed.
	backingOff := false
	ticker := time.NewTicker(allocSyncIntv)

	for {
		select {
		case <-c.shutdownCh:
			ticker.Stop()
			return

		case update := <-c.allocUpdates:
			// A newer update for the same allocation replaces the older one.
			pending[update.ID] = update

		case <-ticker.C:
			// Nothing accumulated during this period.
			if len(pending) == 0 {
				continue
			}

			batch := make([]*structs.Allocation, 0, len(pending))
			for _, update := range pending {
				batch = append(batch, update)
			}

			req := structs.AllocUpdateRequest{
				Alloc:        batch,
				WriteRequest: structs.WriteRequest{Region: c.config.Region},
			}
			var resp structs.GenericResponse

			err := c.RPC("Node.UpdateAlloc", &req, &resp)
			if err != nil {
				// Keep the pending set so it is resent on the next tick, and
				// switch the ticker to an interval derived from
				// allocSyncRetryIntv.
				c.logger.Printf("[ERR] client: failed to update allocations: %v", err)
				ticker.Stop()
				ticker = time.NewTicker(c.retryIntv(allocSyncRetryIntv))
				backingOff = true
				continue
			}

			// The flush succeeded: start a fresh batch and, if we were
			// retrying, return to the normal batching interval.
			pending = make(map[string]*structs.Allocation)
			if backingOff {
				ticker.Stop()
				ticker = time.NewTicker(allocSyncIntv)
				backingOff = false
			}
		}
	}
}

// allocUpdates holds the results of receiving updated allocations from the
Expand Down
55 changes: 31 additions & 24 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,27 +324,27 @@ func TestClient_UpdateAllocStatus(t *testing.T) {

alloc := mock.Alloc()
alloc.NodeID = c1.Node().ID
originalStatus := "foo"
alloc.ClientStatus = originalStatus

state := s1.State()
state.UpsertAllocs(100, []*structs.Allocation{alloc})

newAlloc := new(structs.Allocation)
*newAlloc = *alloc
newAlloc.ClientStatus = structs.AllocClientStatusRunning

err := c1.updateAllocStatus(newAlloc)
if err != nil {
t.Fatalf("err: %v", err)
}

out, err := state.AllocByID(alloc.ID)
if err != nil {
testutil.WaitForResult(func() (bool, error) {
out, err := state.AllocByID(alloc.ID)
if err != nil {
return false, err
}
if out == nil {
return false, fmt.Errorf("no such alloc")
}
if out.ClientStatus == originalStatus {
return false, fmt.Errorf("Alloc client status not updated; got %v", out.ClientStatus)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
}

if out == nil || out.ClientStatus != structs.AllocClientStatusRunning {
t.Fatalf("bad: %#v", out)
}
})
}

func TestClient_WatchAllocs(t *testing.T) {
Expand Down Expand Up @@ -440,8 +440,7 @@ func TestClient_SaveRestoreState(t *testing.T) {
task.Config["args"] = []string{"10"}

state := s1.State()
err := state.UpsertAllocs(100,
[]*structs.Allocation{alloc1})
err := state.UpsertAllocs(100, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -470,12 +469,20 @@ func TestClient_SaveRestoreState(t *testing.T) {
defer c2.Shutdown()

// Ensure the allocation is running
c2.allocLock.RLock()
ar := c2.allocs[alloc1.ID]
c2.allocLock.RUnlock()
if ar.Alloc().ClientStatus != structs.AllocClientStatusRunning {
t.Fatalf("bad: %#v", ar.Alloc())
}
testutil.WaitForResult(func() (bool, error) {
	c2.allocLock.RLock()
	ar := c2.allocs[alloc1.ID]
	c2.allocLock.RUnlock()

	// A missing map entry yields a nil runner; report it as a retryable
	// failure instead of dereferencing it.
	if ar == nil {
		return false, fmt.Errorf("no alloc runner for alloc %q", alloc1.ID)
	}

	// Take one snapshot so the check and the error message agree.
	alloc := ar.Alloc()

	// The allocation counts as alive only when it is running or pending.
	// The previous check, `status != Running || status != Pending`, was
	// true for every status and so could never fail.
	switch alloc.ClientStatus {
	case structs.AllocClientStatusRunning, structs.AllocClientStatusPending:
		return true, nil
	}
	return false, fmt.Errorf("incorrect client status: %#v", alloc)
}, func(err error) {
	t.Fatalf("err: %v", err)
})
}

func TestClient_Init(t *testing.T) {
Expand Down

0 comments on commit daf10d8

Please sign in to comment.