Skip to content

Commit

Permalink
refactor processing of writes in boltdb index client (grafana#2771)
Browse files Browse the repository at this point in the history
* refactor processing of writes in boltdb index client

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* minor nit suggested in PR review

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
  • Loading branch information
sandeepsukhani authored Jun 25, 2020
1 parent 167fd0f commit e997537
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 50 deletions.
106 changes: 56 additions & 50 deletions local/boltdb_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"os"
"path"
"path/filepath"
"sync"
"time"

Expand Down Expand Up @@ -127,9 +128,8 @@ func (b *BoltIndexClient) Stop() {
}

func (b *BoltIndexClient) NewWriteBatch() chunk.WriteBatch {
return &boltWriteBatch{
puts: map[string]map[string][]byte{},
deletes: map[string]map[string]struct{}{},
return &BoltWriteBatch{
Writes: map[string]TableWrites{},
}
}

Expand Down Expand Up @@ -171,52 +171,49 @@ func (b *BoltIndexClient) GetDB(name string, operation int) (*bbolt.DB, error) {
return db, nil
}

func (b *BoltIndexClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error {
// ToDo: too much code duplication, refactor this
for table, kvps := range batch.(*boltWriteBatch).puts {
db, err := b.GetDB(table, DBOperationWrite)
if err != nil {
return err
}
func (b *BoltIndexClient) WriteToDB(ctx context.Context, db *bbolt.DB, writes TableWrites) error {
return db.Update(func(tx *bbolt.Tx) error {
var b *bbolt.Bucket

if err := db.Update(func(tx *bbolt.Tx) error {
b, err := tx.CreateBucketIfNotExists(bucketName)
// a bucket should already exist for deletes, for other writes we create one otherwise.
if len(writes.deletes) != 0 {
b = tx.Bucket(bucketName)
if b == nil {
return fmt.Errorf("bucket %s not found in table %s", bucketName, filepath.Base(db.Path()))
}
} else {
var err error
b, err = tx.CreateBucketIfNotExists(bucketName)
if err != nil {
return err
}
}

for key, value := range kvps {
if err := b.Put([]byte(key), value); err != nil {
return err
}
for key, value := range writes.puts {
if err := b.Put([]byte(key), value); err != nil {
return err
}
}

return nil
}); err != nil {
return err
for key := range writes.deletes {
if err := b.Delete([]byte(key)); err != nil {
return err
}
}
}

for table, kvps := range batch.(*boltWriteBatch).deletes {
return nil
})
}

func (b *BoltIndexClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error {
for table, writes := range batch.(*BoltWriteBatch).Writes {
db, err := b.GetDB(table, DBOperationWrite)
if err != nil {
return err
}

if err := db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(bucketName)
if b == nil {
return fmt.Errorf("Bucket %s not found in table %s", bucketName, table)
}

for key := range kvps {
if err := b.Delete([]byte(key)); err != nil {
return err
}
}

return nil
}); err != nil {
err = b.WriteToDB(ctx, db, writes)
if err != nil {
return err
}
}
Expand Down Expand Up @@ -285,31 +282,40 @@ func (b *BoltIndexClient) QueryDB(ctx context.Context, db *bbolt.DB, query chunk
})
}

type boltWriteBatch struct {
puts map[string]map[string][]byte
deletes map[string]map[string]struct{}
type TableWrites struct {
puts map[string][]byte
deletes map[string]struct{}
}

func (b *boltWriteBatch) Delete(tableName, hashValue string, rangeValue []byte) {
table, ok := b.deletes[tableName]
type BoltWriteBatch struct {
Writes map[string]TableWrites
}

func (b *BoltWriteBatch) getOrCreateTableWrites(tableName string) TableWrites {
writes, ok := b.Writes[tableName]
if !ok {
table = map[string]struct{}{}
b.deletes[tableName] = table
writes = TableWrites{
puts: map[string][]byte{},
deletes: map[string]struct{}{},
}
b.Writes[tableName] = writes
}

return writes
}

func (b *BoltWriteBatch) Delete(tableName, hashValue string, rangeValue []byte) {
writes := b.getOrCreateTableWrites(tableName)

key := hashValue + separator + string(rangeValue)
table[key] = struct{}{}
writes.deletes[key] = struct{}{}
}

func (b *boltWriteBatch) Add(tableName, hashValue string, rangeValue []byte, value []byte) {
table, ok := b.puts[tableName]
if !ok {
table = map[string][]byte{}
b.puts[tableName] = table
}
func (b *BoltWriteBatch) Add(tableName, hashValue string, rangeValue []byte, value []byte) {
writes := b.getOrCreateTableWrites(tableName)

key := hashValue + separator + string(rangeValue)
table[key] = value
writes.puts[key] = value
}

type boltReadBatch struct {
Expand Down
102 changes: 102 additions & 0 deletions local/boltdb_index_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,105 @@ func Test_CreateTable_BoltdbRW(t *testing.T) {
}, have)

}

func TestBoltDB_Writes(t *testing.T) {
dirname, err := ioutil.TempDir(os.TempDir(), "boltdb")
require.NoError(t, err)

defer func() {
require.NoError(t, os.RemoveAll(dirname))
}()

for i, tc := range []struct {
name string
initialPuts []string
testPuts []string
testDeletes []string
err error
valuesAfterWrites []string
}{
{
name: "just puts",
testPuts: []string{"1", "2"},
valuesAfterWrites: []string{"1", "2"},
},
{
name: "just deletes",
initialPuts: []string{"1", "2", "3", "4"},
testDeletes: []string{"1", "2"},
valuesAfterWrites: []string{"3", "4"},
},
{
name: "both puts and deletes",
initialPuts: []string{"1", "2", "3", "4"},
testPuts: []string{"5", "6"},
testDeletes: []string{"1", "2"},
valuesAfterWrites: []string{"3", "4", "5", "6"},
},
{
name: "deletes without initial writes",
testDeletes: []string{"1", "2"},
err: fmt.Errorf("bucket %s not found in table 3", bucketName),
},
} {
t.Run(tc.name, func(t *testing.T) {
tableName := fmt.Sprint(i)

indexClient, err := NewBoltDBIndexClient(BoltDBConfig{
Directory: dirname,
})
require.NoError(t, err)

defer func() {
indexClient.Stop()
}()

// doing initial writes if there are any
if len(tc.initialPuts) != 0 {
batch := indexClient.NewWriteBatch()
for _, put := range tc.initialPuts {
batch.Add(tableName, "hash", []byte(put), []byte(put))
}

require.NoError(t, indexClient.BatchWrite(context.Background(), batch))
}

// doing writes with testPuts and testDeletes
batch := indexClient.NewWriteBatch()
for _, put := range tc.testPuts {
batch.Add(tableName, "hash", []byte(put), []byte(put))
}
for _, put := range tc.testDeletes {
batch.Delete(tableName, "hash", []byte(put))
}

require.Equal(t, tc.err, indexClient.BatchWrite(context.Background(), batch))

// verifying test writes by querying
var have []chunk.IndexEntry
err = indexClient.query(context.Background(), chunk.IndexQuery{
TableName: tableName,
HashValue: "hash",
}, func(_ chunk.IndexQuery, read chunk.ReadBatch) bool {
iter := read.Iterator()
for iter.Next() {
have = append(have, chunk.IndexEntry{
RangeValue: iter.RangeValue(),
Value: iter.Value(),
})
}
return true
})

require.NoError(t, err)
require.Len(t, have, len(tc.valuesAfterWrites))

for i, value := range tc.valuesAfterWrites {
require.Equal(t, chunk.IndexEntry{
RangeValue: []byte(value),
Value: []byte(value),
}, have[i])
}
})
}
}

0 comments on commit e997537

Please sign in to comment.