internal/state: avoid deadlock caused by mid-flight job change #818

Merged · 2 commits · Mar 4, 2022
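
The change, as reflected in the diff below: internal/state gains typed jobAlreadyRunning and jobNotFound errors, JobStore.awaitNextJob retries instead of failing when it loses the race to mark a candidate job as running (for example when a document is opened mid-flight and a job effectively moves from the closed-dir queue to the open-dir queue), and a new scheduler test exercises both queues concurrently.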
147 changes: 145 additions & 2 deletions internal/scheduler/scheduler_test.go
@@ -7,6 +7,7 @@ import (
"log"
"os"
"path/filepath"
"sync"
"sync/atomic"
"testing"

@@ -15,7 +16,7 @@ import (
"github.com/hashicorp/terraform-ls/internal/state"
)

func TestScheduler_basic(t *testing.T) {
func TestScheduler_closedOnly(t *testing.T) {
ss, err := state.NewStateStore()
if err != nil {
t.Fatal(err)
@@ -65,7 +66,129 @@ func TestScheduler_basic(t *testing.T) {
}
}

func BenchmarkScheduler_EnqueueAndWaitForJob(b *testing.B) {
func TestScheduler_closedAndOpen(t *testing.T) {
ss, err := state.NewStateStore()
if err != nil {
t.Fatal(err)
}
ss.SetLogger(testLogger())

tmpDir := t.TempDir()

var wg sync.WaitGroup

var closedJobsExecuted int64 = 0
closedJobsToExecute := 50
closedIds := make([]job.ID, 0)
wg.Add(1)
go func(t *testing.T) {
defer wg.Done()
for i := 0; i < closedJobsToExecute; i++ {
i := i
dirPath := filepath.Join(tmpDir, fmt.Sprintf("folder-x-%d", i))

newId, err := ss.JobStore.EnqueueJob(job.Job{
Func: func(c context.Context) error {
atomic.AddInt64(&closedJobsExecuted, 1)
return nil
},
Dir: document.DirHandleFromPath(dirPath),
Type: "test-type",
})
if err != nil {
t.Error(err)
}
closedIds = append(closedIds, newId)
}
}(t)

openJobsToExecute := 50
var openJobsExecuted int64 = 0
openIds := make([]job.ID, 0)
wg.Add(1)
go func(t *testing.T) {
defer wg.Done()
for i := 0; i < openJobsToExecute; i++ {
i := i
dirPath := filepath.Join(tmpDir, fmt.Sprintf("folder-y-%d", i))

newId, err := ss.JobStore.EnqueueJob(job.Job{
Func: func(c context.Context) error {
atomic.AddInt64(&openJobsExecuted, 1)
return nil
},
Dir: document.DirHandleFromPath(dirPath),
Type: "test-type",
})
if err != nil {
t.Error(err)
}

openIds = append(openIds, newId)
}
}(t)

wg.Add(1)
// we intentionally open the documents in a separate routine,
// possibly after some of the relevant jobs have been queued (as closed)
// to better reflect what may happen in reality
go func(t *testing.T) {
defer wg.Done()
for i := 0; i < openJobsToExecute; i++ {
dirPath := filepath.Join(tmpDir, fmt.Sprintf("folder-y-%d", i))
dh := document.HandleFromPath(filepath.Join(dirPath, "test.tf"))
err := ss.DocumentStore.OpenDocument(dh, "", 0, []byte{})
if err != nil {
t.Error(err)
}
}
}(t)

ctx := context.Background()
if deadline, ok := t.Deadline(); ok {
var cancelFunc context.CancelFunc
ctx, cancelFunc = context.WithDeadline(ctx, deadline)
t.Cleanup(cancelFunc)
}

cs := NewScheduler(&closedDirJobs{js: ss.JobStore}, 1)
cs.SetLogger(testLogger())
cs.Start(ctx)
t.Cleanup(func() {
cs.Stop()
})

os := NewScheduler(&openDirJobs{js: ss.JobStore}, 1)
os.SetLogger(testLogger())
os.Start(ctx)
t.Cleanup(func() {
os.Stop()
})

// wait for all scheduling and document opening to finish
wg.Wait()
t.Log("finished all scheduling and doc opening")

allIds := make([]job.ID, 0)
allIds = append(allIds, closedIds...)
allIds = append(allIds, openIds...)

t.Logf("waiting for %d jobs", len(allIds))
err = ss.JobStore.WaitForJobs(ctx, allIds...)
if err != nil {
t.Fatal(err)
}

if closedJobsExecuted != int64(closedJobsToExecute) {
t.Fatalf("expected %d closed jobs to execute, got: %d", closedJobsToExecute, closedJobsExecuted)
}

if openJobsExecuted != int64(openJobsToExecute) {
t.Fatalf("expected %d open jobs to execute, got: %d", openJobsToExecute, openJobsExecuted)
}
}

func BenchmarkScheduler_EnqueueAndWaitForJob_closedOnly(b *testing.B) {
ss, err := state.NewStateStore()
if err != nil {
b.Fatal(err)
@@ -223,3 +346,23 @@ func (js *closedDirJobs) FinishJob(id job.ID, jobErr error, deferredJobIds ...job.ID) error {
func (js *closedDirJobs) WaitForJobs(ctx context.Context, jobIds ...job.ID) error {
return js.js.WaitForJobs(ctx, jobIds...)
}

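// openDirJobs wraps a JobStore so consumers only receive jobs for
// directories with open documents (AwaitNextJob passes openDir=true);
// closedDirJobs above is the counterpart for closed directories.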
type openDirJobs struct {
js *state.JobStore
}

func (js *openDirJobs) EnqueueJob(newJob job.Job) (job.ID, error) {
return js.js.EnqueueJob(newJob)
}

func (js *openDirJobs) AwaitNextJob(ctx context.Context) (job.ID, job.Job, error) {
return js.js.AwaitNextJob(ctx, true)
}

func (js *openDirJobs) FinishJob(id job.ID, jobErr error, deferredJobIds ...job.ID) error {
return js.js.FinishJob(id, jobErr, deferredJobIds...)
}

func (js *openDirJobs) WaitForJobs(ctx context.Context, jobIds ...job.ID) error {
return js.js.WaitForJobs(ctx, jobIds...)
}
28 changes: 27 additions & 1 deletion internal/state/errors.go
@@ -1,6 +1,10 @@
package state

import "fmt"
import (
"fmt"

"github.com/hashicorp/terraform-ls/internal/job"
)

type AlreadyExistsError struct {
Idx string
Expand Down Expand Up @@ -39,3 +43,25 @@ func IsModuleNotFound(err error) bool {
_, ok := err.(*ModuleNotFoundError)
return ok
}

type jobAlreadyRunning struct {
ID job.ID
}

func (e jobAlreadyRunning) Error() string {
if e.ID != "" {
return fmt.Sprintf("job %q is already running", e.ID)
}
return "job is already running"
}

type jobNotFound struct {
ID job.ID
}

func (e jobNotFound) Error() string {
if e.ID != "" {
return fmt.Sprintf("job %q not found", e.ID)
}
return "job not found"
}
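
Both error types are comparable struct values returned by value, so the errors.Is calls in jobs.go below can match them without a custom Is method: errors.Is falls back to == when the target is comparable. A minimal standalone sketch of this pattern, where demoRunning is a hypothetical stand-in for the unexported jobAlreadyRunning:

package main

import (
	"errors"
	"fmt"
)

// demoRunning mirrors the jobAlreadyRunning pattern: a comparable
// struct value implementing error via a value receiver.
type demoRunning struct {
	ID string
}

func (e demoRunning) Error() string {
	return fmt.Sprintf("job %q is already running", e.ID)
}

func main() {
	var err error = demoRunning{ID: "42"}

	// errors.Is compares with == when the target is comparable and
	// defines no Is method, so a match requires an equal ID.
	fmt.Println(errors.Is(err, demoRunning{ID: "42"})) // true
	fmt.Println(errors.Is(err, demoRunning{ID: "7"}))  // false
}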
19 changes: 17 additions & 2 deletions internal/state/jobs.go
@@ -2,6 +2,7 @@ package state

import (
"context"
"errors"
"fmt"
"log"
"sync"
@@ -207,10 +208,20 @@ func (js *JobStore) awaitNextJob(ctx context.Context, openDir bool) (job.ID, job.Job, error) {

err = js.markJobAsRunning(sJob)
if err != nil {
// Although we hold a db-wide write lock when marking a job as running,
// we may still end up picking up the same job from the above read-only
// transaction, which does *not* hold a db-wide lock.
//
// Instead of adding more sync primitives here we simply retry.
if errors.Is(err, jobAlreadyRunning{ID: sJob.ID}) || errors.Is(err, jobNotFound{ID: sJob.ID}) {
js.logger.Printf("retrying next job: %s", err)
return js.awaitNextJob(ctx, openDir)
}

return "", job.Job{}, err
}

js.logger.Printf("JOBS: Dispatching next job: %q for %q", sJob.Type, sJob.Dir)
js.logger.Printf("JOBS: Dispatching next job %q: %q for %q", sJob.ID, sJob.Type, sJob.Dir)
return sJob.ID, sJob.Job, nil
}

Expand Down Expand Up @@ -294,6 +305,10 @@ func (js *JobStore) markJobAsRunning(sJob *ScheduledJob) error {
return err
}

if sj.State == StateRunning {
return jobAlreadyRunning{ID: sJob.ID}
}

_, err = txn.DeleteAll(js.tableName, "id", sJob.ID)
if err != nil {
return err
@@ -458,5 +473,5 @@ func copyJob(txn *memdb.Txn, id job.ID) (*ScheduledJob, error) {
sj := obj.(*ScheduledJob)
return sj.Copy(), nil
}
return nil, fmt.Errorf("%q: job not found", id)
return nil, jobNotFound{ID: id}
}
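
The core of the fix is a claim-or-retry loop: the candidate job is selected in a read-only transaction that holds no db-wide lock, so two consumers can pick the same job, and the loser of the subsequent claim retries instead of erroring out or blocking. A minimal standalone sketch of that pattern, using a plain mutex and map in place of go-memdb transactions (all names here are hypothetical):

package main

import (
	"errors"
	"fmt"
	"sync"
)

// jobQueue stands in for the memdb-backed JobStore.
type jobQueue struct {
	mu    sync.Mutex
	state map[string]string // job ID -> "queued" or "running"
}

// peekQueued selects a candidate without claiming it, mirroring the
// read-only transaction in awaitNextJob: another consumer may still
// claim the same job before we do.
func (q *jobQueue) peekQueued() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for id, s := range q.state {
		if s == "queued" {
			return id, true
		}
	}
	return "", false
}

// claim mirrors markJobAsRunning: it fails if the job vanished or was
// already claimed by a concurrent consumer.
func (q *jobQueue) claim(id string) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	switch q.state[id] {
	case "":
		return fmt.Errorf("job %q not found", id)
	case "running":
		return fmt.Errorf("job %q is already running", id)
	}
	q.state[id] = "running"
	return nil
}

// next retries on a lost race, as awaitNextJob does via its recursive
// call, rather than holding a broader lock across select and claim.
func (q *jobQueue) next() (string, error) {
	for {
		id, ok := q.peekQueued()
		if !ok {
			return "", errors.New("no queued jobs")
		}
		if err := q.claim(id); err != nil {
			continue // lost the race; pick another candidate
		}
		return id, nil
	}
}

func main() {
	q := &jobQueue{state: map[string]string{"a": "queued", "b": "queued"}}
	id, err := q.next()
	fmt.Println(id, err)
}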