From 7bb68199782551ff07fb740c4b49755ebdd22e76 Mon Sep 17 00:00:00 2001 From: Radek Simko Date: Mon, 1 Aug 2022 15:07:58 +0100 Subject: [PATCH 1/5] state: Introduce DependsOn for N-to-N job dependencies --- internal/job/job.go | 16 ++++-- internal/scheduler/scheduler_test.go | 79 ++++++++++++++++++++++++++++ internal/state/job_id_slice_index.go | 35 ++++++++++++ internal/state/jobs.go | 69 +++++++++++++++++++++++- internal/state/slice_length_index.go | 57 ++++++++++++++++++++ internal/state/state.go | 12 ++++- 6 files changed, 259 insertions(+), 9 deletions(-) create mode 100644 internal/state/job_id_slice_index.go create mode 100644 internal/state/slice_length_index.go diff --git a/internal/job/job.go b/internal/job/job.go index 528470215..6ad1cc8e2 100644 --- a/internal/job/job.go +++ b/internal/job/job.go @@ -27,6 +27,11 @@ type Job struct { // and before the job is marked as done (StateDone). // This can be used to schedule jobs dependent on the main job. Defer DeferFunc + + // DependsOn represents any other job IDs this job depends on. + // This will be taken into account when scheduling, so that only + // jobs with no dependencies are dispatched at any time. + DependsOn IDs } // DeferFunc represents a deferred function scheduling more jobs @@ -36,11 +41,12 @@ type DeferFunc func(ctx context.Context, jobErr error) (IDs, error) func (job Job) Copy() Job { return Job{ - Func: job.Func, - Dir: job.Dir, - Type: job.Type, - Priority: job.Priority, - Defer: job.Defer, + Func: job.Func, + Dir: job.Dir, + Type: job.Type, + Priority: job.Priority, + Defer: job.Defer, + DependsOn: job.DependsOn.Copy(), } } diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 5de053b63..7fe82141c 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -10,7 +10,9 @@ import ( "sync" "sync/atomic" "testing" + "time" + "github.com/google/go-cmp/cmp" "github.com/hashicorp/terraform-ls/internal/document" "github.com/hashicorp/terraform-ls/internal/job" "github.com/hashicorp/terraform-ls/internal/state" @@ -316,6 +318,83 @@ func TestScheduler_defer(t *testing.T) { } } +func TestScheduler_dependsOn(t *testing.T) { + ss, err := state.NewStateStore() + if err != nil { + t.Fatal(err) + } + ss.SetLogger(testLogger()) + + tmpDir := t.TempDir() + + ctx := context.Background() + + s := NewScheduler(ss.JobStore, 2, job.LowPriority) + s.SetLogger(testLogger()) + s.Start(ctx) + t.Cleanup(func() { + s.Stop() + }) + + ids := make(job.IDs, 0) + executedJobs := make([]string, 0) + + dirPath := filepath.Join(tmpDir, "test-folder") + + id0, err := ss.JobStore.EnqueueJob(job.Job{ + Func: func(c context.Context) error { + time.Sleep(20 * time.Millisecond) + executedJobs = append(executedJobs, "test-0") + return nil + }, + Dir: document.DirHandleFromPath(dirPath), + Type: "test-0", + }) + if err != nil { + t.Fatal(err) + } + ids = append(ids, id0) + + id1, err := ss.JobStore.EnqueueJob(job.Job{ + Dir: document.DirHandleFromPath(dirPath), + Type: "test-1", + Func: func(c context.Context) error { + time.Sleep(20 * time.Millisecond) + executedJobs = append(executedJobs, "test-1") + return nil + }, + DependsOn: job.IDs{id0}, + }) + if err != nil { + t.Fatal(err) + } + ids = append(ids, id1) + + id2, err := ss.JobStore.EnqueueJob(job.Job{ + Dir: document.DirHandleFromPath(dirPath), + Type: "test-2", + Func: func(c context.Context) error { + executedJobs = append(executedJobs, "test-2") + return nil + }, + DependsOn: job.IDs{id0, id1}, + }) + if err != nil { + t.Fatal(err) + } + ids = append(ids, id2) + + err = ss.JobStore.WaitForJobs(ctx, ids...) + if err != nil { + t.Fatal(err) + } + + expectedJobs := []string{"test-0", "test-1", "test-2"} + if diff := cmp.Diff(expectedJobs, executedJobs); diff != "" { + t.Fatalf("unexpected jobs: %s", diff) + } +} + func testLogger() *log.Logger { if testing.Verbose() { return log.New(os.Stdout, "", log.LstdFlags|log.Lshortfile) diff --git a/internal/state/job_id_slice_index.go b/internal/state/job_id_slice_index.go new file mode 100644 index 000000000..cfe19fe81 --- /dev/null +++ b/internal/state/job_id_slice_index.go @@ -0,0 +1,35 @@ +package state + +import ( + "fmt" + + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/terraform-ls/internal/job" +) + +type JobIdSliceIndex struct { + Field string +} + +func (s *JobIdSliceIndex) FromObject(obj interface{}) (bool, [][]byte, error) { + idx := &memdb.StringSliceFieldIndex{Field: s.Field} + return idx.FromObject(obj) +} + +func (s *JobIdSliceIndex) FromArgs(args ...interface{}) ([]byte, error) { + if len(args) != 1 { + return nil, fmt.Errorf("must provide only a single argument") + } + arg, ok := args[0].(job.ID) + if !ok { + return nil, fmt.Errorf("argument must be a job.ID: %#v", args[0]) + } + // Add the null character as a terminator + arg += "\x00" + return []byte(arg), nil +} + +func (s *JobIdSliceIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) { + idx := &memdb.StringSliceFieldIndex{Field: s.Field} + return idx.PrefixFromArgs(args...) +} diff --git a/internal/state/jobs.go b/internal/state/jobs.go index 05108094e..3de15e40c 100644 --- a/internal/state/jobs.go +++ b/internal/state/jobs.go @@ -79,6 +79,18 @@ func (js *JobStore) EnqueueJob(newJob job.Job) (job.ID, error) { newJobID := job.ID(fmt.Sprintf("%d", atomic.AddUint64(&js.lastJobId, 1))) + dependsOn := make(job.IDs, 0) + for _, jobId := range newJob.DependsOn { + isDone, err := js.isJobDone(txn, jobId) + if err != nil { + return "", err + } + if !isDone { + dependsOn = append(dependsOn, jobId) + } + } + newJob.DependsOn = dependsOn + sJob := &ScheduledJob{ ID: newJobID, Job: newJob, @@ -88,7 +100,7 @@ func (js *JobStore) EnqueueJob(newJob job.Job) (job.ID, error) { err = txn.Insert(js.tableName, sJob) if err != nil { - return "", err + return "", fmt.Errorf("failed to insert new job: %w", err) } js.logger.Printf("JOBS: Enqueueing new job %q: %q for %q (IsDirOpen: %t)", @@ -99,6 +111,19 @@ func (js *JobStore) EnqueueJob(newJob job.Job) (job.ID, error) { return newJobID, nil } +func (js *JobStore) isJobDone(txn *memdb.Txn, id job.ID) (bool, error) { + obj, err := txn.First(js.tableName, "id", id) + if err != nil { + return false, err + } + if obj == nil { + return true, nil + } + + sj := obj.(*ScheduledJob) + return sj.State == StateDone, nil +} + func (js *JobStore) DequeueJobsForDir(dir document.DirHandle) error { txn := js.db.Txn(true) defer txn.Abort() @@ -118,6 +143,11 @@ func (js *JobStore) DequeueJobsForDir(dir document.DirHandle) error { return err } + err = js.removeJobFromDependsOn(txn, sJob.ID) + if err != nil { + return err + } + err = js.cleanupParentDoneJobsOf(txn, sJob.ID) if err != nil { return err @@ -216,7 +246,7 @@ func (js *JobStore) AwaitNextJob(ctx context.Context, priority job.JobPriority) func (js *JobStore) awaitNextJob(ctx context.Context, priority job.JobPriority) (job.ID, job.Job, error) { txn := js.db.Txn(false) - wCh, obj, err := txn.FirstWatch(js.tableName, "priority_state", priority, StateQueued) + wCh, obj, err := txn.FirstWatch(js.tableName, "priority_dependecies_state", priority, 0, StateQueued) if err != nil { return "", job.Job{}, err } @@ -366,6 +396,11 @@ func (js *JobStore) FinishJob(id job.ID, jobErr error, deferredJobIds ...job.ID) js.logger.Printf("JOBS: Finishing job %q: %q for %q (err = %s, deferredJobs: %q)", sj.ID, sj.Type, sj.Dir, jobErr, deferredJobIds) + err = js.removeJobFromDependsOn(txn, id) + if err != nil { + return err + } + _, err = txn.DeleteAll(js.tableName, "id", id) if err != nil { return err @@ -396,6 +431,36 @@ func (js *JobStore) FinishJob(id job.ID, jobErr error, deferredJobIds ...job.ID) return nil } +func (js *JobStore) removeJobFromDependsOn(txn *memdb.Txn, id job.ID) error { + it, err := txn.Get(js.tableName, "depends_on", id) + if err != nil { + return err + } + for obj := it.Next(); obj != nil; obj = it.Next() { + sJob := obj.(*ScheduledJob) + idx, ok := idIsInSlice(sJob.DependsOn, id) + if ok { + jobCopy := sJob.Copy() + + // remove found ID from DependsOn + jobCopy.DependsOn[idx] = jobCopy.DependsOn[len(jobCopy.DependsOn)-1] + jobCopy.DependsOn = jobCopy.DependsOn[:len(jobCopy.DependsOn)-1] + + // re-insert updated data + _, err := txn.DeleteAll(js.tableName, "id", id) + if err != nil { + return err + } + err = txn.Insert(js.tableName, jobCopy) + if err != nil { + return err + } + } + } + + return nil +} + func (js *JobStore) cleanupParentDoneJobsOf(txn *memdb.Txn, id job.ID) error { it, err := txn.Get(js.tableName, "state", StateDone) if err != nil { diff --git a/internal/state/slice_length_index.go b/internal/state/slice_length_index.go new file mode 100644 index 000000000..749c382c9 --- /dev/null +++ b/internal/state/slice_length_index.go @@ -0,0 +1,57 @@ +package state + +import ( + "fmt" + "reflect" + + "github.com/hashicorp/go-memdb" +) + +type SliceLengthIndex struct { + Field string +} + +func (s *SliceLengthIndex) FromObject(obj interface{}) (bool, []byte, error) { + v := reflect.ValueOf(obj) + v = reflect.Indirect(v) // Dereference the pointer if any + + fv := v.FieldByName(s.Field) + if !fv.IsValid() { + return false, nil, + fmt.Errorf("field '%s' for %#v is invalid", s.Field, obj) + } + + // Check the type + k := fv.Kind() + if k != reflect.Slice { + return false, nil, fmt.Errorf("field %q is of type %v; want a slice", s.Field, k) + } + + // Get the slice length and encode it + val := fv.Len() + buf := encodeInt(int64(val), 8) + + return true, buf, nil +} + +func (s *SliceLengthIndex) FromArgs(args ...interface{}) ([]byte, error) { + if len(args) != 1 { + return nil, fmt.Errorf("must provide only a single argument") + } + + v := reflect.ValueOf(args[0]) + if !v.IsValid() { + return nil, fmt.Errorf("%#v is invalid", args[0]) + } + + k := v.Kind() + _, ok := memdb.IsIntType(k) + if !ok { + return nil, fmt.Errorf("arg is of type %v; want an int", k) + } + + val := v.Int() + buf := encodeInt(val, 8) + + return buf, nil +} diff --git a/internal/state/state.go b/internal/state/state.go index c32ba9eaf..db1a67eaa 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -55,14 +55,15 @@ var dbSchema = &memdb.DBSchema{ Unique: true, Indexer: &StringerFieldIndexer{Field: "ID"}, }, - "priority_state": { - Name: "priority_state", + "priority_dependecies_state": { + Name: "priority_dependecies_state", Indexer: &memdb.CompoundIndex{ Indexes: []memdb.Indexer{ &JobPriorityIndex{ PriorityIntField: "Priority", IsDirOpenBoolField: "IsDirOpen", }, + &SliceLengthIndex{Field: "DependsOn"}, &memdb.UintFieldIndex{Field: "State"}, }, }, @@ -103,6 +104,13 @@ var dbSchema = &memdb.DBSchema{ }, }, }, + "depends_on": { + Name: "depends_on", + Indexer: &JobIdSliceIndex{ + Field: "DependsOn", + }, + AllowMissing: true, + }, }, }, moduleTableName: { From 4de477d46bd6531ef4e9e531ba08c267819f4ae6 Mon Sep 17 00:00:00 2001 From: Radek Simko Date: Mon, 1 Aug 2022 16:32:12 +0100 Subject: [PATCH 2/5] indexer: Use DependsOn for (most) job dependencies --- internal/indexer/document_change.go | 127 ++++++++++---------- internal/indexer/module_calls.go | 179 ++++++++++++++-------------- internal/indexer/walker.go | 109 +++++++---------- internal/indexer/watcher.go | 10 +- 4 files changed, 205 insertions(+), 220 deletions(-) diff --git a/internal/indexer/document_change.go b/internal/indexer/document_change.go index af4bd37f5..b0840fb08 100644 --- a/internal/indexer/document_change.go +++ b/internal/indexer/document_change.go @@ -12,107 +12,108 @@ import ( func (idx *Indexer) DocumentChanged(modHandle document.DirHandle) (job.IDs, error) { ids := make(job.IDs, 0) - id, err := idx.jobStore.EnqueueJob(job.Job{ + parseId, err := idx.jobStore.EnqueueJob(job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.ParseModuleConfiguration(idx.fs, idx.modStore, modHandle.Path()) }, Type: op.OpTypeParseModuleConfiguration.String(), - Defer: func(ctx context.Context, jobErr error) (job.IDs, error) { - return idx.decodeModule(ctx, modHandle) - }, }) if err != nil { return ids, err } - ids = append(ids, id) + ids = append(ids, parseId) + + modIds, err := idx.decodeModule(modHandle, job.IDs{parseId}) + if err != nil { + return ids, err + } + ids = append(ids, modIds...) - id, err = idx.jobStore.EnqueueJob(job.Job{ + parseVarsId, err := idx.jobStore.EnqueueJob(job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.ParseVariables(idx.fs, idx.modStore, modHandle.Path()) }, Type: op.OpTypeParseVariables.String(), - Defer: func(ctx context.Context, jobErr error) (job.IDs, error) { - ids := make(job.IDs, 0) - id, err := idx.jobStore.EnqueueJob(job.Job{ - Dir: modHandle, - Func: func(ctx context.Context) error { - return module.DecodeVarsReferences(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) - }, - Type: op.OpTypeDecodeVarsReferences.String(), - }) - if err != nil { - return ids, err - } - ids = append(ids, id) - return ids, nil + }) + if err != nil { + return ids, err + } + ids = append(ids, parseVarsId) + + varsRefsId, err := idx.jobStore.EnqueueJob(job.Job{ + Dir: modHandle, + Func: func(ctx context.Context) error { + return module.DecodeVarsReferences(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) }, + Type: op.OpTypeDecodeVarsReferences.String(), + DependsOn: job.IDs{parseVarsId}, }) if err != nil { return ids, err } - ids = append(ids, id) + ids = append(ids, varsRefsId) return ids, nil } -func (idx *Indexer) decodeModule(ctx context.Context, modHandle document.DirHandle) (job.IDs, error) { +func (idx *Indexer) decodeModule(modHandle document.DirHandle, dependsOn job.IDs) (job.IDs, error) { ids := make(job.IDs, 0) - id, err := idx.jobStore.EnqueueJob(job.Job{ + metaId, err := idx.jobStore.EnqueueJob(job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.LoadModuleMetadata(idx.modStore, modHandle.Path()) }, - Type: op.OpTypeLoadModuleMetadata.String(), - Defer: func(ctx context.Context, jobErr error) (job.IDs, error) { - ids := make(job.IDs, 0) - id, err := idx.jobStore.EnqueueJob(job.Job{ - Dir: modHandle, - Func: func(ctx context.Context) error { - return module.DecodeReferenceTargets(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) - }, - Type: op.OpTypeDecodeReferenceTargets.String(), - }) - if err != nil { - return ids, err - } - ids = append(ids, id) + Type: op.OpTypeLoadModuleMetadata.String(), + DependsOn: dependsOn, + }) + if err != nil { + return ids, err + } + ids = append(ids, metaId) - id, err = idx.jobStore.EnqueueJob(job.Job{ - Dir: modHandle, - Func: func(ctx context.Context) error { - return module.DecodeReferenceOrigins(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) - }, - Type: op.OpTypeDecodeReferenceOrigins.String(), - }) - if err != nil { - return ids, err - } - ids = append(ids, id) + refTargetsId, err := idx.jobStore.EnqueueJob(job.Job{ + Dir: modHandle, + Func: func(ctx context.Context) error { + return module.DecodeReferenceTargets(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) + }, + Type: op.OpTypeDecodeReferenceTargets.String(), + DependsOn: job.IDs{metaId}, + }) + if err != nil { + return ids, err + } + ids = append(ids, refTargetsId) - id, err = idx.jobStore.EnqueueJob(job.Job{ - Dir: modHandle, - Func: func(ctx context.Context) error { - return module.GetModuleDataFromRegistry(ctx, idx.registryClient, - idx.modStore, idx.registryModStore, modHandle.Path()) - }, - Priority: job.LowPriority, - Type: op.OpTypeGetModuleDataFromRegistry.String(), - }) - if err != nil { - return ids, err - } + refOriginsId, err := idx.jobStore.EnqueueJob(job.Job{ + Dir: modHandle, + Func: func(ctx context.Context) error { + return module.DecodeReferenceOrigins(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) + }, + Type: op.OpTypeDecodeReferenceOrigins.String(), + DependsOn: job.IDs{metaId}, + }) + if err != nil { + return ids, err + } + ids = append(ids, refOriginsId) - ids = append(ids, id) - return ids, nil + registryId, err := idx.jobStore.EnqueueJob(job.Job{ + Dir: modHandle, + Func: func(ctx context.Context) error { + return module.GetModuleDataFromRegistry(ctx, idx.registryClient, + idx.modStore, idx.registryModStore, modHandle.Path()) }, + Priority: job.LowPriority, + Type: op.OpTypeGetModuleDataFromRegistry.String(), }) if err != nil { return ids, err } - ids = append(ids, id) + + ids = append(ids, registryId) return ids, nil } diff --git a/internal/indexer/module_calls.go b/internal/indexer/module_calls.go index 83e544b32..31c5f610b 100644 --- a/internal/indexer/module_calls.go +++ b/internal/indexer/module_calls.go @@ -11,104 +11,101 @@ import ( op "github.com/hashicorp/terraform-ls/internal/terraform/module/operation" ) -func (idx *Indexer) decodeInstalledModuleCalls(modHandle document.DirHandle) job.DeferFunc { - return func(ctx context.Context, opErr error) (job.IDs, error) { - jobIds := make(job.IDs, 0) - if opErr != nil { - return jobIds, opErr +func (idx *Indexer) decodeInstalledModuleCalls(modHandle document.DirHandle) (job.IDs, error) { + jobIds := make(job.IDs, 0) + + moduleCalls, err := idx.modStore.ModuleCalls(modHandle.Path()) + if err != nil { + return jobIds, err + } + + var errs *multierror.Error + + idx.logger.Printf("indexing installed module calls: %d", len(moduleCalls.Installed)) + for _, mc := range moduleCalls.Installed { + fi, err := os.Stat(mc.Path) + if err != nil || !fi.IsDir() { + multierror.Append(errs, err) + continue + } + err = idx.modStore.Add(mc.Path) + if err != nil { + multierror.Append(errs, err) + continue } - moduleCalls, err := idx.modStore.ModuleCalls(modHandle.Path()) + mcHandle := document.DirHandleFromPath(mc.Path) + // copy path for queued jobs below + mcPath := mc.Path + + parseId, err := idx.jobStore.EnqueueJob(job.Job{ + Dir: mcHandle, + Func: func(ctx context.Context) error { + return module.ParseModuleConfiguration(idx.fs, idx.modStore, mcPath) + }, + Type: op.OpTypeParseModuleConfiguration.String(), + }) if err != nil { - return jobIds, err + multierror.Append(errs, err) + continue + } + jobIds = append(jobIds, parseId) + + metaId, err := idx.jobStore.EnqueueJob(job.Job{ + Dir: mcHandle, + Type: op.OpTypeLoadModuleMetadata.String(), + Func: func(ctx context.Context) error { + return module.LoadModuleMetadata(idx.modStore, mcPath) + }, + DependsOn: job.IDs{parseId}, + }) + if err != nil { + multierror.Append(errs, err) + continue + } else { + jobIds = append(jobIds, metaId) } - var errs *multierror.Error - - for _, mc := range moduleCalls.Installed { - fi, err := os.Stat(mc.Path) - if err != nil || !fi.IsDir() { - multierror.Append(errs, err) - continue - } - err = idx.modStore.Add(mc.Path) - if err != nil { - multierror.Append(errs, err) - continue - } - - mcHandle := document.DirHandleFromPath(mc.Path) - // copy path for queued jobs below - mcPath := mc.Path - - id, err := idx.jobStore.EnqueueJob(job.Job{ - Dir: mcHandle, - Func: func(ctx context.Context) error { - return module.ParseModuleConfiguration(idx.fs, idx.modStore, mcPath) - }, - Type: op.OpTypeParseModuleConfiguration.String(), - Defer: func(ctx context.Context, jobErr error) (job.IDs, error) { - ids := make(job.IDs, 0) - - id, err := idx.jobStore.EnqueueJob(job.Job{ - Dir: mcHandle, - Type: op.OpTypeLoadModuleMetadata.String(), - Func: func(ctx context.Context) error { - return module.LoadModuleMetadata(idx.modStore, mcPath) - }, - Defer: func(ctx context.Context, jobErr error) (job.IDs, error) { - return idx.collectReferences(ctx, mcHandle) - }, - }) - if err != nil { - return ids, err - } else { - ids = append(ids, id) - } - - return ids, nil - }, - }) - if err != nil { - multierror.Append(errs, err) - continue - } - jobIds = append(jobIds, id) - - id, err = idx.jobStore.EnqueueJob(job.Job{ - Dir: mcHandle, - Func: func(ctx context.Context) error { - return module.ParseVariables(idx.fs, idx.modStore, mcPath) - }, - Type: op.OpTypeParseVariables.String(), - Defer: func(ctx context.Context, jobErr error) (job.IDs, error) { - ids := make(job.IDs, 0) - id, err = idx.jobStore.EnqueueJob(job.Job{ - Dir: mcHandle, - Func: func(ctx context.Context) error { - return module.DecodeVarsReferences(ctx, idx.modStore, idx.schemaStore, mcPath) - }, - Type: op.OpTypeDecodeVarsReferences.String(), - }) - if err != nil { - return ids, err - } - ids = append(ids, id) - return ids, err - }, - }) - if err != nil { - multierror.Append(errs, err) - continue - } - jobIds = append(jobIds, id) + ids, err := idx.collectReferences(mcHandle, job.IDs{parseId, metaId}) + if err != nil { + multierror.Append(errs, err) + continue + } else { + jobIds = append(jobIds, ids...) } - return jobIds, errs.ErrorOrNil() + varsParseId, err := idx.jobStore.EnqueueJob(job.Job{ + Dir: mcHandle, + Func: func(ctx context.Context) error { + return module.ParseVariables(idx.fs, idx.modStore, mcPath) + }, + Type: op.OpTypeParseVariables.String(), + }) + if err != nil { + multierror.Append(errs, err) + continue + } + jobIds = append(jobIds, varsParseId) + + varsRefId, err := idx.jobStore.EnqueueJob(job.Job{ + Dir: mcHandle, + Func: func(ctx context.Context) error { + return module.DecodeVarsReferences(ctx, idx.modStore, idx.schemaStore, mcPath) + }, + Type: op.OpTypeDecodeVarsReferences.String(), + DependsOn: job.IDs{varsParseId}, + }) + if err != nil { + multierror.Append(errs, err) + continue + } + ids = append(ids, varsRefId) } + + return jobIds, errs.ErrorOrNil() } -func (idx *Indexer) collectReferences(ctx context.Context, modHandle document.DirHandle) (job.IDs, error) { +func (idx *Indexer) collectReferences(modHandle document.DirHandle, dependsOn job.IDs) (job.IDs, error) { ids := make(job.IDs, 0) var errs *multierror.Error @@ -118,7 +115,8 @@ func (idx *Indexer) collectReferences(ctx context.Context, modHandle document.Di Func: func(ctx context.Context) error { return module.DecodeReferenceTargets(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) }, - Type: op.OpTypeDecodeReferenceTargets.String(), + Type: op.OpTypeDecodeReferenceTargets.String(), + DependsOn: dependsOn, }) if err != nil { errs = multierror.Append(errs, err) @@ -131,7 +129,8 @@ func (idx *Indexer) collectReferences(ctx context.Context, modHandle document.Di Func: func(ctx context.Context) error { return module.DecodeReferenceOrigins(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) }, - Type: op.OpTypeDecodeReferenceOrigins.String(), + Type: op.OpTypeDecodeReferenceOrigins.String(), + DependsOn: dependsOn, }) if err != nil { errs = multierror.Append(errs, err) diff --git a/internal/indexer/walker.go b/internal/indexer/walker.go index 354436ece..037f692ab 100644 --- a/internal/indexer/walker.go +++ b/internal/indexer/walker.go @@ -16,72 +16,60 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand ids := make(job.IDs, 0) var errs *multierror.Error - // blockingJobIds tracks job IDs which need to finish - // prior to collecting references - blockingJobIds := make(job.IDs, 0) - - id, err := idx.jobStore.EnqueueJob(job.Job{ + parseId, err := idx.jobStore.EnqueueJob(job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.ParseModuleConfiguration(idx.fs, idx.modStore, modHandle.Path()) }, Type: op.OpTypeParseModuleConfiguration.String(), - Defer: func(ctx context.Context, jobErr error) (job.IDs, error) { - ids := make(job.IDs, 0) - - id, err := idx.jobStore.EnqueueJob(job.Job{ - Dir: modHandle, - Type: op.OpTypeLoadModuleMetadata.String(), - Func: func(ctx context.Context) error { - return module.LoadModuleMetadata(idx.modStore, modHandle.Path()) - }, - }) - if err != nil { - return ids, err - } else { - ids = append(ids, id) - } + }) + if err != nil { + errs = multierror.Append(errs, err) + } else { + ids = append(ids, parseId) + } - return ids, nil + metaId, err := idx.jobStore.EnqueueJob(job.Job{ + Dir: modHandle, + Type: op.OpTypeLoadModuleMetadata.String(), + Func: func(ctx context.Context) error { + return module.LoadModuleMetadata(idx.modStore, modHandle.Path()) }, + DependsOn: job.IDs{parseId}, }) if err != nil { - errs = multierror.Append(errs, err) + return ids, err } else { - ids = append(ids, id) - blockingJobIds = append(blockingJobIds, id) + ids = append(ids, metaId) } - id, err = idx.jobStore.EnqueueJob(job.Job{ + parseVarsId, err := idx.jobStore.EnqueueJob(job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.ParseVariables(idx.fs, idx.modStore, modHandle.Path()) }, Type: op.OpTypeParseVariables.String(), - Defer: func(ctx context.Context, jobErr error) (job.IDs, error) { - ids := make(job.IDs, 0) - - id, err := idx.jobStore.EnqueueJob(job.Job{ - Dir: modHandle, - Func: func(ctx context.Context) error { - return module.DecodeVarsReferences(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) - }, - Type: op.OpTypeDecodeVarsReferences.String(), - }) - if err != nil { - return ids, err - } - ids = append(ids, id) - return ids, err - }, }) if err != nil { errs = multierror.Append(errs, err) } else { - ids = append(ids, id) + ids = append(ids, parseVarsId) + } + + varsRefsId, err := idx.jobStore.EnqueueJob(job.Job{ + Dir: modHandle, + Func: func(ctx context.Context) error { + return module.DecodeVarsReferences(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) + }, + Type: op.OpTypeDecodeVarsReferences.String(), + DependsOn: job.IDs{parseVarsId}, + }) + if err != nil { + return ids, err } + ids = append(ids, varsRefsId) - id, err = idx.jobStore.EnqueueJob(job.Job{ + tfVersionId, err := idx.jobStore.EnqueueJob(job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { ctx = exec.WithExecutorFactory(ctx, idx.tfExecFactory) @@ -92,15 +80,17 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand if err != nil { errs = multierror.Append(errs, err) } else { - ids = append(ids, id) - blockingJobIds = append(blockingJobIds, id) + ids = append(ids, tfVersionId) } dataDir := datadir.WalkDataDirOfModule(idx.fs, modHandle.Path()) idx.logger.Printf("parsed datadir: %#v", dataDir) + refCollectionDeps := job.IDs{ + parseId, metaId, tfVersionId, + } if dataDir.PluginLockFilePath != "" { - id, err := idx.jobStore.EnqueueJob(job.Job{ + pSchemaId, err := idx.jobStore.EnqueueJob(job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { ctx = exec.WithExecutorFactory(ctx, idx.tfExecFactory) @@ -111,40 +101,33 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand if err != nil { errs = multierror.Append(errs, err) } else { - ids = append(ids, id) - blockingJobIds = append(blockingJobIds, id) + ids = append(ids, pSchemaId) + refCollectionDeps = append(refCollectionDeps, pSchemaId) } } if dataDir.ModuleManifestPath != "" { // References are collected *after* manifest parsing // so that we reflect any references to submodules. - id, err := idx.jobStore.EnqueueJob(job.Job{ + modManifestId, err := idx.jobStore.EnqueueJob(job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.ParseModuleManifest(idx.fs, idx.modStore, modHandle.Path()) }, - Type: op.OpTypeParseModuleManifest.String(), - Defer: idx.decodeInstalledModuleCalls(modHandle), + Type: op.OpTypeParseModuleManifest.String(), + Defer: func(ctx context.Context, jobErr error) (job.IDs, error) { + return idx.decodeInstalledModuleCalls(modHandle) + }, }) if err != nil { errs = multierror.Append(errs, err) } else { - ids = append(ids, id) - blockingJobIds = append(blockingJobIds, id) + ids = append(ids, modManifestId) + refCollectionDeps = append(refCollectionDeps, modManifestId) } } - // Here we wait for all dependent jobs to be processed to - // reflect any data required to collect reference origins. - // This assumes scheduler is running to consume the jobs - // by the time we reach this point. - err = idx.jobStore.WaitForJobs(ctx, blockingJobIds...) - if err != nil { - return ids, err - } - - rIds, err := idx.collectReferences(ctx, modHandle) + rIds, err := idx.collectReferences(modHandle, refCollectionDeps) if err != nil { errs = multierror.Append(errs, err) } else { diff --git a/internal/indexer/watcher.go b/internal/indexer/watcher.go index fc5f620ed..1e0ec202e 100644 --- a/internal/indexer/watcher.go +++ b/internal/indexer/watcher.go @@ -13,18 +13,20 @@ import ( func (idx *Indexer) ModuleManifestChanged(ctx context.Context, modHandle document.DirHandle) (job.IDs, error) { ids := make(job.IDs, 0) - id, err := idx.jobStore.EnqueueJob(job.Job{ + modManifestId, err := idx.jobStore.EnqueueJob(job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.ParseModuleManifest(idx.fs, idx.modStore, modHandle.Path()) }, - Type: op.OpTypeParseModuleManifest.String(), - Defer: idx.decodeInstalledModuleCalls(modHandle), + Type: op.OpTypeParseModuleManifest.String(), + Defer: func(ctx context.Context, jobErr error) (job.IDs, error) { + return idx.decodeInstalledModuleCalls(modHandle) + }, }) if err != nil { return ids, err } - ids = append(ids, id) + ids = append(ids, modManifestId) return ids, nil } From 458dd5414effabc0b51bc5da4d5786f14528e352 Mon Sep 17 00:00:00 2001 From: Radek Simko Date: Mon, 1 Aug 2022 20:34:34 +0100 Subject: [PATCH 3/5] fix: check module existence & add in the same txn This is to prevent a race condition --- internal/state/module.go | 27 +++++++++++++++++++++------ internal/walker/walker.go | 13 +++---------- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/internal/state/module.go b/internal/state/module.go index 6c605530b..7f0158178 100644 --- a/internal/state/module.go +++ b/internal/state/module.go @@ -230,6 +230,16 @@ func (s *ModuleStore) Add(modPath string) error { txn := s.db.Txn(true) defer txn.Abort() + err := s.add(txn, modPath) + if err != nil { + return err + } + txn.Commit() + + return nil +} + +func (s *ModuleStore) add(txn *memdb.Txn, modPath string) error { // TODO: Introduce Exists method to Txn? obj, err := txn.First(s.tableName, "id", modPath) if err != nil { @@ -252,7 +262,6 @@ func (s *ModuleStore) Add(modPath string) error { return err } - txn.Commit() return nil } @@ -318,19 +327,25 @@ func (s *ModuleStore) ModuleByPath(path string) (*Module, error) { return mod, nil } -func (s *ModuleStore) Exists(path string) (bool, error) { - txn := s.db.Txn(false) +func (s *ModuleStore) AddIfNotExists(path string) error { + txn := s.db.Txn(true) + defer txn.Abort() _, err := moduleByPath(txn, path) if err != nil { if IsModuleNotFound(err) { - return false, nil + err := s.add(txn, path) + if err != nil { + return err + } + txn.Commit() + return nil } - return false, err + return err } - return true, nil + return nil } func (s *ModuleStore) ModuleCalls(modPath string) (tfmod.ModuleCalls, error) { diff --git a/internal/walker/walker.go b/internal/walker/walker.go index 31e3bc6a6..b77d1cff3 100644 --- a/internal/walker/walker.go +++ b/internal/walker/walker.go @@ -56,8 +56,7 @@ type PathStore interface { } type ModuleStore interface { - Exists(dir string) (bool, error) - Add(dir string) error + AddIfNotExists(dir string) error } func NewWalker(fs fs.ReadDirFS, pathStore PathStore, modStore ModuleStore, walkFunc WalkFunc) *Walker { @@ -190,20 +189,14 @@ func (w *Walker) walk(ctx context.Context, dir document.DirHandle) error { dirIndexed = true w.logger.Printf("found module %s", dir) - exists, err := w.modStore.Exists(dir.Path()) + err := w.modStore.AddIfNotExists(dir.Path()) if err != nil { return err } - if !exists { - err := w.modStore.Add(dir.Path()) - if err != nil { - return err - } - } ids, err := w.walkFunc(ctx, dir) if err != nil { - w.collectError(err) + w.collectError(fmt.Errorf("walkFunc: %w", err)) } w.collectJobIds(ids) continue From c4a8f77e79d19fe1855e8c99607c06c52266c57f Mon Sep 17 00:00:00 2001 From: Radek Simko Date: Wed, 3 Aug 2022 15:08:56 +0100 Subject: [PATCH 4/5] fix: account for empty job IDs & wrong ID in cleanup --- internal/indexer/module_calls.go | 80 ++++++++++++++++++-------------- internal/indexer/walker.go | 73 ++++++++++++++++------------- internal/state/jobs.go | 2 +- internal/state/jobs_test.go | 67 ++++++++++++++++++++++++++ 4 files changed, 155 insertions(+), 67 deletions(-) diff --git a/internal/indexer/module_calls.go b/internal/indexer/module_calls.go index 31c5f610b..5540b90a2 100644 --- a/internal/indexer/module_calls.go +++ b/internal/indexer/module_calls.go @@ -38,6 +38,8 @@ func (idx *Indexer) decodeInstalledModuleCalls(modHandle document.DirHandle) (jo // copy path for queued jobs below mcPath := mc.Path + refCollectionDeps := make(job.IDs, 0) + parseId, err := idx.jobStore.EnqueueJob(job.Job{ Dir: mcHandle, Func: func(ctx context.Context) error { @@ -47,31 +49,36 @@ func (idx *Indexer) decodeInstalledModuleCalls(modHandle document.DirHandle) (jo }) if err != nil { multierror.Append(errs, err) - continue + } else { + jobIds = append(jobIds, parseId) + refCollectionDeps = append(refCollectionDeps, parseId) } - jobIds = append(jobIds, parseId) - metaId, err := idx.jobStore.EnqueueJob(job.Job{ - Dir: mcHandle, - Type: op.OpTypeLoadModuleMetadata.String(), - Func: func(ctx context.Context) error { - return module.LoadModuleMetadata(idx.modStore, mcPath) - }, - DependsOn: job.IDs{parseId}, - }) - if err != nil { - multierror.Append(errs, err) - continue - } else { - jobIds = append(jobIds, metaId) + var metaId job.ID + if parseId != "" { + metaId, err = idx.jobStore.EnqueueJob(job.Job{ + Dir: mcHandle, + Type: op.OpTypeLoadModuleMetadata.String(), + Func: func(ctx context.Context) error { + return module.LoadModuleMetadata(idx.modStore, mcPath) + }, + DependsOn: job.IDs{parseId}, + }) + if err != nil { + multierror.Append(errs, err) + } else { + jobIds = append(jobIds, metaId) + refCollectionDeps = append(refCollectionDeps, metaId) + } } - ids, err := idx.collectReferences(mcHandle, job.IDs{parseId, metaId}) - if err != nil { - multierror.Append(errs, err) - continue - } else { - jobIds = append(jobIds, ids...) + if parseId != "" { + ids, err := idx.collectReferences(mcHandle, refCollectionDeps) + if err != nil { + multierror.Append(errs, err) + } else { + jobIds = append(jobIds, ids...) + } } varsParseId, err := idx.jobStore.EnqueueJob(job.Job{ @@ -83,23 +90,26 @@ func (idx *Indexer) decodeInstalledModuleCalls(modHandle document.DirHandle) (jo }) if err != nil { multierror.Append(errs, err) - continue + } else { + jobIds = append(jobIds, varsParseId) } - jobIds = append(jobIds, varsParseId) - varsRefId, err := idx.jobStore.EnqueueJob(job.Job{ - Dir: mcHandle, - Func: func(ctx context.Context) error { - return module.DecodeVarsReferences(ctx, idx.modStore, idx.schemaStore, mcPath) - }, - Type: op.OpTypeDecodeVarsReferences.String(), - DependsOn: job.IDs{varsParseId}, - }) - if err != nil { - multierror.Append(errs, err) - continue + if varsParseId != "" { + varsRefId, err := idx.jobStore.EnqueueJob(job.Job{ + Dir: mcHandle, + Func: func(ctx context.Context) error { + return module.DecodeVarsReferences(ctx, idx.modStore, idx.schemaStore, mcPath) + }, + Type: op.OpTypeDecodeVarsReferences.String(), + DependsOn: job.IDs{varsParseId}, + }) + if err != nil { + multierror.Append(errs, err) + continue + } else { + jobIds = append(jobIds, varsRefId) + } } - ids = append(ids, varsRefId) } return jobIds, errs.ErrorOrNil() diff --git a/internal/indexer/walker.go b/internal/indexer/walker.go index 037f692ab..416bb12e3 100644 --- a/internal/indexer/walker.go +++ b/internal/indexer/walker.go @@ -16,6 +16,8 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand ids := make(job.IDs, 0) var errs *multierror.Error + refCollectionDeps := make(job.IDs, 0) + parseId, err := idx.jobStore.EnqueueJob(job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { @@ -27,20 +29,25 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand errs = multierror.Append(errs, err) } else { ids = append(ids, parseId) + refCollectionDeps = append(refCollectionDeps, parseId) } - metaId, err := idx.jobStore.EnqueueJob(job.Job{ - Dir: modHandle, - Type: op.OpTypeLoadModuleMetadata.String(), - Func: func(ctx context.Context) error { - return module.LoadModuleMetadata(idx.modStore, modHandle.Path()) - }, - DependsOn: job.IDs{parseId}, - }) - if err != nil { - return ids, err - } else { - ids = append(ids, metaId) + var metaId job.ID + if parseId != "" { + metaId, err = idx.jobStore.EnqueueJob(job.Job{ + Dir: modHandle, + Type: op.OpTypeLoadModuleMetadata.String(), + Func: func(ctx context.Context) error { + return module.LoadModuleMetadata(idx.modStore, modHandle.Path()) + }, + DependsOn: job.IDs{parseId}, + }) + if err != nil { + return ids, err + } else { + ids = append(ids, metaId) + refCollectionDeps = append(refCollectionDeps, metaId) + } } parseVarsId, err := idx.jobStore.EnqueueJob(job.Job{ @@ -56,18 +63,22 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand ids = append(ids, parseVarsId) } - varsRefsId, err := idx.jobStore.EnqueueJob(job.Job{ - Dir: modHandle, - Func: func(ctx context.Context) error { - return module.DecodeVarsReferences(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) - }, - Type: op.OpTypeDecodeVarsReferences.String(), - DependsOn: job.IDs{parseVarsId}, - }) - if err != nil { - return ids, err + if parseVarsId != "" { + varsRefsId, err := idx.jobStore.EnqueueJob(job.Job{ + Dir: modHandle, + Func: func(ctx context.Context) error { + return module.DecodeVarsReferences(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) + }, + Type: op.OpTypeDecodeVarsReferences.String(), + DependsOn: job.IDs{parseVarsId}, + }) + if err != nil { + return ids, err + } else { + ids = append(ids, varsRefsId) + refCollectionDeps = append(refCollectionDeps, varsRefsId) + } } - ids = append(ids, varsRefsId) tfVersionId, err := idx.jobStore.EnqueueJob(job.Job{ Dir: modHandle, @@ -81,14 +92,12 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand errs = multierror.Append(errs, err) } else { ids = append(ids, tfVersionId) + refCollectionDeps = append(refCollectionDeps, tfVersionId) } dataDir := datadir.WalkDataDirOfModule(idx.fs, modHandle.Path()) idx.logger.Printf("parsed datadir: %#v", dataDir) - refCollectionDeps := job.IDs{ - parseId, metaId, tfVersionId, - } if dataDir.PluginLockFilePath != "" { pSchemaId, err := idx.jobStore.EnqueueJob(job.Job{ Dir: modHandle, @@ -127,11 +136,13 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand } } - rIds, err := idx.collectReferences(modHandle, refCollectionDeps) - if err != nil { - errs = multierror.Append(errs, err) - } else { - ids = append(ids, rIds...) + if parseId != "" { + rIds, err := idx.collectReferences(modHandle, refCollectionDeps) + if err != nil { + errs = multierror.Append(errs, err) + } else { + ids = append(ids, rIds...) + } } return ids, errs.ErrorOrNil() diff --git a/internal/state/jobs.go b/internal/state/jobs.go index 3de15e40c..d8159a636 100644 --- a/internal/state/jobs.go +++ b/internal/state/jobs.go @@ -447,7 +447,7 @@ func (js *JobStore) removeJobFromDependsOn(txn *memdb.Txn, id job.ID) error { jobCopy.DependsOn = jobCopy.DependsOn[:len(jobCopy.DependsOn)-1] // re-insert updated data - _, err := txn.DeleteAll(js.tableName, "id", id) + _, err := txn.DeleteAll(js.tableName, "id", jobCopy.ID) if err != nil { return err } diff --git a/internal/state/jobs_test.go b/internal/state/jobs_test.go index 424686883..2847a25f4 100644 --- a/internal/state/jobs_test.go +++ b/internal/state/jobs_test.go @@ -702,3 +702,70 @@ func TestJobStore_FinishJob_defer(t *testing.T) { t.Fatalf("unexpected jobs: %s", diff) } } + +func TestJobStore_FinishJob_dependsOn(t *testing.T) { + ss, err := NewStateStore() + if err != nil { + t.Fatal(err) + } + + parentId, err := ss.JobStore.EnqueueJob(job.Job{ + Func: func(ctx context.Context) error { + return nil + }, + Dir: document.DirHandleFromPath(t.TempDir()), + Type: "parent-job", + }) + if err != nil { + t.Fatal(err) + } + + childId, err := ss.JobStore.EnqueueJob(job.Job{ + Func: func(ctx context.Context) error { + return nil + }, + Dir: document.DirHandleFromPath(t.TempDir()), + Type: "child-job", + DependsOn: job.IDs{parentId}, + }) + if err != nil { + t.Fatal(err) + } + + ids, err := ss.JobStore.ListQueuedJobs() + if err != nil { + t.Fatal(err) + } + expectedIds := job.IDs{parentId, childId} + if diff := cmp.Diff(expectedIds, ids); diff != "" { + t.Fatalf("unexpected IDs: %s", diff) + } + + err = ss.JobStore.FinishJob(parentId, nil) + if err != nil { + t.Fatal(err) + } + + ids, err = ss.JobStore.ListQueuedJobs() + if err != nil { + t.Fatal(err) + } + expectedIds = job.IDs{childId} + if diff := cmp.Diff(expectedIds, ids); diff != "" { + t.Fatalf("unexpected IDs after finishing: %s", diff) + } + + ctx, cancelFunc := context.WithTimeout(context.Background(), 1*time.Second) + t.Cleanup(cancelFunc) + nextId, j, err := ss.JobStore.AwaitNextJob(ctx, job.LowPriority) + if err != nil { + t.Fatal(err) + } + if nextId != childId { + t.Fatalf("expected next ID %q, given %q", childId, nextId) + } + expectedDependsOn := job.IDs{} + if diff := cmp.Diff(expectedDependsOn, j.DependsOn); diff != "" { + t.Fatalf("unexpected DependsOn: %s", diff) + } +} From 3768eb7d31336d9b6223967bbcf4537c477c8e8f Mon Sep 17 00:00:00 2001 From: Radek Simko Date: Wed, 3 Aug 2022 20:15:55 +0100 Subject: [PATCH 5/5] Update internal/indexer/module_calls.go Co-authored-by: Daniel Banck --- internal/indexer/module_calls.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/indexer/module_calls.go b/internal/indexer/module_calls.go index 5540b90a2..6537b210b 100644 --- a/internal/indexer/module_calls.go +++ b/internal/indexer/module_calls.go @@ -105,7 +105,6 @@ func (idx *Indexer) decodeInstalledModuleCalls(modHandle document.DirHandle) (jo }) if err != nil { multierror.Append(errs, err) - continue } else { jobIds = append(jobIds, varsRefId) }