Skip to content

Commit

Permalink
module loader: Avoid deadlock (#396)
Browse files Browse the repository at this point in the history
  • Loading branch information
radeksimko authored Feb 5, 2021
1 parent 6749bbf commit fdfe23d
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 31 deletions.
28 changes: 14 additions & 14 deletions internal/terraform/module/module_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,17 @@ func (ml *moduleLoader) run(ctx context.Context) {

if nextOp.Module.HasOpenFiles() && ml.prioCapacity() > 0 {
atomic.AddInt64(ml.prioLoadingCount, 1)
mod := ml.queue.PopOp()
go func(ml *moduleLoader) {
defer atomic.AddInt64(ml.prioLoadingCount, -1)
ml.executeModuleOp(ctx, mod)
ml.executeModuleOp(ctx, nextOp)
atomic.AddInt64(ml.prioLoadingCount, -1)
ml.tryDispatchingModuleOp()
}(ml)
} else if ml.nonPrioCapacity() > 0 {
atomic.AddInt64(ml.loadingCount, 1)
mod := ml.queue.PopOp()
go func(ml *moduleLoader) {
defer atomic.AddInt64(ml.loadingCount, -1)
ml.executeModuleOp(ctx, mod)
ml.executeModuleOp(ctx, nextOp)
atomic.AddInt64(ml.loadingCount, -1)
ml.tryDispatchingModuleOp()
}(ml)
} else {
// Account for an unlikely situation where next operation
Expand All @@ -105,21 +105,22 @@ func (ml *moduleLoader) run(ctx context.Context) {
// were decremented.
ml.logger.Println("no available capacity, retrying dispatch")
time.Sleep(100 * time.Millisecond)
ml.tryDispatchingModuleOp()
ml.queue.PushOp(nextOp)
go ml.tryDispatchingModuleOp()
}
}
}
}

func (ml *moduleLoader) tryDispatchingModuleOp() {
totalCapacity := ml.nonPrioCapacity() + ml.prioCapacity()
opsInQueue := ml.queue.Len()

// Keep scheduling work from queue if we have capacity
if opsInQueue > 0 && totalCapacity > 0 {
item := ml.queue.Peek()
nextModOp := item.(ModuleOperation)
ml.opsToDispatch <- nextModOp
if totalCapacity > 0 {
nextModOp, ok := ml.queue.PopOp()
if ok {
ml.opsToDispatch <- nextModOp
}
}
}

Expand All @@ -128,15 +129,14 @@ func (ml *moduleLoader) prioCapacity() int64 {
}

func (ml *moduleLoader) nonPrioCapacity() int64 {
return ml.prioParallelism - atomic.LoadInt64(ml.loadingCount)
return ml.nonPrioParallelism - atomic.LoadInt64(ml.loadingCount)
}

func (ml *moduleLoader) executeModuleOp(ctx context.Context, modOp ModuleOperation) {
ml.logger.Printf("executing %q for %s", modOp.Type, modOp.Module.Path())
// TODO: Report progress in % for each op based on queue length
defer ml.logger.Printf("finished %q for %s", modOp.Type, modOp.Module.Path())
defer modOp.markAsDone()
defer ml.tryDispatchingModuleOp()

switch modOp.Type {
case OpTypeGetTerraformVersion:
Expand Down
21 changes: 6 additions & 15 deletions internal/terraform/module/module_ops_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@ func (q *moduleOpsQueue) PushOp(op ModuleOperation) {

}

func (q *moduleOpsQueue) PopOp() ModuleOperation {
func (q *moduleOpsQueue) PopOp() (ModuleOperation, bool) {
q.mu.Lock()
defer q.mu.Unlock()

if q.q.Len() == 0 {
return ModuleOperation{}, false
}

item := heap.Pop(&q.q)
modOp := item.(ModuleOperation)
return modOp
return modOp, true
}

func (q *moduleOpsQueue) Len() int {
Expand All @@ -43,14 +47,6 @@ func (q *moduleOpsQueue) Len() int {
return q.q.Len()
}

func (q *moduleOpsQueue) Peek() interface{} {
q.mu.Lock()
defer q.mu.Unlock()

item := q.q.Peek()
return item
}

type queue []ModuleOperation

var _ heap.Interface = &queue{}
Expand All @@ -72,11 +68,6 @@ func (q *queue) Pop() interface{} {
return item
}

func (q queue) Peek() interface{} {
n := len(q)
return q[n-1]
}

func (q queue) Len() int {
return len(q)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/terraform/module/module_ops_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestModuleOpsQueue_modulePriority(t *testing.T) {
mq.PushOp(op)
}

firstOp := mq.PopOp()
firstOp, _ := mq.PopOp()

expectedFirstPath := filepath.Join(dir, "beta")
firstPath := firstOp.Module.Path()
Expand All @@ -54,7 +54,7 @@ func TestModuleOpsQueue_modulePriority(t *testing.T) {
expectedFirstPath, firstPath)
}

secondOp := mq.PopOp()
secondOp, _ := mq.PopOp()
expectedSecondPath := filepath.Join(dir, "gamma")
secondPath := secondOp.Module.Path()
if secondPath != expectedSecondPath {
Expand Down

0 comments on commit fdfe23d

Please sign in to comment.