Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Locking appropriately before closing the channel to indicate migration #2231

Merged
merged 1 commit into from
Jan 23, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 33 additions & 7 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,40 @@ type Client struct {
vaultClient vaultclient.VaultClient

// migratingAllocs is the set of allocs whose data migration is in flight
migratingAllocs map[string]chan struct{}
migratingAllocs map[string]*migrateAllocCtrl
migratingAllocsLock sync.Mutex

// garbageCollector is used to garbage collect terminal allocations present
// in the node automatically
garbageCollector *AllocGarbageCollector
}

// migrateAllocCtrl indicates whether migration is complete
type migrateAllocCtrl struct {
ch chan struct{}
closed bool
chLock sync.Mutex
}

func newMigrateAllocCtrl() *migrateAllocCtrl {
return &migrateAllocCtrl{
ch: make(chan struct{}),
}
}

func (m *migrateAllocCtrl) closeCh() {
m.chLock.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need this lock as runAllocs method is only ever called from a single goroutine so all chan mutations are serialized.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be called concurrently though.

defer m.chLock.Unlock()

if m.closed {
return
}

// If channel is not closed then close it
m.closed = true
close(m.ch)
}

var (
// noServersErr is returned by the RPC method when the client has no
// configured servers. This is used to trigger Consul discovery if
Expand Down Expand Up @@ -188,7 +214,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
blockedAllocations: make(map[string]*structs.Allocation),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
migratingAllocs: make(map[string]chan struct{}),
migratingAllocs: make(map[string]*migrateAllocCtrl),
servers: newServerList(),
triggerDiscoveryCh: make(chan struct{}),
serversDiscoveredCh: make(chan struct{}),
Expand Down Expand Up @@ -1420,7 +1446,7 @@ func (c *Client) runAllocs(update *allocUpdates) {
// Stopping the migration if the allocation doesn't need any
// migration
if !update.updated.ShouldMigrate() {
close(ch)
ch.closeCh()
}
}
}
Expand Down Expand Up @@ -1455,7 +1481,7 @@ func (c *Client) runAllocs(update *allocUpdates) {
// prevents a race between a finishing blockForRemoteAlloc and
// another invocation of runAllocs
if _, ok := c.getAllocRunners()[add.PreviousAllocation]; !ok {
c.migratingAllocs[add.ID] = make(chan struct{})
c.migratingAllocs[add.ID] = newMigrateAllocCtrl()
go c.blockForRemoteAlloc(add)
}
}
Expand Down Expand Up @@ -1533,7 +1559,7 @@ ADDALLOC:

// waitForAllocTerminal waits for an allocation with the given alloc id to
// transition to terminal state and blocks the caller until then.
func (c *Client) waitForAllocTerminal(allocID string, stopCh chan struct{}) (*structs.Allocation, error) {
func (c *Client) waitForAllocTerminal(allocID string, stopCh *migrateAllocCtrl) (*structs.Allocation, error) {
req := structs.AllocSpecificRequest{
AllocID: allocID,
QueryOptions: structs.QueryOptions{
Expand All @@ -1551,7 +1577,7 @@ func (c *Client) waitForAllocTerminal(allocID string, stopCh chan struct{}) (*st
select {
case <-time.After(retry):
continue
case <-stopCh:
case <-stopCh.ch:
return nil, fmt.Errorf("giving up waiting on alloc %v since migration is not needed", allocID)
case <-c.shutdownCh:
return nil, fmt.Errorf("aborting because client is shutting down")
Expand Down Expand Up @@ -1665,7 +1691,7 @@ func (c *Client) unarchiveAllocDir(resp io.ReadCloser, allocID string, pathToAll
for {
// See if the alloc still needs migration
select {
case <-stopMigrating:
case <-stopMigrating.ch:
os.RemoveAll(pathToAllocDir)
c.logger.Printf("[INFO] client: stopping migration of allocdir for alloc: %v", allocID)
return nil
Expand Down