Skip to content

Commit

Permalink
drain tasks in an unblocked way to avoid data race
Browse files Browse the repository at this point in the history
  • Loading branch information
hongkuancn committed Sep 1, 2024
1 parent 5f162d4 commit 44b1bcc
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 4 deletions.
5 changes: 3 additions & 2 deletions pond.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,13 +370,14 @@ func (p *WorkerPool) stop(waitForQueuedTasksToComplete bool) {
// Terminate all workers & purger goroutine
p.contextCancel()

// Wait for all workers & purger goroutine to exit
p.workersWaitGroup.Wait()

// close tasks channel (only once, in case multiple concurrent calls to StopAndWait are made)
p.tasksCloseOnce.Do(func() {
close(p.tasks)
})

// Wait for all workers & purger goroutine to exit
p.workersWaitGroup.Wait()
}

// purge represents the work done by the purger goroutine
Expand Down
22 changes: 22 additions & 0 deletions pond_blackbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,28 @@ func TestPoolWithCustomIdleTimeout(t *testing.T) {
pool.StopAndWait()
}

func TestStopWithPurging(t *testing.T) {

pool := pond.New(5, 5, pond.IdleTimeout(100*time.Millisecond))

// Submit a task
for i := 0; i < 5; i++ {
pool.Submit(func() {
time.Sleep(10 * time.Millisecond)
})
}

assertEqual(t, 5, pool.RunningWorkers())

// Purge goroutine is clearing idle workers
time.Sleep(200 * time.Millisecond)

// Stop the pool to make sure there is no data race with purge goroutine
pool.StopAndWait()

assertEqual(t, 0, pool.RunningWorkers())
}

func TestPoolWithCustomPanicHandler(t *testing.T) {

var capturedPanic interface{} = nil
Expand Down
11 changes: 9 additions & 2 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,14 @@ func worker(context context.Context, waitGroup *sync.WaitGroup, firstTask func()

// drainPendingTasks discards queued tasks and decrements the corresponding wait group
func drainTasks(tasks <-chan func(), tasksWaitGroup *sync.WaitGroup) {
for _ = range tasks {
tasksWaitGroup.Done()
for {
select {
case task, ok := <-tasks:
if task != nil && ok {
tasksWaitGroup.Done()
}
default:
return
}
}
}

0 comments on commit 44b1bcc

Please sign in to comment.