diff --git a/workerpool.go b/workerpool.go index bc3a3c5bb4..9ecd9481df 100644 --- a/workerpool.go +++ b/workerpool.go @@ -6,7 +6,6 @@ import ( "runtime" "strings" "sync" - "sync/atomic" "time" ) @@ -22,57 +21,29 @@ type workerPool struct { // Function for serving server connections. // It must leave c unclosed. - ready workerChanStack WorkerFunc ServeHandler stopCh chan struct{} connState func(net.Conn, ConnState) + ready []*workerChan + MaxWorkersCount int MaxIdleWorkerDuration time.Duration - workersCount int32 + workersCount int - mustStop atomic.Bool + lock sync.Mutex LogAllErrors bool + mustStop bool } type workerChan struct { - next *workerChan - - ch chan net.Conn - - lastUseTime int64 -} - -type workerChanStack struct { - head atomic.Pointer[workerChan] -} - -func (s *workerChanStack) push(ch *workerChan) { - for { - oldHead := s.head.Load() - ch.next = oldHead - if s.head.CompareAndSwap(oldHead, ch) { - break - } - } -} - -func (s *workerChanStack) pop() *workerChan { - for { - oldHead := s.head.Load() - if oldHead == nil { - return nil - } - - if s.head.CompareAndSwap(oldHead, oldHead.next) { - return oldHead - } - } + lastUseTime time.Time + ch chan net.Conn } func (wp *workerPool) Start() { @@ -87,8 +58,9 @@ func (wp *workerPool) Start() { } } go func() { + var scratch []*workerChan for { - wp.clean() + wp.clean(&scratch) select { case <-stopCh: return @@ -109,15 +81,15 @@ func (wp *workerPool) Stop() { // Stop all the workers waiting for incoming connections. // Do not wait for busy workers - they will stop after // serving the connection and noticing wp.mustStop = true. - - for { - ch := wp.ready.pop() - if ch == nil { - break - } - ch.ch <- nil + wp.lock.Lock() + ready := wp.ready + for i := range ready { + ready[i].ch <- nil + ready[i] = nil } - wp.mustStop.Store(true) + wp.ready = ready[:0] + wp.mustStop = true + wp.lock.Unlock() } func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration { @@ -127,22 +99,50 @@ func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration { return wp.MaxIdleWorkerDuration } -func (wp *workerPool) clean() { +func (wp *workerPool) clean(scratch *[]*workerChan) { maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration() - criticalTime := time.Now().Add(-maxIdleWorkerDuration).UnixNano() - for { - current := wp.ready.head.Load() - if current == nil || atomic.LoadInt64(¤t.lastUseTime) >= criticalTime { - break - } + // Clean least recently used workers if they didn't serve connections + // for more than maxIdleWorkerDuration. + criticalTime := time.Now().Add(-maxIdleWorkerDuration) + + wp.lock.Lock() + ready := wp.ready + n := len(ready) - next := current.next - if wp.ready.head.CompareAndSwap(current, next) { - current.ch <- nil - wp.workerChanPool.Put(current) + // Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up. + l, r := 0, n-1 + for l <= r { + mid := (l + r) / 2 + if criticalTime.After(wp.ready[mid].lastUseTime) { + l = mid + 1 + } else { + r = mid - 1 } } + i := r + if i == -1 { + wp.lock.Unlock() + return + } + + *scratch = append((*scratch)[:0], ready[:i+1]...) + m := copy(ready, ready[i+1:]) + for i = m; i < n; i++ { + ready[i] = nil + } + wp.ready = ready[:m] + wp.lock.Unlock() + + // Notify obsolete workers to stop. + // This notification must be outside the wp.lock, since ch.ch + // may be blocking and may consume a lot of time if many workers + // are located on non-local CPUs. + tmp := *scratch + for i := range tmp { + tmp[i].ch <- nil + tmp[i] = nil + } } func (wp *workerPool) Serve(c net.Conn) bool { @@ -169,32 +169,47 @@ var workerChanCap = func() int { }() func (wp *workerPool) getCh() *workerChan { - for { - ch := wp.ready.pop() - if ch != nil { - return ch + var ch *workerChan + createWorker := false + + wp.lock.Lock() + ready := wp.ready + n := len(ready) - 1 + if n < 0 { + if wp.workersCount < wp.MaxWorkersCount { + createWorker = true + wp.workersCount++ } + } else { + ch = ready[n] + ready[n] = nil + wp.ready = ready[:n] + } + wp.lock.Unlock() - currentWorkers := atomic.LoadInt32(&wp.workersCount) - if currentWorkers < int32(wp.MaxWorkersCount) { - if atomic.CompareAndSwapInt32(&wp.workersCount, currentWorkers, currentWorkers+1) { - ch = wp.workerChanPool.Get().(*workerChan) - go wp.workerFunc(ch) - return ch - } - } else { - break + if ch == nil { + if !createWorker { + return nil } + vch := wp.workerChanPool.Get() + ch = vch.(*workerChan) + go func() { + wp.workerFunc(ch) + wp.workerChanPool.Put(vch) + }() } - return nil + return ch } func (wp *workerPool) release(ch *workerChan) bool { - atomic.StoreInt64(&ch.lastUseTime, time.Now().UnixNano()) - if wp.mustStop.Load() { + ch.lastUseTime = time.Now() + wp.lock.Lock() + if wp.mustStop { + wp.lock.Unlock() return false } - wp.ready.push(ch) + wp.ready = append(wp.ready, ch) + wp.lock.Unlock() return true } @@ -230,5 +245,7 @@ func (wp *workerPool) workerFunc(ch *workerChan) { } } - atomic.AddInt32(&wp.workersCount, -1) + wp.lock.Lock() + wp.workersCount-- + wp.lock.Unlock() }