Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: improve batch performance #303

Merged
merged 2 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 114 additions & 132 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rosedb
import (
"bytes"
"fmt"
"github.com/rosedblabs/rosedb/v2/utils"
"sync"
"time"

Expand All @@ -22,14 +23,15 @@ import (
//
// You must call Commit method to commit the batch, otherwise the DB will be locked.
type Batch struct {
db *DB
pendingWrites []*LogRecord // save the data to be written
options BatchOptions
mu sync.RWMutex
committed bool // whether the batch has been committed
rollbacked bool // whether the batch has been rollbacked
batchId *snowflake.Node
buffers []*bytebufferpool.ByteBuffer
db *DB
pendingWrites []*LogRecord // save the data to be written
pendingWritesMap map[uint64][]int // map record hash key to index, with open hashing
options BatchOptions
mu sync.RWMutex
committed bool // whether the batch has been committed
rollbacked bool // whether the batch has been rollbacked
batchId *snowflake.Node
buffers []*bytebufferpool.ByteBuffer
}

// NewBatch creates a new Batch instance.
Expand Down Expand Up @@ -77,6 +79,9 @@ func (b *Batch) init(rdonly, sync bool, db *DB) *Batch {
func (b *Batch) reset() {
b.db = nil
b.pendingWrites = b.pendingWrites[:0]
for key := range b.pendingWritesMap {
delete(b.pendingWritesMap, key)
}
b.committed = false
b.rollbacked = false
// put all buffers back to the pool
Expand Down Expand Up @@ -116,19 +121,12 @@ func (b *Batch) Put(key []byte, value []byte) error {

b.mu.Lock()
// write to pendingWrites
var record *LogRecord
// if the key exists in pendingWrites, update the value directly
for i := len(b.pendingWrites) - 1; i >= 0; i-- {
if bytes.Equal(key, b.pendingWrites[i].Key) {
record = b.pendingWrites[i]
break
}
}
var record = b.lookupPendingWrites(key)
if record == nil {
// if the key does not exist in pendingWrites, write a new record
// the record will be put back to the pool when the batch is committed or rollbacked
record = b.db.recordPool.Get().(*LogRecord)
b.pendingWrites = append(b.pendingWrites, record)
b.appendPendingWrites(key, record)
}

record.Key, record.Value = key, value
Expand All @@ -152,19 +150,12 @@ func (b *Batch) PutWithTTL(key []byte, value []byte, ttl time.Duration) error {

b.mu.Lock()
// write to pendingWrites
var record *LogRecord
// if the key exists in pendingWrites, update the value directly
for i := len(b.pendingWrites) - 1; i >= 0; i-- {
if bytes.Equal(key, b.pendingWrites[i].Key) {
record = b.pendingWrites[i]
break
}
}
var record = b.lookupPendingWrites(key)
if record == nil {
// if the key does not exist in pendingWrites, write a new record
// the record will be put back to the pool when the batch is committed or rollbacked
record = b.db.recordPool.Get().(*LogRecord)
b.pendingWrites = append(b.pendingWrites, record)
b.appendPendingWrites(key, record)
}

record.Key, record.Value = key, value
Expand All @@ -186,13 +177,7 @@ func (b *Batch) Get(key []byte) ([]byte, error) {
now := time.Now().UnixNano()
// get from pendingWrites
b.mu.RLock()
var record *LogRecord
for i := len(b.pendingWrites) - 1; i >= 0; i-- {
if bytes.Equal(key, b.pendingWrites[i].Key) {
record = b.pendingWrites[i]
break
}
}
var record = b.lookupPendingWrites(key)
b.mu.RUnlock()

// if the record is in pendingWrites, return the value directly
Expand Down Expand Up @@ -240,20 +225,19 @@ func (b *Batch) Delete(key []byte) error {
b.mu.Lock()
// only need key and type when deleting a value.
var exist bool
for i := len(b.pendingWrites) - 1; i >= 0; i-- {
if bytes.Equal(key, b.pendingWrites[i].Key) {
b.pendingWrites[i].Type = LogRecordDeleted
b.pendingWrites[i].Value = nil
b.pendingWrites[i].Expire = 0
exist = true
break
}
var record = b.lookupPendingWrites(key)
if record != nil {
record.Type = LogRecordDeleted
record.Value = nil
record.Expire = 0
exist = true
}
if !exist {
b.pendingWrites = append(b.pendingWrites, &LogRecord{
record = &LogRecord{
Key: key,
Type: LogRecordDeleted,
})
}
b.appendPendingWrites(key, record)
}
b.mu.Unlock()

Expand All @@ -272,13 +256,7 @@ func (b *Batch) Exist(key []byte) (bool, error) {
now := time.Now().UnixNano()
// check if the key exists in pendingWrites
b.mu.RLock()
var record *LogRecord
for i := len(b.pendingWrites) - 1; i >= 0; i-- {
if bytes.Equal(key, b.pendingWrites[i].Key) {
record = b.pendingWrites[i]
break
}
}
var record = b.lookupPendingWrites(key)
b.mu.RUnlock()

if record != nil {
Expand Down Expand Up @@ -320,13 +298,7 @@ func (b *Batch) Expire(key []byte, ttl time.Duration) error {
b.mu.Lock()
defer b.mu.Unlock()

var record *LogRecord
for i := len(b.pendingWrites) - 1; i >= 0; i-- {
if bytes.Equal(key, b.pendingWrites[i].Key) {
record = b.pendingWrites[i]
break
}
}
var record = b.lookupPendingWrites(key)

// if the key exists in pendingWrites, update the expiry time directly
if record != nil {
Expand All @@ -335,30 +307,30 @@ func (b *Batch) Expire(key []byte, ttl time.Duration) error {
return ErrKeyNotFound
}
record.Expire = time.Now().Add(ttl).UnixNano()
} else {
// if the key does not exist in pendingWrites, get the value from wal
position := b.db.index.Get(key)
if position == nil {
return ErrKeyNotFound
}
chunk, err := b.db.dataFiles.Read(position)
if err != nil {
return err
}
return nil
}
// if the key does not exist in pendingWrites, get the value from wal
position := b.db.index.Get(key)
if position == nil {
return ErrKeyNotFound
}
chunk, err := b.db.dataFiles.Read(position)
if err != nil {
return err
}

now := time.Now()
record = decodeLogRecord(chunk)
// if the record is deleted or expired, we can assume that the key does not exist,
// and delete the key from the index
if record.Type == LogRecordDeleted || record.IsExpired(now.UnixNano()) {
b.db.index.Delete(key)
return ErrKeyNotFound
}
// now we get the value from wal, update the expiry time
// and rewrite the record to pendingWrites
record.Expire = now.Add(ttl).UnixNano()
b.pendingWrites = append(b.pendingWrites, record)
now := time.Now()
record = decodeLogRecord(chunk)
// if the record is deleted or expired, we can assume that the key does not exist,
// and delete the key from the index
if record.Type == LogRecordDeleted || record.IsExpired(now.UnixNano()) {
b.db.index.Delete(key)
return ErrKeyNotFound
}
// now we get the value from wal, update the expiry time
// and rewrite the record to pendingWrites
record.Expire = now.Add(ttl).UnixNano()
b.appendPendingWrites(key, record)

return nil
}
Expand All @@ -376,27 +348,17 @@ func (b *Batch) TTL(key []byte) (time.Duration, error) {
b.mu.Lock()
defer b.mu.Unlock()

// check if the key exists in pendingWrites
if len(b.pendingWrites) > 0 {
var record *LogRecord
for i := len(b.pendingWrites) - 1; i >= 0; i-- {
if bytes.Equal(key, b.pendingWrites[i].Key) {
record = b.pendingWrites[i]
break
}
var record = b.lookupPendingWrites(key)
if record != nil {
if record.Expire == 0 {
return -1, nil
}
// if the key exists in pendingWrites, return the ttl directly
if record != nil {
if record.Expire == 0 {
return -1, nil
}
// return key not found if the record is deleted or expired
if record.Type == LogRecordDeleted || record.IsExpired(now.UnixNano()) {
return -1, ErrKeyNotFound
}
// now we get the valid expiry time, we can calculate the ttl
return time.Duration(record.Expire - now.UnixNano()), nil
// return key not found if the record is deleted or expired
if record.Type == LogRecordDeleted || record.IsExpired(now.UnixNano()) {
return -1, ErrKeyNotFound
}
// now we get the valid expiry time, we can calculate the ttl
return time.Duration(record.Expire - now.UnixNano()), nil
}

// if the key does not exist in pendingWrites, get the value from wal
Expand All @@ -410,7 +372,7 @@ func (b *Batch) TTL(key []byte) (time.Duration, error) {
}

// return key not found if the record is deleted or expired
record := decodeLogRecord(chunk)
record = decodeLogRecord(chunk)
if record.Type == LogRecordDeleted {
return -1, ErrKeyNotFound
}
Expand Down Expand Up @@ -443,48 +405,42 @@ func (b *Batch) Persist(key []byte) error {
defer b.mu.Unlock()

// if the key exists in pendingWrites, update the expiry time directly
var record *LogRecord
for i := len(b.pendingWrites) - 1; i >= 0; i-- {
if bytes.Equal(key, b.pendingWrites[i].Key) {
record = b.pendingWrites[i]
break
}
}

var record = b.lookupPendingWrites(key)
if record != nil {
if record.Type == LogRecordDeleted && record.IsExpired(time.Now().UnixNano()) {
return ErrKeyNotFound
}
record.Expire = 0
} else {
// check if the key exists in index
position := b.db.index.Get(key)
if position == nil {
return ErrKeyNotFound
}
chunk, err := b.db.dataFiles.Read(position)
if err != nil {
return err
}
return nil
}

record := decodeLogRecord(chunk)
now := time.Now().UnixNano()
// check if the record is deleted or expired
if record.Type == LogRecordDeleted || record.IsExpired(now) {
b.db.index.Delete(record.Key)
return ErrKeyNotFound
}
// if the expiration time is 0, it means that the key has no expiration time,
// so we can return directly
if record.Expire == 0 {
return nil
}
// check if the key exists in index
position := b.db.index.Get(key)
if position == nil {
return ErrKeyNotFound
}
chunk, err := b.db.dataFiles.Read(position)
if err != nil {
return err
}

// set the expiration time to 0, and rewrite the record to wal
record.Expire = 0
b.pendingWrites = append(b.pendingWrites, record)
record = decodeLogRecord(chunk)
now := time.Now().UnixNano()
// check if the record is deleted or expired
if record.Type == LogRecordDeleted || record.IsExpired(now) {
b.db.index.Delete(record.Key)
return ErrKeyNotFound
}
// if the expiration time is 0, it means that the key has no expiration time,
// so we can return directly
if record.Expire == 0 {
return nil
}

// set the expiration time to 0, and rewrite the record to wal
record.Expire = 0
b.appendPendingWrites(key, record)

return nil
}

Expand Down Expand Up @@ -602,8 +558,34 @@ func (b *Batch) Rollback() error {
b.db.recordPool.Put(record)
}
b.pendingWrites = b.pendingWrites[:0]
for key := range b.pendingWritesMap {
delete(b.pendingWritesMap, key)
}
}

b.rollbacked = true
return nil
}

// lookupPendingWrites if the key exists in pendingWrites, update the value directly
func (b *Batch) lookupPendingWrites(key []byte) (record *LogRecord) {
if len(b.pendingWritesMap) == 0 {
return
}
hashKey := utils.MemHash(key)
for _, entry := range b.pendingWritesMap[hashKey] {
if bytes.Compare(b.pendingWrites[entry].Key, key) == 0 {
return b.pendingWrites[entry]
}
}
return
}

func (b *Batch) appendPendingWrites(key []byte, record *LogRecord) {
b.pendingWrites = append(b.pendingWrites, record)
if b.pendingWritesMap == nil {
b.pendingWritesMap = make(map[uint64][]int)
}
hashKey := utils.MemHash(key)
b.pendingWritesMap[hashKey] = append(b.pendingWritesMap[hashKey], len(b.pendingWrites)-1)
}
Loading
Loading