Skip to content

Commit

Permalink
fix races in jobqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
SaveTheRbtz committed Apr 12, 2022
1 parent c738803 commit c0adb2f
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 1 deletion.
4 changes: 3 additions & 1 deletion module/jobqueue/consumer_behavior_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func newTestConsumer(cp storage.ConsumerProgress, jobs module.Jobs, worker jobqu

// a Mock worker that stores all the jobs that it was asked to work on
type mockWorker struct {
sync.Mutex
sync.RWMutex
log zerolog.Logger
called []Job
fn func(job Job)
Expand Down Expand Up @@ -553,11 +553,13 @@ func (w *mockWorker) Run(job Job) error {
// return the IDs of the jobs
func (w *mockWorker) AssertCalled(t *testing.T, expectCalled []int64) {
called := make([]int, 0)
w.RLock()
for _, c := range w.called {
jobID, err := strconv.Atoi(string(c.ID()))
require.NoError(t, err)
called = append(called, jobID)
}
w.RUnlock()
sort.Ints(called)

called64 := make([]int64, 0)
Expand Down
4 changes: 4 additions & 0 deletions module/jobqueue/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,14 @@ func TestProcessedIndexDeletion(t *testing.T) {
require.NoError(t, c.Start(0))

require.Eventually(t, func() bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.processedIndex == uint64(10)
}, 2*time.Second, 10*time.Millisecond)

// should have no processing after all jobs are processed
c.mu.Lock()
defer c.mu.Unlock()
require.Len(t, c.processings, 0)
require.Len(t, c.processingsIndex, 0)
})
Expand Down

0 comments on commit c0adb2f

Please sign in to comment.