Skip to content

Commit

Permalink
Merge pull request #5164 from hashicorp/b-client-addalloc-errhandle
Browse files Browse the repository at this point in the history
Handle client initialization errors when adding/restoring allocs
  • Loading branch information
Preetha authored Jan 12, 2019
2 parents 03bf0bb + 19777b8 commit f10c625
Show file tree
Hide file tree
Showing 6 changed files with 363 additions and 17 deletions.
9 changes: 9 additions & 0 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,15 @@ func getClientStatus(taskStates map[string]*structs.TaskState) (status, descript
return "", ""
}

// SetClientStatus is a helper for forcing a specific client
// status on the alloc runner. This is used during restore errors
// when the task state can't be restored.
func (ar *allocRunner) SetClientStatus(clientStatus string) {
ar.stateLock.Lock()
defer ar.stateLock.Unlock()
ar.state.ClientStatus = clientStatus
}

// AllocState returns a copy of allocation state including a snapshot of task
// states.
func (ar *allocRunner) AllocState() *state.State {
Expand Down
104 changes: 89 additions & 15 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ type Client struct {
allocs map[string]AllocRunner
allocLock sync.RWMutex

// invalidAllocs is a map that tracks allocations that failed because
// the client couldn't initialize alloc or task runners for it. This can
// happen due to driver errors
invalidAllocs map[string]struct{}

// allocUpdates stores allocations that need to be synced to the server.
allocUpdates chan *structs.Allocation

Expand Down Expand Up @@ -265,6 +270,10 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
}
}

if cfg.StateDBFactory == nil {
cfg.StateDBFactory = state.GetStateDBFactory(cfg.DevMode)
}

// Create the logger
logger := cfg.Logger.ResetNamed("client")

Expand All @@ -286,6 +295,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
triggerNodeUpdate: make(chan struct{}, 8),
triggerEmitNodeEvent: make(chan *structs.NodeEvent, 8),
fpInitialized: make(chan struct{}),
invalidAllocs: make(map[string]struct{}),
}

c.batchNodeUpdates = newBatchNodeUpdates(
Expand Down Expand Up @@ -472,7 +482,7 @@ func (c *Client) init() error {
c.logger.Info("using state directory", "state_dir", c.config.StateDir)

// Open the state database
db, err := state.GetStateDBFactory(c.config.DevMode)(c.logger, c.config.StateDir)
db, err := c.config.StateDBFactory(c.logger, c.config.StateDir)
if err != nil {
return fmt.Errorf("failed to open state database: %v", err)
}
Expand Down Expand Up @@ -896,6 +906,8 @@ func (c *Client) setServersImpl(in []string, force bool) (int, error) {
}

// restoreState is used to restore our state from the data dir
// If there are errors restoring a specific allocation it is marked
// as failed whenever possible.
func (c *Client) restoreState() error {
if c.config.DevMode {
return nil
Expand Down Expand Up @@ -924,7 +936,6 @@ func (c *Client) restoreState() error {
}

// Load each alloc back
var mErr multierror.Error
for _, alloc := range allocs {

//XXX On Restore we give up on watching previous allocs because
Expand Down Expand Up @@ -953,15 +964,17 @@ func (c *Client) restoreState() error {
ar, err := allocrunner.NewAllocRunner(arConf)
if err != nil {
c.logger.Error("error running alloc", "error", err, "alloc_id", alloc.ID)
mErr.Errors = append(mErr.Errors, err)
c.handleInvalidAllocs(alloc, err)
continue
}

// Restore state
if err := ar.Restore(); err != nil {
c.logger.Error("error restoring alloc", "error", err, "alloc_id", alloc.ID)
mErr.Errors = append(mErr.Errors, err)
//TODO Cleanup allocrunner
// Override the status of the alloc to failed
ar.SetClientStatus(structs.AllocClientStatusFailed)
// Destroy the alloc runner since this is a failed restore
ar.Destroy()
continue
}

Expand All @@ -971,23 +984,25 @@ func (c *Client) restoreState() error {
c.allocLock.Unlock()
}

// Don't run any allocs if there were any failures
//XXX removing this check would switch from all-or-nothing restores to
// best-effort. went with all-or-nothing for now
if err := mErr.ErrorOrNil(); err != nil {
return err
}

// All allocs restored successfully, run them!
c.allocLock.Lock()
for _, ar := range c.allocs {
go ar.Run()
}
c.allocLock.Unlock()

return nil
}

func (c *Client) handleInvalidAllocs(alloc *structs.Allocation, err error) {
c.invalidAllocs[alloc.ID] = struct{}{}
// Mark alloc as failed so server can handle this
failed := makeFailedAlloc(alloc, err)
select {
case c.allocUpdates <- failed:
case <-c.shutdownCh:
}
}

// saveState is used to snapshot our state into the data dir.
func (c *Client) saveState() error {
var wg sync.WaitGroup
Expand Down Expand Up @@ -1774,7 +1789,10 @@ OUTER:
currentAR, ok := c.allocs[allocID]
c.allocLock.RUnlock()

if !ok || modifyIndex > currentAR.Alloc().AllocModifyIndex {
// Ignore alloc updates for allocs that are invalid because of initialization errors
_, isInvalid := c.invalidAllocs[allocID]

if (!ok || modifyIndex > currentAR.Alloc().AllocModifyIndex) && !isInvalid {
// Only pull allocs that are required. Filtered
// allocs might be at a higher index, so ignore
// it.
Expand Down Expand Up @@ -1938,6 +1956,11 @@ func (c *Client) runAllocs(update *allocUpdates) {
migrateToken := update.migrateTokens[add.ID]
if err := c.addAlloc(add, migrateToken); err != nil {
c.logger.Error("error adding alloc", "error", err, "alloc_id", add.ID)
// We mark the alloc as failed and send an update to the server
// We track the fact that creating an allocrunner failed so that we don't send updates again
if add.ClientStatus != structs.AllocClientStatusFailed {
c.handleInvalidAllocs(add, err)
}
}
}

Expand All @@ -1946,6 +1969,51 @@ func (c *Client) runAllocs(update *allocUpdates) {
c.garbageCollector.Trigger()
}

// makeFailedAlloc creates a stripped down version of the allocation passed in
// with its status set to failed and other fields needed for the server to be
// able to examine deployment and task states
func makeFailedAlloc(add *structs.Allocation, err error) *structs.Allocation {
stripped := new(structs.Allocation)
stripped.ID = add.ID
stripped.NodeID = add.NodeID
stripped.ClientStatus = structs.AllocClientStatusFailed
stripped.ClientDescription = fmt.Sprintf("Unable to add allocation due to error: %v", err)

// Copy task states if it exists in the original allocation
if add.TaskStates != nil {
stripped.TaskStates = add.TaskStates
} else {
stripped.TaskStates = make(map[string]*structs.TaskState)
}

failTime := time.Now()
if add.DeploymentStatus.HasHealth() {
// Never change deployment health once it has been set
stripped.DeploymentStatus = add.DeploymentStatus.Copy()
} else {
stripped.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(false),
Timestamp: failTime,
}
}

taskGroup := add.Job.LookupTaskGroup(add.TaskGroup)
if taskGroup == nil {
return stripped
}
for _, task := range taskGroup.Tasks {
ts, ok := stripped.TaskStates[task.Name]
if !ok {
ts = &structs.TaskState{}
stripped.TaskStates[task.Name] = ts
}
if ts.FinishedAt.IsZero() {
ts.FinishedAt = failTime
}
}
return stripped
}

// removeAlloc is invoked when we should remove an allocation because it has
// been removed by the server.
func (c *Client) removeAlloc(allocID string) {
Expand All @@ -1954,7 +2022,13 @@ func (c *Client) removeAlloc(allocID string) {

ar, ok := c.allocs[allocID]
if !ok {
c.logger.Warn("cannot remove alloc", "alloc_id", allocID, "error", "alloc not found")
if _, ok := c.invalidAllocs[allocID]; ok {
// Removing from invalid allocs map if present
delete(c.invalidAllocs, allocID)
} else {
// Alloc is unknown, log a warning.
c.logger.Warn("cannot remove nonexistent alloc", "alloc_id", allocID, "error", "alloc not found")
}
return
}

Expand Down
Loading

0 comments on commit f10c625

Please sign in to comment.