Skip to content

Commit

Permalink
util/goroutine_pool: add a goroutine pool package utilities (#3752)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Sep 12, 2017
1 parent 262260a commit 3245d49
Show file tree
Hide file tree
Showing 2 changed files with 261 additions and 0 deletions.
129 changes: 129 additions & 0 deletions util/goroutine_pool/gp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package gp

import (
"sync"
"sync/atomic"
"time"
)

// Pool is a struct to represent goroutine pool.
type Pool struct {
head goroutine
tail *goroutine
count int
idleTimeout time.Duration
sync.Mutex
}

// goroutine is actually a background goroutine, with a channel binded for communication.
type goroutine struct {
ch chan func()
next *goroutine
status int32
}

const (
statusIdle int32 = 0
statusInUse int32 = 1
statusDead int32 = 2
)

// New returns a new *Pool object.
func New(idleTimeout time.Duration) *Pool {
pool := &Pool{
idleTimeout: idleTimeout,
}
pool.tail = &pool.head
return pool
}

// Go works like go func(), but goroutines are pooled for reusing.
// This strategy can avoid runtime.morestack, because pooled goroutine is already enlarged.
func (pool *Pool) Go(f func()) {
for {
g := pool.get()
if atomic.CompareAndSwapInt32(&g.status, statusIdle, statusInUse) {
g.ch <- f
return
}
// Status already changed from statusIdle => statusDead, drop it, find next one.
}
}

func (pool *Pool) get() *goroutine {
pool.Lock()
head := &pool.head
if head.next == nil {
pool.Unlock()
return pool.alloc()
}

ret := head.next
head.next = ret.next
if ret == pool.tail {
pool.tail = head
}
pool.count--
pool.Unlock()
ret.next = nil
return ret
}

func (pool *Pool) alloc() *goroutine {
g := &goroutine{
ch: make(chan func()),
}
go g.workLoop(pool)
return g
}

func (g *goroutine) put(pool *Pool) {
g.status = statusIdle
pool.Lock()
pool.tail.next = g
pool.tail = g
pool.count++
pool.Unlock()
}

func (g *goroutine) workLoop(pool *Pool) {
timer := time.NewTimer(pool.idleTimeout)
for {
select {
case <-timer.C:
// Check to avoid a corner case that the goroutine is take out from pool,
// and get this signal at the same time.
succ := atomic.CompareAndSwapInt32(&g.status, statusIdle, statusDead)
if succ {
return
}
case work := <-g.ch:
work()
// Put g back to the pool.
// This is the normal usage for a resource pool:
//
// obj := pool.get()
// use(obj)
// pool.put(obj)
//
// But when goroutine is used as a resource, we can't pool.put() immediately,
// because the resource(goroutine) maybe still in use.
// So, put back resource is done here, when the goroutine finish its work.
g.put(pool)
}
timer.Reset(pool.idleTimeout)
}
}
132 changes: 132 additions & 0 deletions util/goroutine_pool/gp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package gp

import (
"sync"
"testing"
"time"
)

func TestBasicAPI(t *testing.T) {
gp := New(time.Second)
var wg sync.WaitGroup
wg.Add(1)
// cover alloc()
gp.Go(func() { wg.Done() })
// cover put()
wg.Wait()
// cover get()
gp.Go(func() {})
}

func TestGC(t *testing.T) {
gp := New(200 * time.Millisecond)
var wg sync.WaitGroup
wg.Add(100)
for i := 0; i < 100; i++ {
idx := i
gp.Go(func() {
time.Sleep(time.Duration(idx+1) * time.Millisecond)
wg.Done()
})
}
wg.Wait()
time.Sleep(300 * time.Millisecond)
gp.Go(func() {}) // To trigger count change.
gp.Lock()
count := gp.count
gp.Unlock()
if count > 1 {
t.Error("all goroutines should be recycled", count)
}
}

func TestRace(t *testing.T) {
gp := New(8 * time.Millisecond)
var wg sync.WaitGroup
begin := make(chan struct{})
wg.Add(500)
for i := 0; i < 50; i++ {
go func() {
<-begin
for i := 0; i < 10; i++ {
gp.Go(func() {
wg.Done()
})
time.Sleep(5 * time.Millisecond)
}
}()
}
close(begin)
wg.Wait()
}

func BenchmarkGoPool(b *testing.B) {
gp := New(20 * time.Second)
for i := 0; i < b.N/2; i++ {
gp.Go(func() {})
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
gp.Go(dummy)
}
}

func BenchmarkGo(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
go dummy()
}
}

func dummy() {
}

func BenchmarkMorestackPool(b *testing.B) {
gp := New(5 * time.Second)
b.ResetTimer()
for i := 0; i < b.N; i++ {
var wg sync.WaitGroup
wg.Add(1)
gp.Go(func() {
morestack(false)
wg.Done()
})
wg.Wait()
}
}

func BenchmarkMoreStack(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
var wg sync.WaitGroup
wg.Add(1)
go func() {
morestack(false)
wg.Done()
}()
wg.Wait()
}
}

func morestack(f bool) {
var stack [8 * 1024]byte
if f {
for i := 0; i < len(stack); i++ {
stack[i] = 'a'
}
}
}

0 comments on commit 3245d49

Please sign in to comment.