Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using atomic instead of mutex and delete scratch slice #1833

Merged
merged 13 commits into from
Aug 25, 2024
167 changes: 76 additions & 91 deletions workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
)

Expand All @@ -21,31 +22,58 @@

// Function for serving server connections.
// It must leave c unclosed.
ready workerChanStack
WorkerFunc ServeHandler

stopCh chan struct{}

connState func(net.Conn, ConnState)

ready []*workerChan

MaxWorkersCount int

MaxIdleWorkerDuration time.Duration

workersCount int
workersCount int32

lock sync.Mutex
mustStop atomic.Bool

LogAllErrors bool
mustStop bool
}

type workerChan struct {
lastUseTime time.Time
ch chan net.Conn
next *workerChan

ch chan net.Conn

lastUseTime int64
}

type workerChanStack struct {
head atomic.Pointer[workerChan]
}

func (s *workerChanStack) push(ch *workerChan) {
for {
oldHead := s.head.Load()
ch.next = oldHead
if s.head.CompareAndSwap(oldHead, ch) {
break
}
}
}

func (s *workerChanStack) pop() *workerChan {
for {
oldHead := s.head.Load()
if oldHead == nil {
return nil
}

if s.head.CompareAndSwap(oldHead, oldHead.next) {
return oldHead
}
}
}
NikoMalik marked this conversation as resolved.
Show resolved Hide resolved
func (wp *workerPool) Start() {
if wp.stopCh != nil {
return
Expand All @@ -57,10 +85,10 @@
ch: make(chan net.Conn, workerChanCap),
}
}
go func() {

Check failure on line 88 in workerpool.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary leading newline (whitespace)
var scratch []*workerChan

NikoMalik marked this conversation as resolved.
Show resolved Hide resolved
for {
wp.clean(&scratch)
wp.clean()
select {
case <-stopCh:
return
Expand All @@ -81,16 +109,17 @@
// Stop all the workers waiting for incoming connections.
// Do not wait for busy workers - they will stop after
// serving the connection and noticing wp.mustStop = true.
wp.lock.Lock()
ready := wp.ready
for i := range ready {
ready[i].ch <- nil
ready[i] = nil

for {
ch := wp.ready.pop()
if ch == nil {
break
}
ch.ch <- nil
}
wp.ready = ready[:0]
wp.mustStop = true
wp.lock.Unlock()
wp.mustStop.Store(true)

NikoMalik marked this conversation as resolved.
Show resolved Hide resolved
}

Check failure on line 122 in workerpool.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary trailing newline (whitespace)

func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
if wp.MaxIdleWorkerDuration <= 0 {
Expand All @@ -99,49 +128,21 @@
return wp.MaxIdleWorkerDuration
}

func (wp *workerPool) clean(scratch *[]*workerChan) {
func (wp *workerPool) clean() {
maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()
criticalTime := time.Now().Add(-maxIdleWorkerDuration).UnixNano()

// Clean least recently used workers if they didn't serve connections
// for more than maxIdleWorkerDuration.
criticalTime := time.Now().Add(-maxIdleWorkerDuration)

wp.lock.Lock()
ready := wp.ready
n := len(ready)

// Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up.
l, r := 0, n-1
for l <= r {
mid := (l + r) / 2
if criticalTime.After(wp.ready[mid].lastUseTime) {
l = mid + 1
} else {
r = mid - 1
for {
current := wp.ready.head.Load()
if current == nil || current.lastUseTime >= criticalTime {
break
}
}
i := r
if i == -1 {
wp.lock.Unlock()
return
}

*scratch = append((*scratch)[:0], ready[:i+1]...)
m := copy(ready, ready[i+1:])
for i = m; i < n; i++ {
ready[i] = nil
}
wp.ready = ready[:m]
wp.lock.Unlock()

// Notify obsolete workers to stop.
// This notification must be outside the wp.lock, since ch.ch
// may be blocking and may consume a lot of time if many workers
// are located on non-local CPUs.
tmp := *scratch
for i := range tmp {
tmp[i].ch <- nil
tmp[i] = nil
next := current.next
if wp.ready.head.CompareAndSwap(current, next) {
current.ch <- nil
wp.workerChanPool.Put(current)
}
}
}

Expand Down Expand Up @@ -169,47 +170,33 @@
}()

func (wp *workerPool) getCh() *workerChan {
var ch *workerChan
createWorker := false

wp.lock.Lock()
ready := wp.ready
n := len(ready) - 1
if n < 0 {
if wp.workersCount < wp.MaxWorkersCount {
createWorker = true
wp.workersCount++
for {
ch := wp.ready.pop()
if ch != nil {
return ch
}
} else {
ch = ready[n]
ready[n] = nil
wp.ready = ready[:n]
}
wp.lock.Unlock()

if ch == nil {
if !createWorker {
return nil
currentWorkers := atomic.LoadInt32(&wp.workersCount)
if currentWorkers < int32(wp.MaxWorkersCount) {
if atomic.CompareAndSwapInt32(&wp.workersCount, currentWorkers, currentWorkers+1) {
ch = wp.workerChanPool.Get().(*workerChan)
go wp.workerFunc(ch)
return ch
}
} else {
break
}
vch := wp.workerChanPool.Get()
ch = vch.(*workerChan)
go func() {
wp.workerFunc(ch)
wp.workerChanPool.Put(vch)
}()
}
return ch
return nil
}

func (wp *workerPool) release(ch *workerChan) bool {
ch.lastUseTime = time.Now()
wp.lock.Lock()
if wp.mustStop {
wp.lock.Unlock()
ch.lastUseTime = time.Now().UnixNano()
if wp.mustStop.Load() {
return false
}
wp.ready = append(wp.ready, ch)
wp.lock.Unlock()

wp.ready.push(ch)
return true
}

Expand Down Expand Up @@ -245,7 +232,5 @@
}
}

wp.lock.Lock()
wp.workersCount--
wp.lock.Unlock()
atomic.AddInt32(&wp.workersCount, -1)
}
Loading