diff --git a/modules/queue/workergroup.go b/modules/queue/workergroup.go index 147a4f335e101..e783e3957b1ec 100644 --- a/modules/queue/workergroup.go +++ b/modules/queue/workergroup.go @@ -60,6 +60,9 @@ func (q *WorkerPoolQueue[T]) doDispatchBatchToWorker(wg *workerGroup[T], flushCh full = true } + // TODO: the logic could be improved in the future, to avoid a data-race between "doStartNewWorker" and "workerNum" + // The root problem is that if we skip "doStartNewWorker" here, the "workerNum" might be decreased by other workers later + // So ideally, it should check whether there are enough workers periodically, and start new workers if necessary. q.workerNumMu.Lock() noWorker := q.workerNum == 0 if full || noWorker { @@ -143,7 +146,11 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) { log.Debug("Queue %q starts new worker", q.GetName()) defer log.Debug("Queue %q stops idle worker", q.GetName()) + atomic.AddInt32(&q.workerStartedCounter, 1) + t := time.NewTicker(workerIdleDuration) + defer t.Stop() + keepWorking := true stopWorking := func() { q.workerNumMu.Lock() @@ -158,13 +165,18 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) { case batch, ok := <-q.batchChan: if !ok { stopWorking() - } else { - q.doWorkerHandle(batch) - t.Reset(workerIdleDuration) + continue + } + q.doWorkerHandle(batch) + // reset the idle ticker, and drain the tick after reset in case a tick is already triggered + t.Reset(workerIdleDuration) + select { + case <-t.C: + default: } case <-t.C: q.workerNumMu.Lock() - keepWorking = q.workerNum <= 1 + keepWorking = q.workerNum <= 1 // keep the last worker running if !keepWorking { q.workerNum-- } diff --git a/modules/queue/workerqueue.go b/modules/queue/workerqueue.go index b28fd880270ab..4160622d81388 100644 --- a/modules/queue/workerqueue.go +++ b/modules/queue/workerqueue.go @@ -40,6 +40,8 @@ type WorkerPoolQueue[T any] struct { workerMaxNum int workerActiveNum int workerNumMu sync.Mutex + + workerStartedCounter int32 } type flushType chan struct{} diff --git a/modules/queue/workerqueue_test.go b/modules/queue/workerqueue_test.go index e60120162a706..e09669c54255e 100644 --- a/modules/queue/workerqueue_test.go +++ b/modules/queue/workerqueue_test.go @@ -11,6 +11,7 @@ import ( "time" "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/test" "github.com/stretchr/testify/assert" ) @@ -175,11 +176,7 @@ func testWorkerPoolQueuePersistence(t *testing.T, queueSetting setting.QueueSett } func TestWorkerPoolQueueActiveWorkers(t *testing.T) { - oldWorkerIdleDuration := workerIdleDuration - workerIdleDuration = 300 * time.Millisecond - defer func() { - workerIdleDuration = oldWorkerIdleDuration - }() + defer test.MockVariableValue(&workerIdleDuration, 300*time.Millisecond)() handler := func(items ...int) (unhandled []int) { time.Sleep(100 * time.Millisecond) @@ -250,3 +247,25 @@ func TestWorkerPoolQueueShutdown(t *testing.T) { q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", qs, handler, false) assert.EqualValues(t, 20, q.GetQueueItemNumber()) } + +func TestWorkerPoolQueueWorkerIdleReset(t *testing.T) { + defer test.MockVariableValue(&workerIdleDuration, 10*time.Millisecond)() + + handler := func(items ...int) (unhandled []int) { + time.Sleep(50 * time.Millisecond) + return nil + } + + q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false) + stop := runWorkerPoolQueue(q) + for i := 0; i < 20; i++ { + assert.NoError(t, q.Push(i)) + } + + time.Sleep(500 * time.Millisecond) + assert.EqualValues(t, 2, q.GetWorkerNumber()) + assert.EqualValues(t, 2, q.GetWorkerActiveNumber()) + // when the queue never becomes empty, the existing workers should keep working + assert.EqualValues(t, 2, q.workerStartedCounter) + stop() +}