Skip to content

Commit

Permalink
reimplement iotaskpool: based on taskpool and sync.Pool buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
lesismal committed Apr 6, 2024
1 parent 84a8287 commit 103e0ce
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 66 deletions.
83 changes: 19 additions & 64 deletions taskpool/iotaskpool.go
Original file line number Diff line number Diff line change
@@ -1,86 +1,41 @@
package taskpool

import (
"runtime"
"unsafe"

"github.com/lesismal/nbio/logging"
"sync"
)

// IOTaskPool .
// IOTaskPool .
type IOTaskPool struct {
bufSize int
queueSize int
concurrent int
chTask chan func([]byte)
chClose chan struct{}
caller func(f func())
}

func (tp *IOTaskPool) run() {
buf := make([]byte, tp.bufSize)
for {
select {
case f := <-tp.chTask:
tp.caller(func() {
f(buf)
})
case <-tp.chClose:
return
}
}
task *TaskPool
pool sync.Pool
}

// Go .
func (tp *IOTaskPool) Go(f func([]byte)) {
select {
case tp.chTask <- f:
case <-tp.chClose:
}
tp.task.Go(func() {
pbuf := tp.pool.Get().(*[]byte)
f(*pbuf)
tp.pool.Put(pbuf)
})
}

// Stop .
func (tp *IOTaskPool) Stop() {
close(tp.chClose)
tp.task.Stop()
}

// NewIO .
// NewIO creates and returns a IOTaskPool.
func NewIO(concurrent, queueSize, bufSize int, v ...interface{}) *IOTaskPool {
if concurrent <= 0 {
concurrent = runtime.NumCPU() * 2
}
if queueSize <= 0 {
queueSize = 1024
}
if bufSize <= 0 {
bufSize = 1024 * 64
}
task := New(concurrent, queueSize, v...)

tp := &IOTaskPool{
bufSize: bufSize,
queueSize: queueSize,
concurrent: concurrent,
chTask: make(chan func([]byte), queueSize),
chClose: make(chan struct{}),
}
tp.caller = func(f func()) {
defer func() {
if err := recover(); err != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
logging.Error("iotaskpool call failed: %v\n%v\n", err, *(*string)(unsafe.Pointer(&buf)))
}
}()
f()
}
if len(v) > 0 {
if caller, ok := v[0].(func(f func())); ok {
tp.caller = caller
}
}
for i := 0; i < concurrent; i++ {
go tp.run()
task: task,
pool: sync.Pool{
New: func() interface{} {
buf := make([]byte, bufSize)
return &buf
},
},
}

return tp
Expand Down
5 changes: 4 additions & 1 deletion taskpool/taskpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@ func (tp *TaskPool) fork(f func()) bool {

// Go .
func (tp *TaskPool) Go(f func()) {
// If current goroutine num is less than maxConcurrent,
// creat a new goroutine to exec new task.
if tp.fork(f) {
return
}

// Else push the new task into chan/queue.
atomic.AddInt64(&tp.concurrent, -1)
select {
case tp.chQqueue <- f:
Expand All @@ -62,7 +65,7 @@ func (tp *TaskPool) Stop() {
close(tp.chClose)
}

// New .
// New creates and returns a TaskPool.
func New(maxConcurrent int, chQqueueSize int, v ...interface{}) *TaskPool {
tp := &TaskPool{
maxConcurrent: int64(maxConcurrent - 1),
Expand Down
23 changes: 22 additions & 1 deletion taskpool/taskpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/lesismal/nbio/logging"
)

const testLoopNum = 1024
const testLoopNum = 1024 * 8
const sleepTime = time.Nanosecond * 0

func BenchmarkGo(b *testing.B) {
Expand Down Expand Up @@ -61,3 +61,24 @@ func BenchmarkTaskPool(b *testing.B) {
wg.Wait()
}
}

func BenchmarkIOTaskPool(b *testing.B) {
p := NewIO(32, 1024, 1024)
defer p.Stop()

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
wg := sync.WaitGroup{}
wg.Add(testLoopNum)
for j := 0; j < testLoopNum; j++ {
p.Go(func(buf []byte) {
if sleepTime > 0 {
time.Sleep(sleepTime)
}
wg.Done()
})
}
wg.Wait()
}
}

0 comments on commit 103e0ce

Please sign in to comment.