Skip to content

Commit

Permalink
bunch of log messages
Browse files Browse the repository at this point in the history
  • Loading branch information
mehulkar committed Sep 2, 2022
1 parent b4c2ac6 commit e6df3eb
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 51 deletions.
38 changes: 19 additions & 19 deletions cli/internal/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,8 @@ func (r *run) runOperation(ctx gocontext.Context, g *completeGraph, rs *runSpec,
if err != nil {
return errors.Wrap(err, "error preparing engine")
}
hashTracker := taskhash.NewTracker(g.RootNode, g.GlobalHash, g.Pipeline, g.PackageInfos)
err = hashTracker.CalculateFileHashes(engine.TaskGraph.Vertices(), rs.Opts.runOpts.concurrency, r.config.Cwd)
tracker := taskhash.NewTracker(g.RootNode, g.GlobalHash, g.Pipeline, g.PackageInfos)
err = tracker.CalculateFileHashes(engine.TaskGraph.Vertices(), rs.Opts.runOpts.concurrency, r.config.Cwd)
if err != nil {
return errors.Wrap(err, "error hashing package files")
}
Expand Down Expand Up @@ -339,7 +339,7 @@ func (r *run) runOperation(ctx gocontext.Context, g *completeGraph, rs *runSpec,
}
}
} else if rs.Opts.runOpts.dryRun {
tasksRun, err := r.executeDryRun(ctx, engine, g, hashTracker, rs)
tasksRun, err := r.executeDryRun(ctx, engine, g, tracker, rs)
if err != nil {
return err
}
Expand Down Expand Up @@ -391,7 +391,7 @@ func (r *run) runOperation(ctx gocontext.Context, g *completeGraph, rs *runSpec,
sort.Strings(packagesInScope)
r.ui.Output(fmt.Sprintf(ui.Dim("• Packages in scope: %v"), strings.Join(packagesInScope, ", ")))
r.ui.Output(fmt.Sprintf("%s %s %s", ui.Dim("• Running"), ui.Dim(ui.Bold(strings.Join(rs.Targets, ", "))), ui.Dim(fmt.Sprintf("in %v packages", rs.FilteredPkgs.Len()))))
return r.executeTasks(ctx, g, rs, engine, packageManager, hashTracker, startAt)
return r.executeTasks(ctx, g, rs, engine, packageManager, tracker, startAt)
}
return nil
}
Expand Down Expand Up @@ -779,22 +779,22 @@ type hashedTask struct {

func (r *run) executeDryRun(ctx gocontext.Context, engine *core.Scheduler, g *completeGraph, taskHashes *taskhash.Tracker, rs *runSpec) ([]hashedTask, error) {
taskIDs := []hashedTask{}
errs := engine.Execute(g.getPackageTaskVisitor(ctx, func(ctx gocontext.Context, pt *nodes.PackageTask) error {
passThroughArgs := rs.ArgsForTask(pt.Task)
deps := engine.TaskGraph.DownEdges(pt.TaskID)
hash, err := taskHashes.CalculateTaskHash(pt, deps, passThroughArgs)
errs := engine.Execute(g.getPackageTaskVisitor(ctx, func(ctx gocontext.Context, packageTask *nodes.PackageTask) error {
passThroughArgs := rs.ArgsForTask(packageTask.Task)
deps := engine.TaskGraph.DownEdges(packageTask.TaskID)
hash, err := taskHashes.CalculateTaskHash(packageTask, deps, passThroughArgs)
if err != nil {
return err
}
command, ok := pt.Command()
command, ok := packageTask.Command()
if !ok {
command = "<NONEXISTENT>"
}
isRootTask := pt.PackageName == util.RootPkgName
isRootTask := packageTask.PackageName == util.RootPkgName
if isRootTask && commandLooksLikeTurbo(command) {
return fmt.Errorf("root task %v (%v) looks like it invokes turbo and might cause a loop", pt.Task, command)
return fmt.Errorf("root task %v (%v) looks like it invokes turbo and might cause a loop", packageTask.Task, command)
}
ancestors, err := engine.TaskGraph.Ancestors(pt.TaskID)
ancestors, err := engine.TaskGraph.Ancestors(packageTask.TaskID)
if err != nil {
return err
}
Expand All @@ -805,7 +805,7 @@ func (r *run) executeDryRun(ctx gocontext.Context, engine *core.Scheduler, g *co
stringAncestors = append(stringAncestors, dep.(string))
}
}
descendents, err := engine.TaskGraph.Descendents(pt.TaskID)
descendents, err := engine.TaskGraph.Descendents(packageTask.TaskID)
if err != nil {
return err
}
Expand All @@ -819,14 +819,14 @@ func (r *run) executeDryRun(ctx gocontext.Context, engine *core.Scheduler, g *co
sort.Strings(stringDescendents)

taskIDs = append(taskIDs, hashedTask{
TaskID: pt.TaskID,
Task: pt.Task,
Package: pt.PackageName,
TaskID: packageTask.TaskID,
Task: packageTask.Task,
Package: packageTask.PackageName,
Hash: hash,
Command: command,
Dir: pt.Pkg.Dir.ToString(),
Outputs: pt.TaskDefinition.Outputs,
LogFile: pt.RepoRelativeLogFile(),
Dir: packageTask.Pkg.Dir.ToString(),
Outputs: packageTask.TaskDefinition.Outputs,
LogFile: packageTask.RepoRelativeLogFile(),
Dependencies: stringAncestors,
Dependents: stringDescendents,
})
Expand Down
83 changes: 51 additions & 32 deletions cli/internal/taskhash/taskhash.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,21 @@ type packageFileSpec struct {
inputs []string
}

func specFromPackageTask(pt *nodes.PackageTask) packageFileSpec {
func specFromPackageTask(packageTask *nodes.PackageTask) packageFileSpec {
return packageFileSpec{
pkg: pt.PackageName,
inputs: pt.TaskDefinition.Inputs,
pkg: packageTask.PackageName,
inputs: packageTask.TaskDefinition.Inputs,
}
}

// packageFileHashKey is a hashable representation of a packageFileSpec.
type packageFileHashKey string

// hashes the inputs for a packageTask
func (pfs packageFileSpec) ToKey() packageFileHashKey {
sort.Strings(pfs.inputs)
return packageFileHashKey(fmt.Sprintf("%v#%v", pfs.pkg, strings.Join(pfs.inputs, "!")))
xx := fmt.Sprintf("%v#%v", pfs.pkg, strings.Join(pfs.inputs, "!"))
return packageFileHashKey(xx)
}

func safeCompileIgnoreFile(filepath string) (*gitignore.GitIgnore, error) {
Expand Down Expand Up @@ -156,8 +158,11 @@ type packageFileHashes map[packageFileHashKey]string
// CalculateFileHashes hashes each unique package-inputs combination that is present
// in the task graph. Must be called before calculating task hashes.
func (th *Tracker) CalculateFileHashes(allTasks []dag.Vertex, workerCount int, repoRoot turbopath.AbsolutePath) error {
fmt.Printf("CalculateFileHashes()\n")
hashTasks := make(util.Set)
for _, v := range allTasks {

for i, v := range allTasks {
fmt.Printf("\ttask %d: %v\n", i, v)
taskID, ok := v.(string)
if !ok {
return fmt.Errorf("unknown task %v", taskID)
Expand All @@ -167,44 +172,48 @@ func (th *Tracker) CalculateFileHashes(allTasks []dag.Vertex, workerCount int, r
}
pkgName, _ := util.GetPackageTaskFromId(taskID)
if pkgName == th.rootNode {
fmt.Printf("\tContinue\n")
continue
}

taskDefinition, ok := th.pipeline.GetTaskDefinition(taskID)
if !ok {
return fmt.Errorf("missing pipeline entry %v", taskID)
}

fmt.Printf("SOMETHING HAPPENING!!!!\n")

taskDefinition.Inputs = append(taskDefinition.Inputs, "./package.json")

for _, value := range taskDefinition.Inputs {
fmt.Printf("input: %s\n", value)
}

hashTasks.Add(&packageFileSpec{
pfs := &packageFileSpec{
pkg: pkgName,
inputs: taskDefinition.Inputs,
})
}

fmt.Printf("\ttaskDefinition.Inputs: %v\n", taskDefinition.Inputs)
fmt.Printf("\tpfs.inputs: %v\n", pfs.inputs)

hashTasks.Add(pfs)
}

hashes := make(map[packageFileHashKey]string)
hashQueue := make(chan *packageFileSpec, workerCount)
hashErrs := &errgroup.Group{}

for i := 0; i < workerCount; i++ {
hashErrs.Go(func() error {
for ht := range hashQueue {
fmt.Printf("ht %s\n", ht)
pkg, ok := th.packageInfos[ht.pkg]
for packageFileSpec := range hashQueue {
pkg, ok := th.packageInfos[packageFileSpec.pkg]
if !ok {
return fmt.Errorf("cannot find package %v", ht.pkg)
return fmt.Errorf("cannot find package %v", packageFileSpec.pkg)
}
hash, err := ht.hash(pkg, repoRoot)
hash, err := packageFileSpec.hash(pkg, repoRoot)
if err != nil {
return err
}
th.mu.Lock()
hashes[ht.ToKey()] = hash
pfsKey := packageFileSpec.ToKey()
fmt.Printf("\tadding to hashes\n")
fmt.Printf("\t\t%v: %v\n", pfsKey, hash)
hashes[pfsKey] = hash
th.mu.Unlock()
}
return nil
Expand All @@ -219,6 +228,8 @@ func (th *Tracker) CalculateFileHashes(allTasks []dag.Vertex, workerCount int, r
return err
}
th.packageInputsHashes = hashes

fmt.Printf("th.packageInputsHashes: %v\n", th.packageInputsHashes)
return nil
}

Expand Down Expand Up @@ -264,15 +275,23 @@ func (th *Tracker) calculateDependencyHashes(dependencySet dag.Set) ([]string, e
// CalculateTaskHash calculates the hash for package-task combination. It is threadsafe, provided
// that it has previously been called on its task-graph dependencies. File hashes must be calculated
// first.
func (th *Tracker) CalculateTaskHash(pt *nodes.PackageTask, dependencySet dag.Set, args []string) (string, error) {
x := specFromPackageTask(pt)
pkgFileHashKey := x.ToKey()
func (th *Tracker) CalculateTaskHash(packageTask *nodes.PackageTask, dependencySet dag.Set, args []string) (string, error) {
fmt.Printf("CalculateTaskHash(): taskID: %v\n", packageTask.TaskID)

fmt.Printf("pkg %s\n", x.pkg)
fmt.Printf("pkgFileHashKey: %s\n", pkgFileHashKey)
fmt.Printf("\ttaskDefinition.Inputs: %v\n", packageTask.TaskDefinition.Inputs)
pfs := specFromPackageTask(packageTask)
fmt.Printf("\tpfs.inputs: %v\n", pfs.inputs)
pkgFileHashKey := pfs.ToKey()

fmt.Printf("\tpkgFileHashKey: %s\n", pkgFileHashKey)

fmt.Printf("\tScripts\n")
for scriptName, script := range packageTask.Pkg.Scripts {
fmt.Printf("\t\t%s: %s\n", scriptName, script)
}

for _, x := range th.packageInputsHashes {
fmt.Printf("input hash: %s\n", x)
fmt.Printf("\tpackage input hashes: %s\n", x)
}

hashOfFiles, ok := th.packageInputsHashes[pkgFileHashKey]
Expand All @@ -281,32 +300,32 @@ func (th *Tracker) CalculateTaskHash(pt *nodes.PackageTask, dependencySet dag.Se
}

var envPrefixes []string
framework := inference.InferFramework(pt.Pkg)
framework := inference.InferFramework(packageTask.Pkg)
if framework != nil && framework.EnvPrefix != "" {
envPrefixes = append(envPrefixes, framework.EnvPrefix)
}

hashableEnvPairs := env.GetHashableEnvPairs(pt.TaskDefinition.EnvVarDependencies, envPrefixes)
outputs := pt.HashableOutputs()
hashableEnvPairs := env.GetHashableEnvPairs(packageTask.TaskDefinition.EnvVarDependencies, envPrefixes)
outputs := packageTask.HashableOutputs()
taskDependencyHashes, err := th.calculateDependencyHashes(dependencySet)
if err != nil {
return "", err
}
hash, err := fs.HashObject(&taskHashInputs{
hashOfFiles: hashOfFiles,
externalDepsHash: pt.Pkg.ExternalDepsHash,
task: pt.Task,
externalDepsHash: packageTask.Pkg.ExternalDepsHash,
task: packageTask.Task,
outputs: outputs,
passThruArgs: args,
hashableEnvPairs: hashableEnvPairs,
globalHash: th.globalHash,
taskDependencyHashes: taskDependencyHashes,
})
if err != nil {
return "", fmt.Errorf("failed to hash task %v: %v", pt.TaskID, hash)
return "", fmt.Errorf("failed to hash task %v: %v", packageTask.TaskID, hash)
}
th.mu.Lock()
th.packageTaskHashes[pt.TaskID] = hash
th.packageTaskHashes[packageTask.TaskID] = hash
th.mu.Unlock()
return hash, nil
}

0 comments on commit e6df3eb

Please sign in to comment.