From a445f0b181f0ffd8b710abcb6320f55ad48f76b9 Mon Sep 17 00:00:00 2001 From: NikoMalik <123812634+NikoMalik@users.noreply.github.com> Date: Fri, 16 Aug 2024 18:36:03 +0300 Subject: [PATCH 01/13] using atomic instead of mutex and delete scratch slice --- workerpool.go | 154 ++++++++++++++++++++++---------------------------- 1 file changed, 69 insertions(+), 85 deletions(-) diff --git a/workerpool.go b/workerpool.go index 9ecd9481df..b58d0e5d52 100644 --- a/workerpool.go +++ b/workerpool.go @@ -6,6 +6,7 @@ import ( "runtime" "strings" "sync" + "sync/atomic" "time" ) @@ -27,23 +28,48 @@ type workerPool struct { connState func(net.Conn, ConnState) - ready []*workerChan + ready workerChanStack MaxWorkersCount int MaxIdleWorkerDuration time.Duration - workersCount int - - lock sync.Mutex + workersCount int32 LogAllErrors bool - mustStop bool + mustStop atomic.Bool } type workerChan struct { - lastUseTime time.Time - ch chan net.Conn + next *workerChan + + ch chan net.Conn + + lastUseTime int64 +} + +type workerChanStack struct { + head, tail *workerChan +} + +func (s *workerChanStack) push(ch *workerChan) { + ch.next = s.head + s.head = ch + if s.tail == nil { + s.tail = ch + } +} + +func (s *workerChanStack) pop() *workerChan { + head := s.head + if head == nil { + return nil + } + s.head = head.next + if s.head == nil { + s.tail = nil + } + return head } func (wp *workerPool) Start() { @@ -58,9 +84,9 @@ func (wp *workerPool) Start() { } } go func() { - var scratch []*workerChan + for { - wp.clean(&scratch) + wp.clean() select { case <-stopCh: return @@ -81,15 +107,16 @@ func (wp *workerPool) Stop() { // Stop all the workers waiting for incoming connections. // Do not wait for busy workers - they will stop after // serving the connection and noticing wp.mustStop = true. - wp.lock.Lock() - ready := wp.ready - for i := range ready { - ready[i].ch <- nil - ready[i] = nil + + for { + ch := wp.ready.pop() + if ch == nil { + break + } + ch.ch <- nil } - wp.ready = ready[:0] - wp.mustStop = true - wp.lock.Unlock() + wp.mustStop.Store(true) + } func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration { @@ -99,50 +126,23 @@ func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration { return wp.MaxIdleWorkerDuration } -func (wp *workerPool) clean(scratch *[]*workerChan) { +func (wp *workerPool) clean() { maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration() - - // Clean least recently used workers if they didn't serve connections - // for more than maxIdleWorkerDuration. - criticalTime := time.Now().Add(-maxIdleWorkerDuration) - - wp.lock.Lock() - ready := wp.ready - n := len(ready) - - // Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up. - l, r := 0, n-1 - for l <= r { - mid := (l + r) / 2 - if criticalTime.After(wp.ready[mid].lastUseTime) { - l = mid + 1 + criticalTime := time.Now().Add(-maxIdleWorkerDuration).UnixNano() + + current := wp.ready.head + for current != nil { + next := current.next + if current.lastUseTime < criticalTime { + current.ch <- nil + wp.workerChanPool.Put(current) } else { - r = mid - 1 + wp.ready.head = current + break } + current = next } - i := r - if i == -1 { - wp.lock.Unlock() - return - } - - *scratch = append((*scratch)[:0], ready[:i+1]...) - m := copy(ready, ready[i+1:]) - for i = m; i < n; i++ { - ready[i] = nil - } - wp.ready = ready[:m] - wp.lock.Unlock() - - // Notify obsolete workers to stop. - // This notification must be outside the wp.lock, since ch.ch - // may be blocking and may consume a lot of time if many workers - // are located on non-local CPUs. - tmp := *scratch - for i := range tmp { - tmp[i].ch <- nil - tmp[i] = nil - } + wp.ready.tail = wp.ready.head } func (wp *workerPool) Serve(c net.Conn) bool { @@ -170,27 +170,15 @@ var workerChanCap = func() int { func (wp *workerPool) getCh() *workerChan { var ch *workerChan - createWorker := false - - wp.lock.Lock() - ready := wp.ready - n := len(ready) - 1 - if n < 0 { - if wp.workersCount < wp.MaxWorkersCount { - createWorker = true - wp.workersCount++ - } - } else { - ch = ready[n] - ready[n] = nil - wp.ready = ready[:n] + var createWorker bool + + ch = wp.ready.pop() + if ch == nil && atomic.LoadInt32(&wp.workersCount) < int32(wp.MaxWorkersCount) { + atomic.AddInt32(&wp.workersCount, 1) + createWorker = true } - wp.lock.Unlock() - if ch == nil { - if !createWorker { - return nil - } + if ch == nil && createWorker { vch := wp.workerChanPool.Get() ch = vch.(*workerChan) go func() { @@ -202,14 +190,12 @@ func (wp *workerPool) getCh() *workerChan { } func (wp *workerPool) release(ch *workerChan) bool { - ch.lastUseTime = time.Now() - wp.lock.Lock() - if wp.mustStop { - wp.lock.Unlock() + ch.lastUseTime = time.Now().UnixNano() + if wp.mustStop.Load() { return false } - wp.ready = append(wp.ready, ch) - wp.lock.Unlock() + + wp.ready.push(ch) return true } @@ -245,7 +231,5 @@ func (wp *workerPool) workerFunc(ch *workerChan) { } } - wp.lock.Lock() - wp.workersCount-- - wp.lock.Unlock() + atomic.AddInt32(&wp.workersCount, -1) } From a2774f1f3ec16a645b803719302b54c2c930f7eb Mon Sep 17 00:00:00 2001 From: NikoMalik <123812634+NikoMalik@users.noreply.github.com> Date: Fri, 16 Aug 2024 19:59:15 +0300 Subject: [PATCH 02/13] optimize struct --- workerpool.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/workerpool.go b/workerpool.go index b58d0e5d52..f5c3674baa 100644 --- a/workerpool.go +++ b/workerpool.go @@ -22,22 +22,21 @@ type workerPool struct { // Function for serving server connections. // It must leave c unclosed. + ready workerChanStack WorkerFunc ServeHandler stopCh chan struct{} connState func(net.Conn, ConnState) - ready workerChanStack - MaxWorkersCount int MaxIdleWorkerDuration time.Duration workersCount int32 - LogAllErrors bool mustStop atomic.Bool + LogAllErrors bool } type workerChan struct { From f57605c30b6b72c69bc03c8f0bfa067bdcd02a46 Mon Sep 17 00:00:00 2001 From: NikoMalik <123812634+NikoMalik@users.noreply.github.com> Date: Fri, 16 Aug 2024 22:36:04 +0300 Subject: [PATCH 03/13] fix default bool --- workerpool.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/workerpool.go b/workerpool.go index f5c3674baa..6be88f405d 100644 --- a/workerpool.go +++ b/workerpool.go @@ -169,15 +169,15 @@ var workerChanCap = func() int { func (wp *workerPool) getCh() *workerChan { var ch *workerChan - var createWorker bool + var createWorker atomic.Bool ch = wp.ready.pop() if ch == nil && atomic.LoadInt32(&wp.workersCount) < int32(wp.MaxWorkersCount) { atomic.AddInt32(&wp.workersCount, 1) - createWorker = true + createWorker.Store(true) } - if ch == nil && createWorker { + if ch == nil && createWorker.Load() { vch := wp.workerChanPool.Get() ch = vch.(*workerChan) go func() { From 029d7a58ed1e2c74e8952660f686f16509c527c8 Mon Sep 17 00:00:00 2001 From: NikoMalik <123812634+NikoMalik@users.noreply.github.com> Date: Fri, 16 Aug 2024 23:16:24 +0300 Subject: [PATCH 04/13] escape data race --- workerpool.go | 59 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 36 insertions(+), 23 deletions(-) diff --git a/workerpool.go b/workerpool.go index 6be88f405d..7609d0a35a 100644 --- a/workerpool.go +++ b/workerpool.go @@ -35,7 +35,8 @@ type workerPool struct { workersCount int32 - mustStop atomic.Bool + mustStop atomic.Bool + LogAllErrors bool } @@ -48,27 +49,36 @@ type workerChan struct { } type workerChanStack struct { - head, tail *workerChan + head, tail atomic.Pointer[workerChan] } func (s *workerChanStack) push(ch *workerChan) { - ch.next = s.head - s.head = ch - if s.tail == nil { - s.tail = ch + for { + oldHead := s.head.Load() + ch.next = oldHead + if s.head.CompareAndSwap(oldHead, ch) { + break + } } -} -func (s *workerChanStack) pop() *workerChan { - head := s.head - if head == nil { - return nil + if s.tail.Load() == nil { + s.tail.Store(ch) } - s.head = head.next - if s.head == nil { - s.tail = nil +} +func (s *workerChanStack) pop() *workerChan { + for { + oldHead := s.head.Load() + if oldHead == nil { + return nil + } + + if s.head.CompareAndSwap(oldHead, oldHead.next) { + if s.head.Load() == nil { + s.tail.Store(nil) + } + return oldHead + } } - return head } func (wp *workerPool) Start() { @@ -129,19 +139,22 @@ func (wp *workerPool) clean() { maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration() criticalTime := time.Now().Add(-maxIdleWorkerDuration).UnixNano() - current := wp.ready.head - for current != nil { + for { + current := wp.ready.head.Load() + if current == nil || current.lastUseTime >= criticalTime { + break + } + next := current.next - if current.lastUseTime < criticalTime { + if wp.ready.head.CompareAndSwap(current, next) { current.ch <- nil wp.workerChanPool.Put(current) - } else { - wp.ready.head = current - break } - current = next } - wp.ready.tail = wp.ready.head + + if wp.ready.head.Load() == nil { + wp.ready.tail.Store(nil) + } } func (wp *workerPool) Serve(c net.Conn) bool { From 63f18b917f83bad8138f61b9301741f2ec3aeee3 Mon Sep 17 00:00:00 2001 From: NikoMalik <123812634+NikoMalik@users.noreply.github.com> Date: Sat, 17 Aug 2024 09:56:22 +0300 Subject: [PATCH 05/13] avoid the momentary change of wp.workersCount. --- workerpool.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/workerpool.go b/workerpool.go index 7609d0a35a..ac1b74575a 100644 --- a/workerpool.go +++ b/workerpool.go @@ -185,9 +185,17 @@ func (wp *workerPool) getCh() *workerChan { var createWorker atomic.Bool ch = wp.ready.pop() - if ch == nil && atomic.LoadInt32(&wp.workersCount) < int32(wp.MaxWorkersCount) { - atomic.AddInt32(&wp.workersCount, 1) - createWorker.Store(true) + if ch == nil { + for { + currentworkers := atomic.LoadInt32(&wp.workersCount) + if currentworkers >= int32(wp.MaxWorkersCount) { + break + } + if atomic.CompareAndSwapInt32(&wp.workersCount, currentworkers, currentworkers+1) { + createWorker.Store(true) + break + } + } } if ch == nil && createWorker.Load() { From 85cbb8d9912025bf0b8951cd4bcfda01519a057b Mon Sep 17 00:00:00 2001 From: NikoMalik <123812634+NikoMalik@users.noreply.github.com> Date: Wed, 21 Aug 2024 10:44:38 +0300 Subject: [PATCH 06/13] removed unused tail (for now) --- workerpool.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/workerpool.go b/workerpool.go index ac1b74575a..1906bf7086 100644 --- a/workerpool.go +++ b/workerpool.go @@ -49,7 +49,7 @@ type workerChan struct { } type workerChanStack struct { - head, tail atomic.Pointer[workerChan] + head atomic.Pointer[workerChan] } func (s *workerChanStack) push(ch *workerChan) { @@ -61,9 +61,6 @@ func (s *workerChanStack) push(ch *workerChan) { } } - if s.tail.Load() == nil { - s.tail.Store(ch) - } } func (s *workerChanStack) pop() *workerChan { for { @@ -73,9 +70,7 @@ func (s *workerChanStack) pop() *workerChan { } if s.head.CompareAndSwap(oldHead, oldHead.next) { - if s.head.Load() == nil { - s.tail.Store(nil) - } + return oldHead } } @@ -152,9 +147,6 @@ func (wp *workerPool) clean() { } } - if wp.ready.head.Load() == nil { - wp.ready.tail.Store(nil) - } } func (wp *workerPool) Serve(c net.Conn) bool { From 3403412ce76226227e38c71623a95421b7bcf736 Mon Sep 17 00:00:00 2001 From: NikoMalik <123812634+NikoMalik@users.noreply.github.com> Date: Wed, 21 Aug 2024 11:43:51 +0300 Subject: [PATCH 07/13] little fixes --- workerpool.go | 44 ++++++++++++++++++-------------------------- 1 file changed, 18 insertions(+), 26 deletions(-) diff --git a/workerpool.go b/workerpool.go index 1906bf7086..0731ac6e81 100644 --- a/workerpool.go +++ b/workerpool.go @@ -60,8 +60,8 @@ func (s *workerChanStack) push(ch *workerChan) { break } } - } + func (s *workerChanStack) pop() *workerChan { for { oldHead := s.head.Load() @@ -70,12 +70,10 @@ func (s *workerChanStack) pop() *workerChan { } if s.head.CompareAndSwap(oldHead, oldHead.next) { - return oldHead } } } - func (wp *workerPool) Start() { if wp.stopCh != nil { return @@ -146,7 +144,6 @@ func (wp *workerPool) clean() { wp.workerChanPool.Put(current) } } - } func (wp *workerPool) Serve(c net.Conn) bool { @@ -173,32 +170,27 @@ var workerChanCap = func() int { }() func (wp *workerPool) getCh() *workerChan { - var ch *workerChan - var createWorker atomic.Bool + ch := wp.ready.pop() + if ch != nil { + return ch + } - ch = wp.ready.pop() - if ch == nil { - for { - currentworkers := atomic.LoadInt32(&wp.workersCount) - if currentworkers >= int32(wp.MaxWorkersCount) { - break - } - if atomic.CompareAndSwapInt32(&wp.workersCount, currentworkers, currentworkers+1) { - createWorker.Store(true) - break + for { + currentWorkers := atomic.LoadInt32(&wp.workersCount) + if currentWorkers < int32(wp.MaxWorkersCount) { + if atomic.CompareAndSwapInt32(&wp.workersCount, currentWorkers, currentWorkers+1) { + ch = wp.workerChanPool.Get().(*workerChan) + go func() { + wp.workerFunc(ch) + wp.workerChanPool.Put(ch) + }() + return ch } + } else { + break } } - - if ch == nil && createWorker.Load() { - vch := wp.workerChanPool.Get() - ch = vch.(*workerChan) - go func() { - wp.workerFunc(ch) - wp.workerChanPool.Put(vch) - }() - } - return ch + return nil } func (wp *workerPool) release(ch *workerChan) bool { From 7ac1336abf4fac63e35c06330fa69e3e0746ef5e Mon Sep 17 00:00:00 2001 From: NikoMalik <123812634+NikoMalik@users.noreply.github.com> Date: Thu, 22 Aug 2024 12:41:03 +0300 Subject: [PATCH 08/13] fixed allocations This option immediately exits the loop when the maximum number of vorkers is reached, rather than creating a new vorker if the limit is reached. This reduces the frequency of unnecessary operations and potential locks in sync.Pool --- workerpool.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/workerpool.go b/workerpool.go index 0731ac6e81..3820edc2db 100644 --- a/workerpool.go +++ b/workerpool.go @@ -170,20 +170,17 @@ var workerChanCap = func() int { }() func (wp *workerPool) getCh() *workerChan { - ch := wp.ready.pop() - if ch != nil { - return ch - } - for { + ch := wp.ready.pop() + if ch != nil { + return ch + } + currentWorkers := atomic.LoadInt32(&wp.workersCount) if currentWorkers < int32(wp.MaxWorkersCount) { if atomic.CompareAndSwapInt32(&wp.workersCount, currentWorkers, currentWorkers+1) { ch = wp.workerChanPool.Get().(*workerChan) - go func() { - wp.workerFunc(ch) - wp.workerChanPool.Put(ch) - }() + go wp.workerFunc(ch) return ch } } else { From 8f2aa8e4c1d06070e45b7b10137357cc230d6367 Mon Sep 17 00:00:00 2001 From: NikoMalik <123812634+NikoMalik@users.noreply.github.com> Date: Sat, 24 Aug 2024 10:26:02 +0300 Subject: [PATCH 09/13] Update for linter workerpool.go Co-authored-by: Erik Dubbelboer --- workerpool.go | 1 + 1 file changed, 1 insertion(+) diff --git a/workerpool.go b/workerpool.go index 3820edc2db..6e1239aaf7 100644 --- a/workerpool.go +++ b/workerpool.go @@ -74,6 +74,7 @@ func (s *workerChanStack) pop() *workerChan { } } } + func (wp *workerPool) Start() { if wp.stopCh != nil { return From 8fa3f4ead94d815631275ed5f238a49d4f02dd7e Mon Sep 17 00:00:00 2001 From: NikoMalik <123812634+NikoMalik@users.noreply.github.com> Date: Sat, 24 Aug 2024 10:26:33 +0300 Subject: [PATCH 10/13] Update for lint#2 workerpool.go Co-authored-by: Erik Dubbelboer --- workerpool.go | 1 - 1 file changed, 1 deletion(-) diff --git a/workerpool.go b/workerpool.go index 6e1239aaf7..9016e52dd7 100644 --- a/workerpool.go +++ b/workerpool.go @@ -87,7 +87,6 @@ func (wp *workerPool) Start() { } } go func() { - for { wp.clean() select { From 1b0309997b14b10e2c63f6f15026a7ab934ab0a0 Mon Sep 17 00:00:00 2001 From: NikoMalik <123812634+NikoMalik@users.noreply.github.com> Date: Sat, 24 Aug 2024 10:26:44 +0300 Subject: [PATCH 11/13] Update for lint#3 workerpool.go Co-authored-by: Erik Dubbelboer --- workerpool.go | 1 - 1 file changed, 1 deletion(-) diff --git a/workerpool.go b/workerpool.go index 9016e52dd7..b4002d1374 100644 --- a/workerpool.go +++ b/workerpool.go @@ -118,7 +118,6 @@ func (wp *workerPool) Stop() { ch.ch <- nil } wp.mustStop.Store(true) - } func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration { From bfecd4ce3714abc16432feb49a8dba72852462c5 Mon Sep 17 00:00:00 2001 From: NikoMalik <123812634+NikoMalik@users.noreply.github.com> Date: Sat, 24 Aug 2024 10:31:08 +0300 Subject: [PATCH 12/13] Update for lint#4 workerpool.go --- workerpool.go | 1 - 1 file changed, 1 deletion(-) diff --git a/workerpool.go b/workerpool.go index b4002d1374..76d61d96d0 100644 --- a/workerpool.go +++ b/workerpool.go @@ -194,7 +194,6 @@ func (wp *workerPool) release(ch *workerChan) bool { if wp.mustStop.Load() { return false } - wp.ready.push(ch) return true } From 9b9be0bf647c36a0b0e0038cd090e32730e3b6dc Mon Sep 17 00:00:00 2001 From: NikoMalik <123812634+NikoMalik@users.noreply.github.com> Date: Sat, 24 Aug 2024 11:40:12 +0300 Subject: [PATCH 13/13] eliminating potential data races --- workerpool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workerpool.go b/workerpool.go index 76d61d96d0..bc3a3c5bb4 100644 --- a/workerpool.go +++ b/workerpool.go @@ -133,7 +133,7 @@ func (wp *workerPool) clean() { for { current := wp.ready.head.Load() - if current == nil || current.lastUseTime >= criticalTime { + if current == nil || atomic.LoadInt64(¤t.lastUseTime) >= criticalTime { break } @@ -190,7 +190,7 @@ func (wp *workerPool) getCh() *workerChan { } func (wp *workerPool) release(ch *workerChan) bool { - ch.lastUseTime = time.Now().UnixNano() + atomic.StoreInt64(&ch.lastUseTime, time.Now().UnixNano()) if wp.mustStop.Load() { return false }