-
Notifications
You must be signed in to change notification settings - Fork 664
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement utils.BytesPool to replace sync.Pool for byte slices
#2920
Changes from 5 commits
46616a6
54896fa
a984fd3
f84f752
5d93c3a
4b2120c
5a68502
28a849b
f5446e9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,13 +9,10 @@ import ( | |
"sync" | ||
|
||
"github.com/ava-labs/avalanchego/database" | ||
"github.com/ava-labs/avalanchego/utils" | ||
"github.com/ava-labs/avalanchego/utils/hashing" | ||
) | ||
|
||
const ( | ||
defaultBufCap = 256 | ||
) | ||
|
||
var ( | ||
_ database.Database = (*Database)(nil) | ||
_ database.Batch = (*batch)(nil) | ||
|
@@ -26,9 +23,8 @@ var ( | |
// a unique value. | ||
type Database struct { | ||
// All keys in this db begin with this byte slice | ||
dbPrefix []byte | ||
// Holds unused []byte | ||
bufferPool sync.Pool | ||
dbPrefix []byte | ||
bufferPool *utils.BytesPool | ||
|
||
// lock needs to be held during Close to guarantee db will not be set to nil | ||
// concurrently with another operation. All other operations can hold RLock. | ||
|
@@ -40,13 +36,9 @@ type Database struct { | |
|
||
func newDB(prefix []byte, db database.Database) *Database { | ||
return &Database{ | ||
dbPrefix: prefix, | ||
db: db, | ||
bufferPool: sync.Pool{ | ||
New: func() interface{} { | ||
return make([]byte, 0, defaultBufCap) | ||
}, | ||
}, | ||
dbPrefix: prefix, | ||
db: db, | ||
bufferPool: utils.NewBytesPool(), | ||
} | ||
} | ||
|
||
|
@@ -91,9 +83,6 @@ func PrefixKey(prefix, key []byte) []byte { | |
return prefixedKey | ||
} | ||
|
||
// Assumes that it is OK for the argument to db.db.Has | ||
// to be modified after db.db.Has returns | ||
// [key] may be modified after this method returns. | ||
func (db *Database) Has(key []byte) (bool, error) { | ||
db.lock.RLock() | ||
defer db.lock.RUnlock() | ||
|
@@ -102,14 +91,11 @@ func (db *Database) Has(key []byte) (bool, error) { | |
return false, database.ErrClosed | ||
} | ||
prefixedKey := db.prefix(key) | ||
has, err := db.db.Has(prefixedKey) | ||
db.bufferPool.Put(prefixedKey) | ||
return has, err | ||
defer db.bufferPool.Put(prefixedKey) | ||
|
||
return db.db.Has(*prefixedKey) | ||
} | ||
|
||
// Assumes that it is OK for the argument to db.db.Get | ||
// to be modified after db.db.Get returns. | ||
// [key] may be modified after this method returns. | ||
func (db *Database) Get(key []byte) ([]byte, error) { | ||
db.lock.RLock() | ||
defer db.lock.RUnlock() | ||
|
@@ -118,15 +104,11 @@ func (db *Database) Get(key []byte) ([]byte, error) { | |
return nil, database.ErrClosed | ||
} | ||
prefixedKey := db.prefix(key) | ||
val, err := db.db.Get(prefixedKey) | ||
db.bufferPool.Put(prefixedKey) | ||
return val, err | ||
defer db.bufferPool.Put(prefixedKey) | ||
|
||
return db.db.Get(*prefixedKey) | ||
} | ||
|
||
// Assumes that it is OK for the argument to db.db.Put | ||
// to be modified after db.db.Put returns. | ||
// [key] can be modified after this method returns. | ||
// [value] should not be modified. | ||
func (db *Database) Put(key, value []byte) error { | ||
db.lock.RLock() | ||
defer db.lock.RUnlock() | ||
|
@@ -135,14 +117,11 @@ func (db *Database) Put(key, value []byte) error { | |
return database.ErrClosed | ||
} | ||
prefixedKey := db.prefix(key) | ||
err := db.db.Put(prefixedKey, value) | ||
db.bufferPool.Put(prefixedKey) | ||
return err | ||
defer db.bufferPool.Put(prefixedKey) | ||
|
||
return db.db.Put(*prefixedKey, value) | ||
} | ||
|
||
// Assumes that it is OK for the argument to db.db.Delete | ||
// to be modified after db.db.Delete returns. | ||
// [key] may be modified after this method returns. | ||
func (db *Database) Delete(key []byte) error { | ||
db.lock.RLock() | ||
defer db.lock.RUnlock() | ||
|
@@ -151,9 +130,9 @@ func (db *Database) Delete(key []byte) error { | |
return database.ErrClosed | ||
} | ||
prefixedKey := db.prefix(key) | ||
err := db.db.Delete(prefixedKey) | ||
db.bufferPool.Put(prefixedKey) | ||
return err | ||
defer db.bufferPool.Put(prefixedKey) | ||
|
||
return db.db.Delete(*prefixedKey) | ||
} | ||
|
||
func (db *Database) NewBatch() database.Batch { | ||
|
@@ -186,15 +165,17 @@ func (db *Database) NewIteratorWithStartAndPrefix(start, prefix []byte) database | |
Err: database.ErrClosed, | ||
} | ||
} | ||
|
||
prefixedStart := db.prefix(start) | ||
defer db.bufferPool.Put(prefixedStart) | ||
|
||
prefixedPrefix := db.prefix(prefix) | ||
it := &iterator{ | ||
Iterator: db.db.NewIteratorWithStartAndPrefix(prefixedStart, prefixedPrefix), | ||
defer db.bufferPool.Put(prefixedPrefix) | ||
|
||
return &iterator{ | ||
Iterator: db.db.NewIteratorWithStartAndPrefix(*prefixedStart, *prefixedPrefix), | ||
db: db, | ||
} | ||
db.bufferPool.Put(prefixedStart) | ||
db.bufferPool.Put(prefixedPrefix) | ||
return it | ||
} | ||
|
||
func (db *Database) Compact(start, limit []byte) error { | ||
|
@@ -204,7 +185,14 @@ func (db *Database) Compact(start, limit []byte) error { | |
if db.closed { | ||
return database.ErrClosed | ||
} | ||
return db.db.Compact(db.prefix(start), db.prefix(limit)) | ||
|
||
prefixedStart := db.prefix(start) | ||
defer db.bufferPool.Put(prefixedStart) | ||
|
||
prefixedLimit := db.prefix(limit) | ||
defer db.bufferPool.Put(prefixedLimit) | ||
|
||
return db.db.Compact(*prefixedStart, *prefixedLimit) | ||
} | ||
|
||
func (db *Database) Close() error { | ||
|
@@ -236,23 +224,12 @@ func (db *Database) HealthCheck(ctx context.Context) (interface{}, error) { | |
} | ||
|
||
// Return a copy of [key], prepended with this db's prefix. | ||
// The returned slice should be put back in the pool | ||
// when it's done being used. | ||
func (db *Database) prefix(key []byte) []byte { | ||
// Get a []byte from the pool | ||
prefixedKey := db.bufferPool.Get().([]byte) | ||
// The returned slice should be put back in the pool when it's done being used. | ||
func (db *Database) prefix(key []byte) *[]byte { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. wonder if we could get rid of the pointer here. Not sure if there is a way to return a byte slice and give back the slice's internal memory to the pool once we are done using it. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. using |
||
keyLen := len(db.dbPrefix) + len(key) | ||
if cap(prefixedKey) >= keyLen { | ||
// The [] byte we got from the pool is big enough to hold the prefixed key | ||
prefixedKey = prefixedKey[:keyLen] | ||
} else { | ||
// The []byte from the pool wasn't big enough. | ||
// Put it back and allocate a new, bigger one | ||
db.bufferPool.Put(prefixedKey) | ||
prefixedKey = make([]byte, keyLen) | ||
} | ||
copy(prefixedKey, db.dbPrefix) | ||
copy(prefixedKey[len(db.dbPrefix):], key) | ||
prefixedKey := db.bufferPool.Get(keyLen) | ||
copy(*prefixedKey, db.dbPrefix) | ||
copy((*prefixedKey)[len(db.dbPrefix):], key) | ||
return prefixedKey | ||
} | ||
|
||
|
@@ -264,33 +241,32 @@ type batch struct { | |
// Each key is prepended with the database's prefix. | ||
// Each byte slice underlying a key should be returned to the pool | ||
// when this batch is reset. | ||
ops []database.BatchOp | ||
ops []batchOp | ||
} | ||
|
||
type batchOp struct { | ||
Key *[]byte | ||
Value []byte | ||
Delete bool | ||
} | ||
|
||
// Assumes that it is OK for the argument to b.Batch.Put | ||
// to be modified after b.Batch.Put returns | ||
// [key] may be modified after this method returns. | ||
// [value] may be modified after this method returns. | ||
func (b *batch) Put(key, value []byte) error { | ||
prefixedKey := b.db.prefix(key) | ||
copiedValue := slices.Clone(value) | ||
b.ops = append(b.ops, database.BatchOp{ | ||
b.ops = append(b.ops, batchOp{ | ||
Key: prefixedKey, | ||
Value: copiedValue, | ||
}) | ||
return b.Batch.Put(prefixedKey, copiedValue) | ||
return b.Batch.Put(*prefixedKey, copiedValue) | ||
} | ||
|
||
// Assumes that it is OK for the argument to b.Batch.Delete | ||
// to be modified after b.Batch.Delete returns | ||
// [key] may be modified after this method returns. | ||
func (b *batch) Delete(key []byte) error { | ||
prefixedKey := b.db.prefix(key) | ||
b.ops = append(b.ops, database.BatchOp{ | ||
b.ops = append(b.ops, batchOp{ | ||
Key: prefixedKey, | ||
Delete: true, | ||
}) | ||
return b.Batch.Delete(prefixedKey) | ||
return b.Batch.Delete(*prefixedKey) | ||
} | ||
|
||
// Write flushes any accumulated data to the memory database. | ||
|
@@ -316,19 +292,17 @@ func (b *batch) Reset() { | |
|
||
// Clear b.writes | ||
if cap(b.ops) > len(b.ops)*database.MaxExcessCapacityFactor { | ||
b.ops = make([]database.BatchOp, 0, cap(b.ops)/database.CapacityReductionFactor) | ||
b.ops = make([]batchOp, 0, cap(b.ops)/database.CapacityReductionFactor) | ||
} else { | ||
b.ops = b.ops[:0] | ||
} | ||
b.Batch.Reset() | ||
} | ||
|
||
// Replay replays the batch contents. | ||
// Assumes it's safe to modify the key argument to w.Delete and w.Put | ||
// after those methods return. | ||
// Replay the batch contents. | ||
func (b *batch) Replay(w database.KeyValueWriterDeleter) error { | ||
for _, op := range b.ops { | ||
keyWithoutPrefix := op.Key[len(b.db.dbPrefix):] | ||
keyWithoutPrefix := (*op.Key)[len(b.db.dbPrefix):] | ||
if op.Delete { | ||
if err := w.Delete(keyWithoutPrefix); err != nil { | ||
return err | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,11 @@ | |
|
||
package utils | ||
|
||
import "crypto/rand" | ||
import ( | ||
"crypto/rand" | ||
"math/bits" | ||
"sync" | ||
) | ||
|
||
// RandomBytes returns a slice of n random bytes | ||
// Intended for use in testing | ||
|
@@ -12,3 +16,68 @@ func RandomBytes(n int) []byte { | |
_, _ = rand.Read(b) | ||
return b | ||
} | ||
|
||
// Constant taken from the "math" package | ||
const intSize = 32 << (^uint(0) >> 63) // 32 or 64 | ||
|
||
// BytesPool tracks buckets of available buffers to be allocated. Each bucket | ||
// allocates buffers of the following length: | ||
// | ||
// 0 | ||
// 1 | ||
// 3 | ||
// 7 | ||
// 15 | ||
// 31 | ||
// 63 | ||
// 127 | ||
// ... | ||
// MaxInt | ||
// | ||
// In order to allocate a buffer of length 19 (for example), we calculate the | ||
// number of bits required to represent 19 (5). And therefore allocate a slice | ||
// from bucket 5, which has length 31. This is the bucket which produces the | ||
// smallest slices that are at least length 19. | ||
// | ||
// When replacing a buffer of length 19, we calculate the number of bits | ||
// required to represent 20 (5). And therefore place the slice into bucket 4, | ||
// which has length 15. This is the bucket which produces the largest slices | ||
// that a length 19 slice can be used for. | ||
type BytesPool [intSize]sync.Pool | ||
|
||
func NewBytesPool() *BytesPool { | ||
var p BytesPool | ||
for i := range p { | ||
// uint is used here to avoid overflowing int during the shift | ||
size := uint(1)<<i - 1 | ||
p[i] = sync.Pool{ | ||
New: func() interface{} { | ||
// Sync pool needs to return pointer-like values to avoid memory | ||
// allocations. | ||
b := make([]byte, size) | ||
return &b | ||
}, | ||
} | ||
} | ||
return &p | ||
} | ||
|
||
// Get returns a non-nil pointer to a slice with the requested length. | ||
// | ||
// It is not guaranteed for the returned bytes to have been zeroed. | ||
func (p *BytesPool) Get(length int) *[]byte { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. q: why do we return a pointer here? We deserialize what There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This avoids allocating the slice struct on the heap for every |
||
index := bits.Len(uint(length)) // Round up | ||
bytes := p[index].Get().(*[]byte) | ||
*bytes = (*bytes)[:length] // Set the length to be the expected value | ||
return bytes | ||
} | ||
|
||
// Put takes ownership of a non-nil pointer to a slice of bytes. | ||
// | ||
// Note: this function takes ownership of the underlying array. So, the length | ||
// of the provided slice is ignored and only its capacity is used. | ||
func (p *BytesPool) Put(bytes *[]byte) { | ||
size := cap(*bytes) | ||
index := bits.Len(uint(size+1)) - 1 // Round down | ||
p[index].Put(bytes) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we keep these comments (esp. about `value`)? It doesn't seem this change alters the invariants around which byte slices can be modified. Or is this the general invariant of the db interface?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These invariants are documented on the DB interface now