diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go
index 7870abbda..ba4c39371 100644
--- a/internal/scheduler/scheduler_test.go
+++ b/internal/scheduler/scheduler_test.go
@@ -7,6 +7,7 @@ import (
 	"log"
 	"os"
 	"path/filepath"
+	"sync"
 	"sync/atomic"
 	"testing"
 
@@ -15,7 +16,7 @@ import (
 	"github.com/hashicorp/terraform-ls/internal/state"
 )
 
-func TestScheduler_basic(t *testing.T) {
+func TestScheduler_closedOnly(t *testing.T) {
 	ss, err := state.NewStateStore()
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -65,7 +66,129 @@ func TestScheduler_basic(t *testing.T) {
 	}
 }
 
-func BenchmarkScheduler_EnqueueAndWaitForJob(b *testing.B) {
+func TestScheduler_closedAndOpen(t *testing.T) {
+	ss, err := state.NewStateStore()
+	if err != nil {
+		t.Fatal(err)
+	}
+	ss.SetLogger(testLogger())
+
+	tmpDir := t.TempDir()
+
+	var wg sync.WaitGroup
+
+	var closedJobsExecuted int64 = 0
+	closedJobsToExecute := 50
+	closedIds := make([]job.ID, 0)
+	wg.Add(1)
+	go func(t *testing.T) {
+		defer wg.Done()
+		for i := 0; i < closedJobsToExecute; i++ {
+			i := i
+			dirPath := filepath.Join(tmpDir, fmt.Sprintf("folder-x-%d", i))
+
+			newId, err := ss.JobStore.EnqueueJob(job.Job{
+				Func: func(c context.Context) error {
+					atomic.AddInt64(&closedJobsExecuted, 1)
+					return nil
+				},
+				Dir:  document.DirHandleFromPath(dirPath),
+				Type: "test-type",
+			})
+			if err != nil {
+				t.Error(err)
+			}
+			closedIds = append(closedIds, newId)
+		}
+	}(t)
+
+	openJobsToExecute := 50
+	var openJobsExecuted int64 = 0
+	openIds := make([]job.ID, 0)
+	wg.Add(1)
+	go func(t *testing.T) {
+		defer wg.Done()
+		for i := 0; i < openJobsToExecute; i++ {
+			i := i
+			dirPath := filepath.Join(tmpDir, fmt.Sprintf("folder-y-%d", i))
+
+			newId, err := ss.JobStore.EnqueueJob(job.Job{
+				Func: func(c context.Context) error {
+					atomic.AddInt64(&openJobsExecuted, 1)
+					return nil
+				},
+				Dir:  document.DirHandleFromPath(dirPath),
+				Type: "test-type",
+			})
+			if err != nil {
+				t.Error(err)
+			}
+
+			openIds = append(openIds, newId)
+		}
+	}(t)
+
+	wg.Add(1)
+	// We intentionally open the documents in a separate goroutine,
+	// possibly after some of the relevant jobs have already been queued
+	// (as closed), to better reflect what may happen in reality.
+	go func(t *testing.T) {
+		defer wg.Done()
+		for i := 0; i < openJobsToExecute; i++ {
+			dirPath := filepath.Join(tmpDir, fmt.Sprintf("folder-y-%d", i))
+			dh := document.HandleFromPath(filepath.Join(dirPath, "test.tf"))
+			err := ss.DocumentStore.OpenDocument(dh, "", 0, []byte{})
+			if err != nil {
+				t.Error(err)
+			}
+		}
+	}(t)
+
+	ctx := context.Background()
+	if deadline, ok := t.Deadline(); ok {
+		var cancelFunc context.CancelFunc
+		ctx, cancelFunc = context.WithDeadline(ctx, deadline)
+		t.Cleanup(cancelFunc)
+	}
+
+	cs := NewScheduler(&closedDirJobs{js: ss.JobStore}, 1)
+	cs.SetLogger(testLogger())
+	cs.Start(ctx)
+	t.Cleanup(func() {
+		cs.Stop()
+	})
+
+	os := NewScheduler(&openDirJobs{js: ss.JobStore}, 1)
+	os.SetLogger(testLogger())
+	os.Start(ctx)
+	t.Cleanup(func() {
+		os.Stop()
+	})
+
+	// wait for all scheduling and document opening to finish
+	wg.Wait()
+	t.Log("finished all scheduling and doc opening")
+
+	allIds := make([]job.ID, 0)
+	allIds = append(allIds, closedIds...)
+	allIds = append(allIds, openIds...)
+
+	t.Logf("waiting for %d jobs", len(allIds))
+	err = ss.JobStore.WaitForJobs(ctx, allIds...)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if closedJobsExecuted != int64(closedJobsToExecute) {
+		t.Fatalf("expected %d closed jobs to execute, got: %d", closedJobsToExecute, closedJobsExecuted)
+	}
+
+	if openJobsExecuted != int64(openJobsToExecute) {
+		t.Fatalf("expected %d open jobs to execute, got: %d", openJobsToExecute, openJobsExecuted)
+	}
+}
+
+func BenchmarkScheduler_EnqueueAndWaitForJob_closedOnly(b *testing.B) {
 	ss, err := state.NewStateStore()
 	if err != nil {
 		b.Fatal(err)
 	}
@@ -223,3 +346,23 @@ func (js *closedDirJobs) FinishJob(id job.ID, jobErr error, deferredJobIds ...jo
 func (js *closedDirJobs) WaitForJobs(ctx context.Context, jobIds ...job.ID) error {
 	return js.js.WaitForJobs(ctx, jobIds...)
 }
+
+type openDirJobs struct {
+	js *state.JobStore
+}
+
+func (js *openDirJobs) EnqueueJob(newJob job.Job) (job.ID, error) {
+	return js.js.EnqueueJob(newJob)
+}
+
+func (js *openDirJobs) AwaitNextJob(ctx context.Context) (job.ID, job.Job, error) {
+	return js.js.AwaitNextJob(ctx, true)
+}
+
+func (js *openDirJobs) FinishJob(id job.ID, jobErr error, deferredJobIds ...job.ID) error {
+	return js.js.FinishJob(id, jobErr, deferredJobIds...)
+}
+
+func (js *openDirJobs) WaitForJobs(ctx context.Context, jobIds ...job.ID) error {
+	return js.js.WaitForJobs(ctx, jobIds...)
+}
diff --git a/internal/state/errors.go b/internal/state/errors.go
index ebc7d5d9b..792f40d61 100644
--- a/internal/state/errors.go
+++ b/internal/state/errors.go
@@ -1,6 +1,10 @@
 package state
 
-import "fmt"
+import (
+	"fmt"
+
+	"github.com/hashicorp/terraform-ls/internal/job"
+)
 
 type AlreadyExistsError struct {
 	Idx string
@@ -39,3 +43,25 @@ func IsModuleNotFound(err error) bool {
 	_, ok := err.(*ModuleNotFoundError)
 	return ok
 }
+
+type jobAlreadyRunning struct {
+	ID job.ID
+}
+
+func (e jobAlreadyRunning) Error() string {
+	if e.ID != "" {
+		return fmt.Sprintf("job %q is already running", e.ID)
+	}
+	return "job is already running"
+}
+
+type jobNotFound struct {
+	ID job.ID
+}
+
+func (e jobNotFound) Error() string {
+	if e.ID != "" {
+		return fmt.Sprintf("job %q not found", e.ID)
+	}
+	return "job not found"
+}
diff --git a/internal/state/jobs.go b/internal/state/jobs.go
index 894e3c451..a54acb484 100644
--- a/internal/state/jobs.go
+++ b/internal/state/jobs.go
@@ -2,6 +2,7 @@ package state
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"log"
 	"sync"
@@ -207,10 +208,20 @@ func (js *JobStore) awaitNextJob(ctx context.Context, openDir bool) (job.ID, job
 
 	err = js.markJobAsRunning(sJob)
 	if err != nil {
+		// Although we hold a db-wide write lock while marking the job as running,
+		// the read-only transaction above does *not* hold a db-wide lock, so two
+		// concurrent callers may still end up picking up the same job.
+		//
+		// Instead of adding more sync primitives here we simply retry.
+		if errors.Is(err, jobAlreadyRunning{ID: sJob.ID}) || errors.Is(err, jobNotFound{ID: sJob.ID}) {
+			js.logger.Printf("retrying next job: %s", err)
+			return js.awaitNextJob(ctx, openDir)
+		}
+
 		return "", job.Job{}, err
 	}
 
-	js.logger.Printf("JOBS: Dispatching next job: %q for %q", sJob.Type, sJob.Dir)
+	js.logger.Printf("JOBS: Dispatching next job %q: %q for %q", sJob.ID, sJob.Type, sJob.Dir)
 
 	return sJob.ID, sJob.Job, nil
 }
@@ -294,6 +305,10 @@ func (js *JobStore) markJobAsRunning(sJob *ScheduledJob) error {
 		return err
 	}
 
+	if sj.State == StateRunning {
+		return jobAlreadyRunning{ID: sJob.ID}
+	}
+
 	_, err = txn.DeleteAll(js.tableName, "id", sJob.ID)
 	if err != nil {
 		return err
@@ -458,5 +473,5 @@ func copyJob(txn *memdb.Txn, id job.ID) (*ScheduledJob, error) {
 		sj := obj.(*ScheduledJob)
 		return sj.Copy(), nil
 	}
-	return nil, fmt.Errorf("%q: job not found", id)
+	return nil, jobNotFound{ID: id}
 }
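Note (reviewer sketch, not part of the patch): the `errors.Is` checks in `awaitNextJob` work without custom `Is`/`Unwrap` methods because `jobAlreadyRunning` and `jobNotFound` are plain comparable struct values, so `errors.Is` falls back to an `==` comparison against the target. Below is a minimal, self-contained illustration of that behavior, assuming `job.ID` is a comparable string-based type as in the real package; the `main` package and the plain `string` ID field are simplifications for the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// jobAlreadyRunning mirrors the comparable error value added in
// internal/state/errors.go; the string ID field stands in for job.ID
// purely for this sketch.
type jobAlreadyRunning struct {
	ID string
}

func (e jobAlreadyRunning) Error() string {
	return fmt.Sprintf("job %q is already running", e.ID)
}

func main() {
	var err error = jobAlreadyRunning{ID: "job-1"}

	// errors.Is finds no Is or Unwrap method on the error, so it compares the
	// target against err with ==. Struct values with identical fields are
	// equal, which is what the retry branch in awaitNextJob relies on.
	fmt.Println(errors.Is(err, jobAlreadyRunning{ID: "job-1"})) // true
	fmt.Println(errors.Is(err, jobAlreadyRunning{ID: "job-2"})) // false
}
```

The same fallback covers `jobNotFound`, which `copyJob` now returns as a typed value instead of an opaque `fmt.Errorf` string, so a target constructed with a different ID would simply not match.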