Skip to content

Commit

Permalink
Locking appropriately before closing the channel to indicate migration
Browse files Browse the repository at this point in the history
  • Loading branch information
diptanu committed Jan 23, 2017
1 parent 0d2e8ea commit c254fbf
Showing 1 changed file with 33 additions and 7 deletions.
40 changes: 33 additions & 7 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,40 @@ type Client struct {
vaultClient vaultclient.VaultClient

// migratingAllocs is the set of allocs whose data migration is in flight
migratingAllocs map[string]chan struct{}
migratingAllocs map[string]*migrateAllocCtrl
migratingAllocsLock sync.Mutex

// garbageCollector is used to garbage collect terminal allocations present
// in the node automatically
garbageCollector *AllocGarbageCollector
}

// migrateAllocCtrl indicates whether migration is complete
type migrateAllocCtrl struct {
ch chan struct{}
closed bool
chLock sync.Mutex
}

func newMigrateAllocCtrl() *migrateAllocCtrl {
return &migrateAllocCtrl{
ch: make(chan struct{}),
}
}

func (m *migrateAllocCtrl) closeCh() {
m.chLock.Lock()
defer m.chLock.Unlock()

if m.closed {
return
}

// If channel is not closed then close it
m.closed = true
close(m.ch)
}

var (
// noServersErr is returned by the RPC method when the client has no
// configured servers. This is used to trigger Consul discovery if
Expand Down Expand Up @@ -188,7 +214,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
blockedAllocations: make(map[string]*structs.Allocation),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
migratingAllocs: make(map[string]chan struct{}),
migratingAllocs: make(map[string]*migrateAllocCtrl),
servers: newServerList(),
triggerDiscoveryCh: make(chan struct{}),
serversDiscoveredCh: make(chan struct{}),
Expand Down Expand Up @@ -1420,7 +1446,7 @@ func (c *Client) runAllocs(update *allocUpdates) {
// Stopping the migration if the allocation doesn't need any
// migration
if !update.updated.ShouldMigrate() {
close(ch)
ch.closeCh()
}
}
}
Expand Down Expand Up @@ -1455,7 +1481,7 @@ func (c *Client) runAllocs(update *allocUpdates) {
// prevents a race between a finishing blockForRemoteAlloc and
// another invocation of runAllocs
if _, ok := c.getAllocRunners()[add.PreviousAllocation]; !ok {
c.migratingAllocs[add.ID] = make(chan struct{})
c.migratingAllocs[add.ID] = newMigrateAllocCtrl()
go c.blockForRemoteAlloc(add)
}
}
Expand Down Expand Up @@ -1533,7 +1559,7 @@ ADDALLOC:

// waitForAllocTerminal waits for an allocation with the given alloc id to
// transition to terminal state and blocks the caller until then.
func (c *Client) waitForAllocTerminal(allocID string, stopCh chan struct{}) (*structs.Allocation, error) {
func (c *Client) waitForAllocTerminal(allocID string, stopCh *migrateAllocCtrl) (*structs.Allocation, error) {
req := structs.AllocSpecificRequest{
AllocID: allocID,
QueryOptions: structs.QueryOptions{
Expand All @@ -1551,7 +1577,7 @@ func (c *Client) waitForAllocTerminal(allocID string, stopCh chan struct{}) (*st
select {
case <-time.After(retry):
continue
case <-stopCh:
case <-stopCh.ch:
return nil, fmt.Errorf("giving up waiting on alloc %v since migration is not needed", allocID)
case <-c.shutdownCh:
return nil, fmt.Errorf("aborting because client is shutting down")
Expand Down Expand Up @@ -1665,7 +1691,7 @@ func (c *Client) unarchiveAllocDir(resp io.ReadCloser, allocID string, pathToAll
for {
// See if the alloc still needs migration
select {
case <-stopMigrating:
case <-stopMigrating.ch:
os.RemoveAll(pathToAllocDir)
c.logger.Printf("[INFO] client: stopping migration of allocdir for alloc: %v", allocID)
return nil
Expand Down

0 comments on commit c254fbf

Please sign in to comment.