Using atomic instead of mutex and delete scratch slice (#1833)
* using atomic instead of mutex and delete scratch slice

* optimize struct

* fix default bool

* avoid data race

* avoid the momentary change of wp.workersCount.

* removed unused tail (for now)

* little fixes

* fixed allocations

This change exits the loop immediately when the maximum number of workers is reached, rather than creating a new worker once the limit is hit. This reduces the frequency of unnecessary operations and potential locking in sync.Pool (a condensed sketch of the reservation pattern follows the diff below).

* Update for linter workerpool.go

Co-authored-by: Erik Dubbelboer <erik@dubbelboer.com>

* Update for lint#2 workerpool.go

Co-authored-by: Erik Dubbelboer <erik@dubbelboer.com>

* Update for lint#3 workerpool.go

Co-authored-by: Erik Dubbelboer <erik@dubbelboer.com>

* Update for lint#4 workerpool.go

* eliminating potential data races

---------

Co-authored-by: Erik Dubbelboer <erik@dubbelboer.com>
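
The core of this change is the lock-free ready stack (workerChanStack) that replaces the mutex-guarded slice, visible in the diff below. For reference, here is a minimal self-contained sketch of the same push/pop pattern (a Treiber stack built on atomic.Pointer and compare-and-swap, Go 1.19+); the node type and names are illustrative, not the exact fasthttp definitions:

package main

import (
	"fmt"
	"sync/atomic"
)

type node struct {
	next  *node
	value int
}

type stack struct {
	head atomic.Pointer[node]
}

// push links the new node in front of the current head and publishes it with
// a compare-and-swap, retrying if another goroutine changed head in between.
func (s *stack) push(n *node) {
	for {
		old := s.head.Load()
		n.next = old
		if s.head.CompareAndSwap(old, n) {
			return
		}
	}
}

// pop unlinks the current head with a compare-and-swap; nil means empty.
func (s *stack) pop() *node {
	for {
		old := s.head.Load()
		if old == nil {
			return nil
		}
		if s.head.CompareAndSwap(old, old.next) {
			return old
		}
	}
}

func main() {
	var s stack
	s.push(&node{value: 1})
	s.push(&node{value: 2})
	fmt.Println(s.pop().value, s.pop().value) // 2 1: LIFO order
}

Because the stack is LIFO, getCh reuses the most recently released worker first, matching the behavior of the old slice-based implementation.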
NikoMalik and erikdubbelboer authored Aug 25, 2024
1 parent 25c52d7 commit 19c50cd
Showing 1 changed file with 74 additions and 91 deletions.
workerpool.go (165 changes: 74 additions & 91 deletions)
@@ -6,6 +6,7 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
)

@@ -21,29 +22,57 @@ type workerPool struct {

// Function for serving server connections.
// It must leave c unclosed.
ready workerChanStack
WorkerFunc ServeHandler

stopCh chan struct{}

connState func(net.Conn, ConnState)

ready []*workerChan

MaxWorkersCount int

MaxIdleWorkerDuration time.Duration

workersCount int
workersCount int32

lock sync.Mutex
mustStop atomic.Bool

LogAllErrors bool
mustStop bool
}

type workerChan struct {
lastUseTime time.Time
ch chan net.Conn
next *workerChan

ch chan net.Conn

lastUseTime int64
}

type workerChanStack struct {
head atomic.Pointer[workerChan]
}

func (s *workerChanStack) push(ch *workerChan) {
for {
oldHead := s.head.Load()
ch.next = oldHead
if s.head.CompareAndSwap(oldHead, ch) {
break
}
}
}

func (s *workerChanStack) pop() *workerChan {
for {
oldHead := s.head.Load()
if oldHead == nil {
return nil
}

if s.head.CompareAndSwap(oldHead, oldHead.next) {
return oldHead
}
}
}

func (wp *workerPool) Start() {
@@ -58,9 +87,8 @@ func (wp *workerPool) Start() {
}
}
go func() {
var scratch []*workerChan
for {
wp.clean(&scratch)
wp.clean()
select {
case <-stopCh:
return
@@ -81,15 +109,15 @@ func (wp *workerPool) Stop() {
// Stop all the workers waiting for incoming connections.
// Do not wait for busy workers - they will stop after
// serving the connection and noticing wp.mustStop = true.
wp.lock.Lock()
ready := wp.ready
for i := range ready {
ready[i].ch <- nil
ready[i] = nil

for {
ch := wp.ready.pop()
if ch == nil {
break
}
ch.ch <- nil
}
wp.ready = ready[:0]
wp.mustStop = true
wp.lock.Unlock()
wp.mustStop.Store(true)
}

func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
@@ -99,49 +127,21 @@ func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
return wp.MaxIdleWorkerDuration
}

func (wp *workerPool) clean(scratch *[]*workerChan) {
func (wp *workerPool) clean() {
maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()
criticalTime := time.Now().Add(-maxIdleWorkerDuration).UnixNano()

// Clean least recently used workers if they didn't serve connections
// for more than maxIdleWorkerDuration.
criticalTime := time.Now().Add(-maxIdleWorkerDuration)

wp.lock.Lock()
ready := wp.ready
n := len(ready)

// Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up.
l, r := 0, n-1
for l <= r {
mid := (l + r) / 2
if criticalTime.After(wp.ready[mid].lastUseTime) {
l = mid + 1
} else {
r = mid - 1
for {
current := wp.ready.head.Load()
if current == nil || atomic.LoadInt64(&current.lastUseTime) >= criticalTime {
break
}
}
i := r
if i == -1 {
wp.lock.Unlock()
return
}

*scratch = append((*scratch)[:0], ready[:i+1]...)
m := copy(ready, ready[i+1:])
for i = m; i < n; i++ {
ready[i] = nil
}
wp.ready = ready[:m]
wp.lock.Unlock()

// Notify obsolete workers to stop.
// This notification must be outside the wp.lock, since ch.ch
// may be blocking and may consume a lot of time if many workers
// are located on non-local CPUs.
tmp := *scratch
for i := range tmp {
tmp[i].ch <- nil
tmp[i] = nil
next := current.next
if wp.ready.head.CompareAndSwap(current, next) {
current.ch <- nil
wp.workerChanPool.Put(current)
}
}
}

@@ -169,47 +169,32 @@ var workerChanCap = func() int {
}()

func (wp *workerPool) getCh() *workerChan {
var ch *workerChan
createWorker := false

wp.lock.Lock()
ready := wp.ready
n := len(ready) - 1
if n < 0 {
if wp.workersCount < wp.MaxWorkersCount {
createWorker = true
wp.workersCount++
for {
ch := wp.ready.pop()
if ch != nil {
return ch
}
} else {
ch = ready[n]
ready[n] = nil
wp.ready = ready[:n]
}
wp.lock.Unlock()

if ch == nil {
if !createWorker {
return nil
currentWorkers := atomic.LoadInt32(&wp.workersCount)
if currentWorkers < int32(wp.MaxWorkersCount) {
if atomic.CompareAndSwapInt32(&wp.workersCount, currentWorkers, currentWorkers+1) {
ch = wp.workerChanPool.Get().(*workerChan)
go wp.workerFunc(ch)
return ch
}
} else {
break
}
vch := wp.workerChanPool.Get()
ch = vch.(*workerChan)
go func() {
wp.workerFunc(ch)
wp.workerChanPool.Put(vch)
}()
}
return ch
return nil
}

func (wp *workerPool) release(ch *workerChan) bool {
ch.lastUseTime = time.Now()
wp.lock.Lock()
if wp.mustStop {
wp.lock.Unlock()
atomic.StoreInt64(&ch.lastUseTime, time.Now().UnixNano())
if wp.mustStop.Load() {
return false
}
wp.ready = append(wp.ready, ch)
wp.lock.Unlock()
wp.ready.push(ch)
return true
}

@@ -245,7 +230,5 @@ func (wp *workerPool) workerFunc(ch *workerChan) {
}
}

wp.lock.Lock()
wp.workersCount--
wp.lock.Unlock()
atomic.AddInt32(&wp.workersCount, -1)
}
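
The getCh hunk above reserves a worker slot by compare-and-swapping wp.workersCount before any goroutine is started, and gives up as soon as MaxWorkersCount is reached instead of retrying. A condensed standalone sketch of that reservation pattern (illustrative names, not the exact fasthttp code):

package main

import (
	"fmt"
	"sync/atomic"
)

const maxWorkers = 4

var workersCount int32

// tryReserveWorker claims one of the maxWorkers slots, or reports false once
// the limit is reached. The counter is bumped with CompareAndSwap before any
// worker goroutine is started, so the limit can never be exceeded.
func tryReserveWorker() bool {
	for {
		current := atomic.LoadInt32(&workersCount)
		if current >= maxWorkers {
			return false // limit reached: exit instead of spinning
		}
		if atomic.CompareAndSwapInt32(&workersCount, current, current+1) {
			return true // slot reserved; the caller may now start a worker
		}
		// Lost a race with another goroutine; reload the counter and retry.
	}
}

// releaseWorker returns the slot when a worker exits, mirroring the
// atomic.AddInt32(&wp.workersCount, -1) at the end of workerFunc.
func releaseWorker() {
	atomic.AddInt32(&workersCount, -1)
}

func main() {
	for i := 0; i < 6; i++ {
		fmt.Println(tryReserveWorker()) // true four times, then false
	}
	releaseWorker()
	fmt.Println(tryReserveWorker()) // true again after a slot is freed
}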
