Skip to content

Commit

Permalink
add concurrent download support - resolves hashicorp#11244
Browse files Browse the repository at this point in the history
  • Loading branch information
gowthamgts committed Nov 18, 2021
1 parent be30a93 commit 9cbea5b
Showing 1 changed file with 56 additions and 18 deletions.
74 changes: 56 additions & 18 deletions client/allocrunner/taskrunner/artifact_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package taskrunner
import (
"context"
"fmt"

log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/getter"
ti "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
"github.com/hashicorp/nomad/nomad/structs"
"sync"
)

// artifactHook downloads artifacts for a task.
Expand All @@ -25,24 +25,18 @@ func newArtifactHook(e ti.EventEmitter, logger log.Logger) *artifactHook {
return h
}

func (*artifactHook) Name() string {
// Copied in client/state when upgrading from <0.9 schemas, so if you
// change it here you also must change it there.
return "artifacts"
}

func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
if len(req.Task.Artifacts) == 0 {
resp.Done = true
return nil
func (h *artifactHook) createWorkers(req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse, noOfWorkers int, jobsChannel chan *structs.TaskArtifact, errorChannel chan error) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go h.doWork(req, resp, jobsChannel, errorChannel, &wg)
}
wg.Wait()
close(errorChannel)
}

// Initialize hook state to store download progress
resp.State = make(map[string]string, len(req.Task.Artifacts))

h.eventEmitter.EmitEvent(structs.NewTaskEvent(structs.TaskDownloadingArtifacts))

for _, artifact := range req.Task.Artifacts {
func (h *artifactHook) doWork(req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse, jobs chan *structs.TaskArtifact, errorChannel chan error, wg *sync.WaitGroup) {
for artifact := range jobs {
aid := artifact.Hash()
if req.PreviousState[aid] != "" {
h.logger.Trace("skipping already downloaded artifact", "artifact", artifact.GetterSource)
Expand All @@ -60,14 +54,58 @@ func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestar
)
herr := NewHookError(wrapped, structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(wrapped))

return herr
errorChannel <- herr
continue
}

// Mark artifact as downloaded to avoid re-downloading due to
// retries caused by subsequent artifacts failing. Any
// non-empty value works.
resp.State[aid] = "1"
}
wg.Done()
}

func (*artifactHook) Name() string {
// Copied in client/state when upgrading from <0.9 schemas, so if you
// change it here you also must change it there.
return "artifacts"
}

func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
if len(req.Task.Artifacts) == 0 {
resp.Done = true
return nil
}

// Initialize hook state to store download progress
resp.State = make(map[string]string, len(req.Task.Artifacts))

h.eventEmitter.EmitEvent(structs.NewTaskEvent(structs.TaskDownloadingArtifacts))

// maxConcurrency denotes the number of workers that will download artifacts in parallel
maxConcurrency := 3

// jobsChannel is a buffered channel which will have all the artifacts that needs to be processed
jobsChannel := make(chan *structs.TaskArtifact, maxConcurrency)
// Push all artifact requests to job channel
go func() {
for _, artifact := range req.Task.Artifacts {
jobsChannel <- artifact
}
close(jobsChannel)
}()

errorChannel := make(chan error, maxConcurrency)
// create workers and process artifacts
h.createWorkers(req, resp, maxConcurrency, jobsChannel, errorChannel)

// Iterate over the errorChannel and if there is an error, return it
for err := range errorChannel {
if err != nil {
return err
}
}

resp.Done = true
return nil
Expand Down

0 comments on commit 9cbea5b

Please sign in to comment.