Skip to content

Commit

Permalink
Merge pull request #789 from hashicorp/f-client-races
Browse files Browse the repository at this point in the history
Remove data races in the client
  • Loading branch information
dadgar committed Feb 11, 2016
2 parents 6ef927e + 82ff13e commit 27984d8
Show file tree
Hide file tree
Showing 10 changed files with 377 additions and 51 deletions.
82 changes: 55 additions & 27 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@ type AllocRunner struct {
logger *log.Logger
consulService *ConsulService

alloc *structs.Allocation
allocLock sync.Mutex

// Explicit status of allocation. Set when there are failures
allocClientStatus string
alloc *structs.Allocation
allocClientStatus string // Explicit status of allocation. Set when there are failures
allocClientDescription string
allocLock sync.Mutex

dirtyCh chan struct{}

ctx *driver.ExecContext
ctxLock sync.Mutex
tasks map[string]*TaskRunner
taskStates map[string]*structs.TaskState
restored map[string]struct{}
Expand Down Expand Up @@ -76,7 +75,7 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStat
consulService: consulService,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
taskStates: alloc.TaskStates,
taskStates: copyTaskStates(alloc.TaskStates),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 8),
destroyCh: make(chan struct{}),
Expand All @@ -87,7 +86,10 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStat

// stateFilePath returns the path to our state file.
// The alloc pointer can be swapped by concurrent updates, so r.alloc.ID
// is read under allocLock to avoid a data race.
func (r *AllocRunner) stateFilePath() string {
	r.allocLock.Lock()
	defer r.allocLock.Unlock()
	return filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, "state.json")
}

// RestoreState is used to restore the state of the alloc runner
Expand All @@ -112,7 +114,7 @@ func (r *AllocRunner) RestoreState() error {
r.restored[name] = struct{}{}

task := &structs.Task{Name: name}
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.alloc,
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.Alloc(),
task, r.consulService)
r.tasks[name] = tr

Expand Down Expand Up @@ -153,16 +155,27 @@ func (r *AllocRunner) SaveState() error {
}

// saveAllocRunnerState snapshots the runner's mutable state and persists
// it to the state file. Each piece of shared state is copied under its
// own lock, and no lock is held across the persistState I/O call.
func (r *AllocRunner) saveAllocRunnerState() error {
	// Snapshot the task states under the read lock; copy so later
	// mutations by task runners cannot race with serialization.
	r.taskStatusLock.RLock()
	states := copyTaskStates(r.taskStates)
	r.taskStatusLock.RUnlock()

	// Alloc() returns its own copy, so it is safe to persist as-is.
	alloc := r.Alloc()

	// Read the explicit client status fields under allocLock. Note:
	// explicit Unlock (not defer) so the lock is released before I/O.
	r.allocLock.Lock()
	allocClientStatus := r.allocClientStatus
	allocClientDescription := r.allocClientDescription
	r.allocLock.Unlock()

	// The exec context is created lazily in Run; read it under ctxLock.
	r.ctxLock.Lock()
	ctx := r.ctx
	r.ctxLock.Unlock()

	snap := allocRunnerState{
		Alloc:                  alloc,
		Context:                ctx,
		AllocClientStatus:      allocClientStatus,
		AllocClientDescription: allocClientDescription,
		TaskStates:             states,
	}
	return persistState(r.stateFilePath(), &snap)
}
Expand All @@ -186,16 +199,33 @@ func (r *AllocRunner) DestroyContext() error {
return r.ctx.AllocDir.Destroy()
}

// copyTaskStates returns a copy of the passed task states. Each state is
// duplicated via its Copy method so callers can read the result without
// racing on the originals.
func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.TaskState {
	// Named "dup" rather than "copy" to avoid shadowing the builtin.
	dup := make(map[string]*structs.TaskState, len(states))
	for task, state := range states {
		dup[task] = state.Copy()
	}
	return dup
}

// Alloc returns the associated allocation
func (r *AllocRunner) Alloc() *structs.Allocation {
r.allocLock.Lock()
alloc := r.alloc.Copy()

// The status has explicitly been set.
if r.allocClientStatus != "" || r.allocClientDescription != "" {
alloc.ClientStatus = r.allocClientStatus
alloc.ClientDescription = r.allocClientDescription
r.allocLock.Unlock()
return alloc
}
r.allocLock.Unlock()

// Scan the task states to determine the status of the alloc
var pending, running, dead, failed bool
r.taskStatusLock.RLock()
alloc.TaskStates = r.taskStates
alloc.TaskStates = copyTaskStates(r.taskStates)
for _, state := range r.taskStates {
switch state.State {
case structs.TaskStateRunning:
Expand All @@ -213,13 +243,6 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
}
r.taskStatusLock.RUnlock()

// The status has explicitly been set.
if r.allocClientStatus != "" || r.allocClientDescription != "" {
alloc.ClientStatus = r.allocClientStatus
alloc.ClientDescription = r.allocClientDescription
return alloc
}

// Determine the alloc status
if failed {
alloc.ClientStatus = structs.AllocClientStatusFailed
Expand Down Expand Up @@ -276,8 +299,10 @@ func (r *AllocRunner) syncStatus() error {

// setStatus is used to update the allocation status
func (r *AllocRunner) setStatus(status, desc string) {
r.alloc.ClientStatus = status
r.alloc.ClientDescription = desc
r.allocLock.Lock()
r.allocClientStatus = status
r.allocClientDescription = desc
r.allocLock.Unlock()
select {
case r.dirtyCh <- struct{}{}:
default:
Expand Down Expand Up @@ -336,15 +361,18 @@ func (r *AllocRunner) Run() {
}

// Create the execution context
r.ctxLock.Lock()
if r.ctx == nil {
allocDir := allocdir.NewAllocDir(filepath.Join(r.config.AllocDir, r.alloc.ID))
if err := allocDir.Build(tg.Tasks); err != nil {
r.logger.Printf("[WARN] client: failed to build task directories: %v", err)
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to build task dirs for '%s'", alloc.TaskGroup))
r.ctxLock.Unlock()
return
}
r.ctx = driver.NewExecContext(allocDir, r.alloc.ID)
}
r.ctxLock.Unlock()

// Check if the allocation is in a terminal status. In this case, we don't
// start any of the task runners and directly wait for the destroy signal to
Expand All @@ -364,8 +392,8 @@ func (r *AllocRunner) Run() {
continue
}

tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.alloc,
task, r.consulService)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.Alloc(),
task.Copy(), r.consulService)
r.tasks[task.Name] = tr
go tr.Run()
}
Expand Down
45 changes: 41 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const (

// nodeUpdateRetryIntv is how often the client checks for updates to the
// node attributes or meta map.
nodeUpdateRetryIntv = 30 * time.Second
nodeUpdateRetryIntv = 5 * time.Second
)

// DefaultConfig returns the default configuration
Expand All @@ -75,6 +75,10 @@ type Client struct {
config *config.Config
start time.Time

// configCopy is a copy that should be passed to alloc-runners.
configCopy *config.Config
configLock sync.RWMutex

logger *log.Logger

consulService *ConsulService
Expand All @@ -90,6 +94,7 @@ type Client struct {

lastHeartbeat time.Time
heartbeatTTL time.Duration
heartbeatLock sync.Mutex

// allocs is the current set of allocations
allocs map[string]*AllocRunner
Expand Down Expand Up @@ -143,6 +148,10 @@ func NewClient(cfg *config.Config) (*Client, error) {
// Set up the known servers list
c.SetServers(c.config.Servers)

// Store the config copy before restoring state but after it has been
// initialized.
c.configCopy = c.config.Copy()

// Restore the state
if err := c.restoreState(); err != nil {
return nil, fmt.Errorf("failed to restore state: %v", err)
Expand Down Expand Up @@ -408,7 +417,9 @@ func (c *Client) restoreState() error {
for _, entry := range list {
id := entry.Name()
alloc := &structs.Allocation{ID: id}
ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulService)
c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.consulService)
c.configLock.RUnlock()
c.allocs[id] = ar
if err := ar.RestoreState(); err != nil {
c.logger.Printf("[ERR] client: failed to restore state for alloc %s: %v", id, err)
Expand Down Expand Up @@ -523,7 +534,10 @@ func (c *Client) fingerprint() error {
if err != nil {
return err
}

c.configLock.Lock()
applies, err := f.Fingerprint(c.config, c.config.Node)
c.configLock.Unlock()
if err != nil {
return err
}
Expand Down Expand Up @@ -551,9 +565,11 @@ func (c *Client) fingerprintPeriodic(name string, f fingerprint.Fingerprint, d t
for {
select {
case <-time.After(d):
c.configLock.Lock()
if _, err := f.Fingerprint(c.config, c.config.Node); err != nil {
c.logger.Printf("[DEBUG] client: periodic fingerprinting for %v failed: %v", name, err)
}
c.configLock.Unlock()
case <-c.shutdownCh:
return
}
Expand Down Expand Up @@ -581,7 +597,9 @@ func (c *Client) setupDrivers() error {
if err != nil {
return err
}
c.configLock.Lock()
applies, err := d.Fingerprint(c.config, c.config.Node)
c.configLock.Unlock()
if err != nil {
return err
}
Expand Down Expand Up @@ -647,7 +665,9 @@ func (c *Client) run() {
if err := c.updateNodeStatus(); err != nil {
heartbeat = time.After(c.retryIntv(registerRetryIntv))
} else {
c.heartbeatLock.Lock()
heartbeat = time.After(c.heartbeatTTL)
c.heartbeatLock.Unlock()
}

case <-c.shutdownCh:
Expand All @@ -661,6 +681,8 @@ func (c *Client) run() {
// determine if the node properties have changed. It returns the new hash values
// in case they are different from the old hash values.
func (c *Client) hasNodeChanged(oldAttrHash uint64, oldMetaHash uint64) (bool, uint64, uint64) {
c.configLock.RLock()
defer c.configLock.RUnlock()
newAttrHash, err := hashstructure.Hash(c.config.Node.Attributes, nil)
if err != nil {
c.logger.Printf("[DEBUG] client: unable to calculate node attributes hash: %v", err)
Expand Down Expand Up @@ -711,6 +733,9 @@ func (c *Client) registerNode() error {
if len(resp.EvalIDs) != 0 {
c.logger.Printf("[DEBUG] client: %d evaluations triggered by node registration", len(resp.EvalIDs))
}

c.heartbeatLock.Lock()
defer c.heartbeatLock.Unlock()
c.lastHeartbeat = time.Now()
c.heartbeatTTL = resp.HeartbeatTTL
return nil
Expand All @@ -736,6 +761,9 @@ func (c *Client) updateNodeStatus() error {
if resp.Index != 0 {
c.logger.Printf("[DEBUG] client: state updated to %s", req.Status)
}

c.heartbeatLock.Lock()
defer c.heartbeatLock.Unlock()
c.lastHeartbeat = time.Now()
c.heartbeatTTL = resp.HeartbeatTTL
return nil
Expand Down Expand Up @@ -896,6 +924,13 @@ func (c *Client) watchNodeUpdates() {
changed, attrHash, metaHash = c.hasNodeChanged(attrHash, metaHash)
if changed {
c.logger.Printf("[DEBUG] client: state changed, updating node.")

// Update the config copy.
c.configLock.Lock()
node := c.config.Node.Copy()
c.configCopy.Node = node
c.configLock.Unlock()

c.retryRegisterNode()
}
case <-c.shutdownCh:
Expand All @@ -910,7 +945,7 @@ func (c *Client) runAllocs(update *allocUpdates) {
c.allocLock.RLock()
exist := make([]*structs.Allocation, 0, len(c.allocs))
for _, ar := range c.allocs {
exist = append(exist, ar.Alloc())
exist = append(exist, ar.alloc)
}
c.allocLock.RUnlock()

Expand Down Expand Up @@ -979,7 +1014,9 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error {
func (c *Client) addAlloc(alloc *structs.Allocation) error {
c.allocLock.Lock()
defer c.allocLock.Unlock()
ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulService)
c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.consulService)
c.configLock.RUnlock()
c.allocs[alloc.ID] = ar
go ar.Run()
return nil
Expand Down
9 changes: 9 additions & 0 deletions client/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ type Config struct {
Options map[string]string
}

// Copy returns a copy of the config. The Node, Servers and Options
// fields are duplicated so mutations on the copy do not affect the
// original; remaining fields are shallow-copied. A nil receiver yields
// nil, so callers may copy without a nil check.
func (c *Config) Copy() *Config {
	if c == nil {
		return nil
	}
	nc := new(Config)
	*nc = *c
	nc.Node = nc.Node.Copy()
	nc.Servers = structs.CopySliceString(nc.Servers)
	nc.Options = structs.CopyMapStringString(nc.Options)
	return nc
}

// Read returns the specified configuration value or "".
func (c *Config) Read(id string) string {
val, ok := c.Options[id]
Expand Down
Loading

0 comments on commit 27984d8

Please sign in to comment.