Skip to content

Commit

Permalink
internal/scheduler: add test to exercise both open & closed dirs toge…
Browse files Browse the repository at this point in the history
…ther

Multiple runs may be necessary to trigger the bug, 10 seems to be enough to do so in my environment:

go test ./internal/scheduler -run=TestScheduler_closedAndOpen -count=10 -timeout=3s
  • Loading branch information
radeksimko committed Mar 3, 2022
1 parent bff4e90 commit dfd54ca
Showing 1 changed file with 145 additions and 2 deletions.
147 changes: 145 additions & 2 deletions internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"os"
"path/filepath"
"sync"
"sync/atomic"
"testing"

Expand All @@ -15,7 +16,7 @@ import (
"github.com/hashicorp/terraform-ls/internal/state"
)

func TestScheduler_basic(t *testing.T) {
func TestScheduler_closedOnly(t *testing.T) {
ss, err := state.NewStateStore()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -65,7 +66,129 @@ func TestScheduler_basic(t *testing.T) {
}
}

func BenchmarkScheduler_EnqueueAndWaitForJob(b *testing.B) {
func TestScheduler_closedAndOpen(t *testing.T) {
ss, err := state.NewStateStore()
if err != nil {
t.Fatal(err)
}
ss.SetLogger(testLogger())

tmpDir := t.TempDir()

var wg sync.WaitGroup

var closedJobsExecuted int64 = 0
closedJobsToExecute := 50
closedIds := make([]job.ID, 0)
wg.Add(1)
go func(t *testing.T) {
defer wg.Done()
for i := 0; i < closedJobsToExecute; i++ {
i := i
dirPath := filepath.Join(tmpDir, fmt.Sprintf("folder-x-%d", i))

newId, err := ss.JobStore.EnqueueJob(job.Job{
Func: func(c context.Context) error {
atomic.AddInt64(&closedJobsExecuted, 1)
return nil
},
Dir: document.DirHandleFromPath(dirPath),
Type: "test-type",
})
if err != nil {
t.Error(err)
}
closedIds = append(closedIds, newId)
}
}(t)

openJobsToExecute := 50
var openJobsExecuted int64 = 0
openIds := make([]job.ID, 0)
wg.Add(1)
go func(t *testing.T) {
defer wg.Done()
for i := 0; i < openJobsToExecute; i++ {
i := i
dirPath := filepath.Join(tmpDir, fmt.Sprintf("folder-y-%d", i))

newId, err := ss.JobStore.EnqueueJob(job.Job{
Func: func(c context.Context) error {
atomic.AddInt64(&openJobsExecuted, 1)
return nil
},
Dir: document.DirHandleFromPath(dirPath),
Type: "test-type",
})
if err != nil {
t.Error(err)
}

openIds = append(openIds, newId)
}
}(t)

wg.Add(1)
// we intentionally open the documents in a separate routine,
// possibly after some of the relevant jobs have been queued (as closed)
// to better reflect what may happen in reality
go func(t *testing.T) {
defer wg.Done()
for i := 0; i < openJobsToExecute; i++ {
dirPath := filepath.Join(tmpDir, fmt.Sprintf("folder-y-%d", i))
dh := document.HandleFromPath(filepath.Join(dirPath, "test.tf"))
err := ss.DocumentStore.OpenDocument(dh, "", 0, []byte{})
if err != nil {
t.Error(err)
}
}
}(t)

ctx := context.Background()
if deadline, ok := t.Deadline(); ok {
var cancelFunc context.CancelFunc
ctx, cancelFunc = context.WithDeadline(ctx, deadline)
t.Cleanup(cancelFunc)
}

cs := NewScheduler(&closedDirJobs{js: ss.JobStore}, 1)
cs.SetLogger(testLogger())
cs.Start(ctx)
t.Cleanup(func() {
cs.Stop()
})

os := NewScheduler(&openDirJobs{js: ss.JobStore}, 1)
os.SetLogger(testLogger())
os.Start(ctx)
t.Cleanup(func() {
os.Stop()
})

// wait for all scheduling and document opening to finish
wg.Wait()
t.Log("finished all scheduling and doc opening")

allIds := make([]job.ID, 0)
allIds = append(allIds, closedIds...)
allIds = append(allIds, openIds...)

t.Logf("waiting for %d jobs", len(allIds))
err = ss.JobStore.WaitForJobs(ctx, allIds...)
if err != nil {
t.Fatal(err)
}

if closedJobsExecuted != int64(closedJobsToExecute) {
t.Fatalf("expected %d closed jobs to execute, got: %d", closedJobsToExecute, closedJobsExecuted)
}

if openJobsExecuted != int64(openJobsToExecute) {
t.Fatalf("expected %d open jobs to execute, got: %d", openJobsToExecute, openJobsExecuted)
}
}

func BenchmarkScheduler_EnqueueAndWaitForJob_closedOnly(b *testing.B) {
ss, err := state.NewStateStore()
if err != nil {
b.Fatal(err)
Expand Down Expand Up @@ -223,3 +346,23 @@ func (js *closedDirJobs) FinishJob(id job.ID, jobErr error, deferredJobIds ...jo
func (js *closedDirJobs) WaitForJobs(ctx context.Context, jobIds ...job.ID) error {
return js.js.WaitForJobs(ctx, jobIds...)
}

type openDirJobs struct {
js *state.JobStore
}

func (js *openDirJobs) EnqueueJob(newJob job.Job) (job.ID, error) {
return js.js.EnqueueJob(newJob)
}

func (js *openDirJobs) AwaitNextJob(ctx context.Context) (job.ID, job.Job, error) {
return js.js.AwaitNextJob(ctx, true)
}

func (js *openDirJobs) FinishJob(id job.ID, jobErr error, deferredJobIds ...job.ID) error {
return js.js.FinishJob(id, jobErr, deferredJobIds...)
}

func (js *openDirJobs) WaitForJobs(ctx context.Context, jobIds ...job.ID) error {
return js.js.WaitForJobs(ctx, jobIds...)
}

0 comments on commit dfd54ca

Please sign in to comment.