Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove data races in the client #789

Merged
merged 8 commits into from
Feb 11, 2016
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 54 additions & 27 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@ type AllocRunner struct {
logger *log.Logger
consulService *ConsulService

alloc *structs.Allocation
allocLock sync.Mutex

// Explicit status of allocation. Set when there are failures
allocClientStatus string
alloc *structs.Allocation
allocClientStatus string // Explicit status of allocation. Set when there are failures
allocClientDescription string
allocLock sync.Mutex

dirtyCh chan struct{}

ctx *driver.ExecContext
ctxLock sync.Mutex
tasks map[string]*TaskRunner
taskStates map[string]*structs.TaskState
restored map[string]struct{}
Expand Down Expand Up @@ -76,7 +75,7 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStat
consulService: consulService,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
taskStates: alloc.TaskStates,
taskStates: copyTaskStates(alloc.TaskStates),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 8),
destroyCh: make(chan struct{}),
Expand All @@ -87,7 +86,10 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStat

// stateFilePath returns the path to our state file
func (r *AllocRunner) stateFilePath() string {
return filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, "state.json")
r.allocLock.Lock()
defer r.allocLock.Unlock()
path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, "state.json")
return path
}

// RestoreState is used to restore the state of the alloc runner
Expand All @@ -112,7 +114,7 @@ func (r *AllocRunner) RestoreState() error {
r.restored[name] = struct{}{}

task := &structs.Task{Name: name}
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.alloc,
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.Alloc(),
task, r.consulService)
r.tasks[name] = tr

Expand Down Expand Up @@ -153,16 +155,27 @@ func (r *AllocRunner) SaveState() error {
}

func (r *AllocRunner) saveAllocRunnerState() error {
// Create the snapshot.
r.taskStatusLock.RLock()
defer r.taskStatusLock.RUnlock()
states := copyTaskStates(r.taskStates)
r.taskStatusLock.RUnlock()

alloc := r.Alloc()
r.allocLock.Lock()
defer r.allocLock.Unlock()
allocClientStatus := r.allocClientStatus
allocClientDescription := r.allocClientDescription
r.allocLock.Unlock()

r.ctxLock.Lock()
ctx := r.ctx
r.ctxLock.Unlock()

snap := allocRunnerState{
Alloc: r.alloc,
Context: r.ctx,
AllocClientStatus: r.allocClientStatus,
AllocClientDescription: r.allocClientDescription,
TaskStates: r.taskStates,
Alloc: alloc,
Context: ctx,
AllocClientStatus: allocClientStatus,
AllocClientDescription: allocClientDescription,
TaskStates: states,
}
return persistState(r.stateFilePath(), &snap)
}
Expand All @@ -186,16 +199,33 @@ func (r *AllocRunner) DestroyContext() error {
return r.ctx.AllocDir.Destroy()
}

// copyTaskStates returns a copy of the passed task states.
func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.TaskState {
copy := make(map[string]*structs.TaskState, len(states))
for task, state := range states {
copy[task] = state.Copy()
}
return copy
}

// Alloc returns the associated allocation
func (r *AllocRunner) Alloc() *structs.Allocation {
r.allocLock.Lock()
alloc := r.alloc.Copy()

// The status has explicitely been set.
if r.allocClientStatus != "" || r.allocClientDescription != "" {
alloc.ClientStatus = r.allocClientStatus
alloc.ClientDescription = r.allocClientDescription
r.allocLock.Unlock()
return alloc
}
r.allocLock.Unlock()

// Scan the task states to determine the status of the alloc
var pending, running, dead, failed bool
r.taskStatusLock.RLock()
alloc.TaskStates = r.taskStates
alloc.TaskStates = copyTaskStates(r.taskStates)
for _, state := range r.taskStates {
switch state.State {
case structs.TaskStateRunning:
Expand All @@ -213,13 +243,6 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
}
r.taskStatusLock.RUnlock()

// The status has explicitely been set.
if r.allocClientStatus != "" || r.allocClientDescription != "" {
alloc.ClientStatus = r.allocClientStatus
alloc.ClientDescription = r.allocClientDescription
return alloc
}

// Determine the alloc status
if failed {
alloc.ClientStatus = structs.AllocClientStatusFailed
Expand Down Expand Up @@ -276,8 +299,10 @@ func (r *AllocRunner) syncStatus() error {

// setStatus is used to update the allocation status
func (r *AllocRunner) setStatus(status, desc string) {
r.alloc.ClientStatus = status
r.alloc.ClientDescription = desc
r.allocLock.Lock()
r.allocClientStatus = status
r.allocClientDescription = desc
r.allocLock.Unlock()
select {
case r.dirtyCh <- struct{}{}:
default:
Expand Down Expand Up @@ -336,6 +361,7 @@ func (r *AllocRunner) Run() {
}

// Create the execution context
r.ctxLock.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you need to unlock this in the error path

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if r.ctx == nil {
allocDir := allocdir.NewAllocDir(filepath.Join(r.config.AllocDir, r.alloc.ID))
if err := allocDir.Build(tg.Tasks); err != nil {
Expand All @@ -345,6 +371,7 @@ func (r *AllocRunner) Run() {
}
r.ctx = driver.NewExecContext(allocDir, r.alloc.ID)
}
r.ctxLock.Unlock()

// Check if the allocation is in a terminal status. In this case, we don't
// start any of the task runners and directly wait for the destroy signal to
Expand All @@ -364,8 +391,8 @@ func (r *AllocRunner) Run() {
continue
}

tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.alloc,
task, r.consulService)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.Alloc(),
task.Copy(), r.consulService)
r.tasks[task.Name] = tr
go tr.Run()
}
Expand Down
45 changes: 41 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const (

// nodeUpdateRetryIntv is how often the client checks for updates to the
// node attributes or meta map.
nodeUpdateRetryIntv = 30 * time.Second
nodeUpdateRetryIntv = 5 * time.Second
)

// DefaultConfig returns the default configuration
Expand All @@ -75,6 +75,10 @@ type Client struct {
config *config.Config
start time.Time

// configCopy is a copy that should be passed to alloc-runners.
configCopy *config.Config
configLock sync.RWMutex

logger *log.Logger

consulService *ConsulService
Expand All @@ -90,6 +94,7 @@ type Client struct {

lastHeartbeat time.Time
heartbeatTTL time.Duration
heartbeatLock sync.Mutex

// allocs is the current set of allocations
allocs map[string]*AllocRunner
Expand Down Expand Up @@ -143,6 +148,10 @@ func NewClient(cfg *config.Config) (*Client, error) {
// Set up the known servers list
c.SetServers(c.config.Servers)

// Store the config copy before restoring state but after it has been
// initialized.
c.configCopy = c.config.Copy()

// Restore the state
if err := c.restoreState(); err != nil {
return nil, fmt.Errorf("failed to restore state: %v", err)
Expand Down Expand Up @@ -408,7 +417,9 @@ func (c *Client) restoreState() error {
for _, entry := range list {
id := entry.Name()
alloc := &structs.Allocation{ID: id}
ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulService)
c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.consulService)
c.configLock.RUnlock()
c.allocs[id] = ar
if err := ar.RestoreState(); err != nil {
c.logger.Printf("[ERR] client: failed to restore state for alloc %s: %v", id, err)
Expand Down Expand Up @@ -523,7 +534,10 @@ func (c *Client) fingerprint() error {
if err != nil {
return err
}

c.configLock.Lock()
applies, err := f.Fingerprint(c.config, c.config.Node)
c.configLock.Unlock()
if err != nil {
return err
}
Expand Down Expand Up @@ -551,9 +565,11 @@ func (c *Client) fingerprintPeriodic(name string, f fingerprint.Fingerprint, d t
for {
select {
case <-time.After(d):
c.configLock.Lock()
if _, err := f.Fingerprint(c.config, c.config.Node); err != nil {
c.logger.Printf("[DEBUG] client: periodic fingerprinting for %v failed: %v", name, err)
}
c.configLock.Unlock()
case <-c.shutdownCh:
return
}
Expand Down Expand Up @@ -581,7 +597,9 @@ func (c *Client) setupDrivers() error {
if err != nil {
return err
}
c.configLock.Lock()
applies, err := d.Fingerprint(c.config, c.config.Node)
c.configLock.Unlock()
if err != nil {
return err
}
Expand Down Expand Up @@ -647,7 +665,9 @@ func (c *Client) run() {
if err := c.updateNodeStatus(); err != nil {
heartbeat = time.After(c.retryIntv(registerRetryIntv))
} else {
c.heartbeatLock.Lock()
heartbeat = time.After(c.heartbeatTTL)
c.heartbeatLock.Unlock()
}

case <-c.shutdownCh:
Expand All @@ -661,6 +681,8 @@ func (c *Client) run() {
// determine if the node properties have changed. It returns the new hash values
// in case they are different from the old hash values.
func (c *Client) hasNodeChanged(oldAttrHash uint64, oldMetaHash uint64) (bool, uint64, uint64) {
c.configLock.RLock()
defer c.configLock.RUnlock()
newAttrHash, err := hashstructure.Hash(c.config.Node.Attributes, nil)
if err != nil {
c.logger.Printf("[DEBUG] client: unable to calculate node attributes hash: %v", err)
Expand Down Expand Up @@ -711,6 +733,9 @@ func (c *Client) registerNode() error {
if len(resp.EvalIDs) != 0 {
c.logger.Printf("[DEBUG] client: %d evaluations triggered by node registration", len(resp.EvalIDs))
}

c.heartbeatLock.Lock()
defer c.heartbeatLock.Unlock()
c.lastHeartbeat = time.Now()
c.heartbeatTTL = resp.HeartbeatTTL
return nil
Expand All @@ -736,6 +761,9 @@ func (c *Client) updateNodeStatus() error {
if resp.Index != 0 {
c.logger.Printf("[DEBUG] client: state updated to %s", req.Status)
}

c.heartbeatLock.Lock()
defer c.heartbeatLock.Unlock()
c.lastHeartbeat = time.Now()
c.heartbeatTTL = resp.HeartbeatTTL
return nil
Expand Down Expand Up @@ -896,6 +924,13 @@ func (c *Client) watchNodeUpdates() {
changed, attrHash, metaHash = c.hasNodeChanged(attrHash, metaHash)
if changed {
c.logger.Printf("[DEBUG] client: state changed, updating node.")

// Update the config copy.
c.configLock.Lock()
node := c.config.Node.Copy()
c.configCopy.Node = node
c.configLock.Unlock()

c.retryRegisterNode()
}
case <-c.shutdownCh:
Expand All @@ -910,7 +945,7 @@ func (c *Client) runAllocs(update *allocUpdates) {
c.allocLock.RLock()
exist := make([]*structs.Allocation, 0, len(c.allocs))
for _, ar := range c.allocs {
exist = append(exist, ar.Alloc())
exist = append(exist, ar.alloc)
}
c.allocLock.RUnlock()

Expand Down Expand Up @@ -979,7 +1014,9 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error {
func (c *Client) addAlloc(alloc *structs.Allocation) error {
c.allocLock.Lock()
defer c.allocLock.Unlock()
ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulService)
c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.consulService)
c.configLock.RUnlock()
c.allocs[alloc.ID] = ar
go ar.Run()
return nil
Expand Down
9 changes: 9 additions & 0 deletions client/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ type Config struct {
Options map[string]string
}

func (c *Config) Copy() *Config {
nc := new(Config)
*nc = *c
nc.Node = nc.Node.Copy()
nc.Servers = structs.CopySliceString(nc.Servers)
nc.Options = structs.CopyMapStringString(nc.Options)
return nc
}

// Read returns the specified configuration value or "".
func (c *Config) Read(id string) string {
val, ok := c.Options[id]
Expand Down
Loading