Skip to content

Commit

Permalink
bbp arena (#40)
Browse files Browse the repository at this point in the history
Change-Id: Ie0bd1c8060dd22f1c343f11b0bbc84fba566353a
  • Loading branch information
jxskiss authored Mar 3, 2023
1 parent 54099d9 commit 1146bcf
Show file tree
Hide file tree
Showing 9 changed files with 314 additions and 7 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,14 @@ jobs:
- name: Checkout code
uses: actions/checkout@v3

# Can't run race detector on windows with go 1.18 or lower due to a bug.
# See https://github.com/golang/go/issues/46099.
- name: Test Windows 1.18
if: ${{ matrix.os == 'windows-latest' && matrix.go == '1.18' }}
run: go test -v ./...

- name: Test
if: ${{ matrix.os != 'windows-latest' || matrix.go != '1.18' }}
run: go test -race -v ./...

lint:
Expand Down
2 changes: 2 additions & 0 deletions internal/linkname/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"unsafe"
)

// sysAllocMemStat tracks the number of bytes currently allocated through
// Runtime_sysAlloc and not yet released by Runtime_sysFree.
// It is passed to (or updated alongside) the runtime's memory-stat hooks.
var sysAllocMemStat uint64

// Runtime_memclrNoHeapPointers clears n bytes starting at ptr.
//
// Usually you should use typedmemclr. Runtime_memclrNoHeapPointers should be
Expand Down
11 changes: 11 additions & 0 deletions internal/linkname/runtime_1.19.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package linkname

import (
"reflect"
"sync/atomic"
"unsafe"
)

Expand All @@ -15,7 +16,16 @@ func Runtime_fastrand64() uint64
//
// DON'T use this if you don't know what it does.
func Runtime_sysAlloc(n uintptr) []byte {
atomic.AddUint64(&sysAllocMemStat, uint64(n))
addr := runtime_sysAllocOS(n)
if addr == nil {
// Don't allow the caller to capture this panic,
// and block to wait the program exiting.
go func() {
panic("Runtime_sysAlloc: out of memory")
}()
select {}
}
return *(*[]byte)(unsafe.Pointer(&reflect.SliceHeader{
Data: uintptr(addr),
Len: int(n),
Expand All @@ -29,6 +39,7 @@ func Runtime_sysAlloc(n uintptr) []byte {
// Runtime_sysFree returns memory previously obtained from Runtime_sysAlloc
// back to the operating system, and decrements the allocation counter.
//
// DON'T use this if you don't know what it does.
func Runtime_sysFree(mem []byte) {
	addr := unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&mem)).Data)
	n := uintptr(cap(mem))
	// Subtract n from the counter using the subtraction idiom documented
	// for AddUint64 (^(delta-1) == -delta in two's complement), instead of
	// reinterpreting the *uint64 as *int64 through unsafe.Pointer.
	atomic.AddUint64(&sysAllocMemStat, ^(uint64(n) - 1))
	runtime_sysFreeOS(addr, n)
}

Expand Down
14 changes: 10 additions & 4 deletions internal/linkname/runtime_below_1.19.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,15 @@ func Runtime_fastrand64() uint64 {
//
// DON'T use this if you don't know what it does.
func Runtime_sysAlloc(n uintptr) []byte {
var memStat uint64
addr := runtime_sysAlloc(n, &memStat)
addr := runtime_sysAlloc(n, &sysAllocMemStat)
if addr == nil {
// Don't allow the caller to capture this panic,
// and block to wait the program exiting.
go func() {
panic("Runtime_sysAlloc: out of memory")
}()
select {}
}
return *(*[]byte)(unsafe.Pointer(&reflect.SliceHeader{
Data: uintptr(addr),
Len: int(n),
Expand All @@ -31,8 +38,7 @@ func Runtime_sysAlloc(n uintptr) []byte {
// Runtime_sysFree returns memory previously obtained from Runtime_sysAlloc
// back to the operating system.
//
// DON'T use this if you don't know what it does.
func Runtime_sysFree(mem []byte) {
	addr := unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&mem)).Data)
	// cap, not len: the runtime allocated the full capacity.
	n := uintptr(cap(mem))
	// NOTE(review): runtime.sysFree is expected to decrement the linked
	// memory stat itself on this Go version — confirm against the
	// corresponding runtime source.
	runtime_sysFree(addr, n, &sysAllocMemStat)
}

//go:linkname runtime_sysAlloc runtime.sysAlloc
Expand Down
145 changes: 145 additions & 0 deletions perf/bbp/arena.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package bbp

import (
"container/list"
"sync"
"syscall"

"github.com/jxskiss/gopkg/v2/internal"
"github.com/jxskiss/gopkg/v2/internal/linkname"
)

// sysPageSize caches the operating system's memory page size.
var sysPageSize = syscall.Getpagesize()

// alignChunkSize rounds chunkSize up to the next power of two that is
// greater than or equal to the system page size.
func alignChunkSize(chunkSize int) int {
	size := chunkSize
	if size < sysPageSize {
		size = sysPageSize
	}
	return int(internal.NextPowerOfTwo(uint(size)))
}

// arenaPool recycles Arena objects (OffHeapArena shares the same backing
// struct) so creating an arena does not allocate a new header each time.
var arenaPool = sync.Pool{
	New: func() any { return &Arena{} },
}

// Arena allocates memory in chunk mode, and serves requests to allocate
// small byte slices, after working with the memory chunks,
// user should call Free to release the allocated memory together.
// It's efficient for memory allocation-heavy workloads.
type Arena struct {
	chunkSize int                   // size of each chunk; power of two >= page size
	allocFunc func(size int) []byte // allocates a new chunk
	freeFunc  func([]byte)          // releases a chunk back to its source
	lst       list.List             // chunks in use; the back element is the active chunk
}

// OffHeapArena is similar to Arena, except that it allocates memory
// directly from operating system instead of Go's runtime.
//
// Note that after working with the memory chunks, user **MUST** call
// Free to return the memory to operating system, else memory leaks.
//
// It is a defined type over Arena (identical layout), so pointer
// conversion between the two is safe.
type OffHeapArena Arena

// NewArena creates an Arena object whose chunks come from the sized
// buffer pools.
// The method Free returns memory chunks to the pool for reusing,
// after which both the arena and the byte slices allocated from the
// arena **MUST NOT** be touched again.
// chunkSize will be round up to the next power of two that is
// greater than or equal to the system's PAGE_SIZE.
func NewArena(chunkSize int) *Arena {
	size := alignChunkSize(chunkSize)
	pool := sizedPools[indexGet(size)]
	arena := arenaPool.Get().(*Arena)
	arena.chunkSize = size
	arena.allocFunc = pool.Get
	arena.freeFunc = pool.Put
	return arena
}

// NewOffHeapArena creates an OffHeapArena which allocates memory directly
// from operating system (without cgo).
// The method Free frees allocated memory chunks.
// Free must be called after working with the arena to avoid memory leaks.
// After Free being called, both the arena and the byte slices allocated
// from the arena **MUST NOT** be touched again.
// chunkSize will be round up to the next power of two that is
// greater than or equal to the system's PAGE_SIZE.
func NewOffHeapArena(chunkSize int) *OffHeapArena {
	size := alignChunkSize(chunkSize)
	arena := arenaPool.Get().(*Arena)
	arena.chunkSize = size
	arena.allocFunc = offHeapAlloc
	arena.freeFunc = offHeapFree
	return (*OffHeapArena)(arena)
}

// offHeapAlloc allocates a chunk directly from the operating system,
// bypassing the Go heap.
func offHeapAlloc(chunkSize int) []byte {
	return linkname.Runtime_sysAlloc(uintptr(chunkSize))
}

// offHeapFree returns a chunk allocated by offHeapAlloc to the
// operating system.
func offHeapFree(buf []byte) {
	linkname.Runtime_sysFree(buf)
}

// Alloc allocates a small byte slice from the arena.
// Requests larger than a quarter of the chunk size are served from the
// ordinary Go heap instead of a chunk.
func (a *Arena) Alloc(length, capacity int) []byte {
	// Large requests would waste chunk space; let the GC manage them.
	if capacity > a.chunkSize>>2 {
		return make([]byte, length, capacity)
	}

	// Try the most recently added chunk first.
	if back := a.lst.Back(); back != nil {
		if buf, ok := back.Value.(*memChunk).alloc(length, capacity); ok {
			return buf
		}
	}

	// Active chunk is full (or absent); a fresh chunk always has room
	// for a request of at most chunkSize/4.
	buf, _ := a.allocNewChunk().alloc(length, capacity)
	return buf
}

// Free releases all memory chunks managed by the arena, returning them
// to their source for reuse, and recycles the arena object itself.
// Neither the arena nor any byte slice allocated from it may be used
// after Free returns.
func (a *Arena) Free() {
	for e := a.lst.Front(); e != nil; e = e.Next() {
		a.freeFunc(e.Value.(*memChunk).buf)
	}
	a.lst.Init() // drop every list node
	arenaPool.Put(a)
}

// allocNewChunk obtains a fresh chunk via allocFunc and appends it to
// the chunk list, making it the active chunk.
func (a *Arena) allocNewChunk() *memChunk {
	chunk := &memChunk{buf: a.allocFunc(a.chunkSize)}
	a.lst.PushBack(chunk)
	return chunk
}

// memChunk is a simple bump allocator over a single byte slice.
type memChunk struct {
	buf []byte // backing memory of the chunk
	i   int    // offset of the first unused byte in buf
}

// alloc carves a slice with the given length and capacity out of the
// chunk's remaining space. It reports false when the chunk does not have
// enough room left, leaving the chunk unchanged.
func (c *memChunk) alloc(length, capacity int) ([]byte, bool) {
	j := c.i + capacity
	// Use <= (not <) so a request that exactly fills the chunk succeeds;
	// with < the last byte of every chunk could never be handed out.
	if j <= cap(c.buf) {
		c.i = j
		buf := c.buf[j-capacity : j]
		return buf[0:length:capacity], true
	}
	return nil, false
}

// Alloc allocates a small byte slice from the arena.
func (a *OffHeapArena) Alloc(length, capacity int) []byte {
	underlying := (*Arena)(a)
	return underlying.Alloc(length, capacity)
}

// Free returns all memory chunks managed by the arena to the operating system.
func (a *OffHeapArena) Free() {
	underlying := (*Arena)(a)
	underlying.Free()
}
54 changes: 54 additions & 0 deletions perf/bbp/arena_cgo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//go:build cgo

package bbp

/*
#include <stdlib.h>
*/
import "C"

import (
"unsafe"

"github.com/jxskiss/gopkg/v2/internal/unsafeheader"
)

// NewCgoArena creates an OffHeapArena which allocates memory by calling
// cgo `C.malloc`. cgo must be enabled to use this.
// The method Free frees allocated memory chunks.
// Free must be called after working with the arena to avoid memory leaks.
// After Free being called, both the arena and the byte slices allocated
// from the arena **MUST NOT** be touched again.
// chunkSize will be round up to the next power of two that is
// greater than or equal to the system's PAGE_SIZE.
func NewCgoArena(chunkSize int) *OffHeapArena {
	size := alignChunkSize(chunkSize)
	arena := arenaPool.Get().(*Arena)
	arena.chunkSize = size
	arena.allocFunc = cgoAlloc
	arena.freeFunc = cgoFree
	return (*OffHeapArena)(arena)
}

// cgoAlloc allocates size bytes from the C heap with C.malloc and wraps
// the raw pointer in a Go byte slice header without copying.
// The returned memory is owned by C and must be released with cgoFree.
// On allocation failure the process is aborted: the panic is raised in a
// fresh goroutine so callers cannot recover it, while the current
// goroutine blocks forever waiting for the crash.
func cgoAlloc(size int) []byte {
	ptr := C.malloc(C.size_t(size))
	if ptr == nil {
		// Don't allow the caller to capture this panic,
		// and block to wait the program exiting.
		go func() {
			panic("bbp.Arena: out of memory")
		}()
		select {}
	}
	// Build the slice header manually; Data/Len/Cap all refer to the
	// C-allocated block.
	buf := *(*[]byte)(unsafe.Pointer(&unsafeheader.Slice{
		Data: ptr,
		Len:  size,
		Cap:  size,
	}))
	return buf
}

// cgoFree releases memory previously allocated by cgoAlloc back to the
// C heap by extracting the data pointer from the slice header.
func cgoFree(buf []byte) {
	ptr := (*unsafeheader.Slice)(unsafe.Pointer(&buf)).Data
	C.free(ptr)
}
77 changes: 77 additions & 0 deletions perf/bbp/arena_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package bbp

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestArena exercises the three arena flavors (pool-backed, off-heap and
// cgo-backed) through a shared interface: it checks chunk-size alignment,
// performs a half-page allocation, then keeps allocating small slices
// until more than two extra pages' worth of memory has been handed out.
func TestArena(t *testing.T) {
	type arenaIface interface {
		Alloc(length, capacity int) []byte
		Free()
	}

	makeArenas := func() []arenaIface {
		return []arenaIface{
			NewArena(456),
			NewOffHeapArena(567),
			NewCgoArena(789),
		}
	}

	// getChunkSize reaches into the concrete type to read the
	// unexported chunkSize field.
	getChunkSize := func(a arenaIface) int {
		switch a := a.(type) {
		case *Arena:
			return a.chunkSize
		case *OffHeapArena:
			return a.chunkSize
		}
		panic("unreachable")
	}

	t.Logf("sysPageSize= %v", sysPageSize)
	for _, a := range makeArenas() {
		// All requested sizes above are below one page, so every arena
		// should have been aligned up to exactly the page size.
		assert.Equal(t, sysPageSize, getChunkSize(a))

		n := sysPageSize / 2
		buf := a.Alloc(10, n)
		assert.Equal(t, 10, len(buf))
		assert.Equal(t, n, cap(buf))

		// Allocate until total requested capacity exceeds two pages,
		// forcing the arena to grow by multiple chunks.
		for {
			if n > 2*sysPageSize+1 {
				break
			}
			buf := a.Alloc(10, 100)
			n += cap(buf)
			assert.Equal(t, 10, len(buf))
			assert.Equal(t, 100, cap(buf))
		}
		a.Free()
	}
}

// BenchmarkNewArena measures the cost of acquiring and releasing a
// pool-backed arena without allocating from it.
func BenchmarkNewArena(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		a := NewArena(sysPageSize)
		a.Free()
	}
}

// BenchmarkNewOffHeapArena measures the cost of acquiring and releasing
// an off-heap arena without allocating from it.
func BenchmarkNewOffHeapArena(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		a := NewOffHeapArena(sysPageSize)
		a.Free()
	}
}

// BenchmarkNewCgoArena measures the cost of acquiring and releasing a
// cgo-backed arena without allocating from it.
func BenchmarkNewCgoArena(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		a := NewCgoArena(sysPageSize)
		a.Free()
	}
}
6 changes: 6 additions & 0 deletions perf/bbp/sized.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ func (p *bufPool) Get(length int) []byte {
return make([]byte, length, p.size)
}

// Put returns a buffer to the pool for reuse.
// Buffers with capacity below the pool's size class are silently
// dropped — presumably because Get guarantees at least p.size of
// capacity; TODO(review): confirm against _toPtr/_toBuf.
func (p *bufPool) Put(buf []byte) {
	if cap(buf) >= p.size {
		p.pool.Put(_toPtr(buf))
	}
}

func _toBuf(ptr unsafe.Pointer, length int) []byte {
size := *(*int)(ptr)
return *(*[]byte)(unsafe.Pointer(&unsafeheader.Slice{
Expand Down
5 changes: 2 additions & 3 deletions perf/lru/walbuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,13 @@ func TestFastHashset(t *testing.T) {
values[i] = uint32(rand.Int31n(1000))
}

var setBuf [walSetSize]uint32
fastSet := fastHashset(setBuf)
var fastSet fastHashset
for _, x := range values {
fastSet.add(x)
}

fmt.Println(values)
fmt.Println(setBuf)
fmt.Println(fastSet)

mapSet := make(map[uint32]bool)
for _, x := range values {
Expand Down

0 comments on commit 1146bcf

Please sign in to comment.