From e997537a343ee0f83576c0a191b8914c9064984f Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Thu, 25 Jun 2020 14:46:25 +0530 Subject: [PATCH] refactor processing of writes in boltdb index client (#2771) * refactor processing of writes in boltdb index client Signed-off-by: Sandeep Sukhani * minor nit suggested in PR review Signed-off-by: Sandeep Sukhani --- local/boltdb_index_client.go | 106 ++++++++++++++++-------------- local/boltdb_index_client_test.go | 102 ++++++++++++++++++++++++++++ 2 files changed, 158 insertions(+), 50 deletions(-) diff --git a/local/boltdb_index_client.go b/local/boltdb_index_client.go index 8dad54ee5e10..0ef1a401a145 100644 --- a/local/boltdb_index_client.go +++ b/local/boltdb_index_client.go @@ -8,6 +8,7 @@ import ( "fmt" "os" "path" + "path/filepath" "sync" "time" @@ -127,9 +128,8 @@ func (b *BoltIndexClient) Stop() { } func (b *BoltIndexClient) NewWriteBatch() chunk.WriteBatch { - return &boltWriteBatch{ - puts: map[string]map[string][]byte{}, - deletes: map[string]map[string]struct{}{}, + return &BoltWriteBatch{ + Writes: map[string]TableWrites{}, } } @@ -171,52 +171,49 @@ func (b *BoltIndexClient) GetDB(name string, operation int) (*bbolt.DB, error) { return db, nil } -func (b *BoltIndexClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error { - // ToDo: too much code duplication, refactor this - for table, kvps := range batch.(*boltWriteBatch).puts { - db, err := b.GetDB(table, DBOperationWrite) - if err != nil { - return err - } +func (b *BoltIndexClient) WriteToDB(ctx context.Context, db *bbolt.DB, writes TableWrites) error { + return db.Update(func(tx *bbolt.Tx) error { + var b *bbolt.Bucket - if err := db.Update(func(tx *bbolt.Tx) error { - b, err := tx.CreateBucketIfNotExists(bucketName) + // a bucket should already exist for deletes, for other writes we create one otherwise. + if len(writes.deletes) != 0 { + b = tx.Bucket(bucketName) + if b == nil { + return fmt.Errorf("bucket %s not found in table %s", bucketName, filepath.Base(db.Path())) + } + } else { + var err error + b, err = tx.CreateBucketIfNotExists(bucketName) if err != nil { return err } + } - for key, value := range kvps { - if err := b.Put([]byte(key), value); err != nil { - return err - } + for key, value := range writes.puts { + if err := b.Put([]byte(key), value); err != nil { + return err } + } - return nil - }); err != nil { - return err + for key := range writes.deletes { + if err := b.Delete([]byte(key)); err != nil { + return err + } } - } - for table, kvps := range batch.(*boltWriteBatch).deletes { + return nil + }) +} + +func (b *BoltIndexClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error { + for table, writes := range batch.(*BoltWriteBatch).Writes { db, err := b.GetDB(table, DBOperationWrite) if err != nil { return err } - if err := db.Update(func(tx *bbolt.Tx) error { - b := tx.Bucket(bucketName) - if b == nil { - return fmt.Errorf("Bucket %s not found in table %s", bucketName, table) - } - - for key := range kvps { - if err := b.Delete([]byte(key)); err != nil { - return err - } - } - - return nil - }); err != nil { + err = b.WriteToDB(ctx, db, writes) + if err != nil { return err } } @@ -285,31 +282,40 @@ func (b *BoltIndexClient) QueryDB(ctx context.Context, db *bbolt.DB, query chunk }) } -type boltWriteBatch struct { - puts map[string]map[string][]byte - deletes map[string]map[string]struct{} +type TableWrites struct { + puts map[string][]byte + deletes map[string]struct{} } -func (b *boltWriteBatch) Delete(tableName, hashValue string, rangeValue []byte) { - table, ok := b.deletes[tableName] +type BoltWriteBatch struct { + Writes map[string]TableWrites +} + +func (b *BoltWriteBatch) getOrCreateTableWrites(tableName string) TableWrites { + writes, ok := b.Writes[tableName] if !ok { - table = map[string]struct{}{} - b.deletes[tableName] = table + writes = TableWrites{ + puts: map[string][]byte{}, + deletes: map[string]struct{}{}, + } + b.Writes[tableName] = writes } + return writes +} + +func (b *BoltWriteBatch) Delete(tableName, hashValue string, rangeValue []byte) { + writes := b.getOrCreateTableWrites(tableName) + key := hashValue + separator + string(rangeValue) - table[key] = struct{}{} + writes.deletes[key] = struct{}{} } -func (b *boltWriteBatch) Add(tableName, hashValue string, rangeValue []byte, value []byte) { - table, ok := b.puts[tableName] - if !ok { - table = map[string][]byte{} - b.puts[tableName] = table - } +func (b *BoltWriteBatch) Add(tableName, hashValue string, rangeValue []byte, value []byte) { + writes := b.getOrCreateTableWrites(tableName) key := hashValue + separator + string(rangeValue) - table[key] = value + writes.puts[key] = value } type boltReadBatch struct { diff --git a/local/boltdb_index_client_test.go b/local/boltdb_index_client_test.go index e2877b481613..578c030b3ad2 100644 --- a/local/boltdb_index_client_test.go +++ b/local/boltdb_index_client_test.go @@ -170,3 +170,105 @@ func Test_CreateTable_BoltdbRW(t *testing.T) { }, have) } + +func TestBoltDB_Writes(t *testing.T) { + dirname, err := ioutil.TempDir(os.TempDir(), "boltdb") + require.NoError(t, err) + + defer func() { + require.NoError(t, os.RemoveAll(dirname)) + }() + + for i, tc := range []struct { + name string + initialPuts []string + testPuts []string + testDeletes []string + err error + valuesAfterWrites []string + }{ + { + name: "just puts", + testPuts: []string{"1", "2"}, + valuesAfterWrites: []string{"1", "2"}, + }, + { + name: "just deletes", + initialPuts: []string{"1", "2", "3", "4"}, + testDeletes: []string{"1", "2"}, + valuesAfterWrites: []string{"3", "4"}, + }, + { + name: "both puts and deletes", + initialPuts: []string{"1", "2", "3", "4"}, + testPuts: []string{"5", "6"}, + testDeletes: []string{"1", "2"}, + valuesAfterWrites: []string{"3", "4", "5", "6"}, + }, + { + name: "deletes without initial writes", + testDeletes: []string{"1", "2"}, + err: fmt.Errorf("bucket %s not found in table 3", bucketName), + }, + } { + t.Run(tc.name, func(t *testing.T) { + tableName := fmt.Sprint(i) + + indexClient, err := NewBoltDBIndexClient(BoltDBConfig{ + Directory: dirname, + }) + require.NoError(t, err) + + defer func() { + indexClient.Stop() + }() + + // doing initial writes if there are any + if len(tc.initialPuts) != 0 { + batch := indexClient.NewWriteBatch() + for _, put := range tc.initialPuts { + batch.Add(tableName, "hash", []byte(put), []byte(put)) + } + + require.NoError(t, indexClient.BatchWrite(context.Background(), batch)) + } + + // doing writes with testPuts and testDeletes + batch := indexClient.NewWriteBatch() + for _, put := range tc.testPuts { + batch.Add(tableName, "hash", []byte(put), []byte(put)) + } + for _, put := range tc.testDeletes { + batch.Delete(tableName, "hash", []byte(put)) + } + + require.Equal(t, tc.err, indexClient.BatchWrite(context.Background(), batch)) + + // verifying test writes by querying + var have []chunk.IndexEntry + err = indexClient.query(context.Background(), chunk.IndexQuery{ + TableName: tableName, + HashValue: "hash", + }, func(_ chunk.IndexQuery, read chunk.ReadBatch) bool { + iter := read.Iterator() + for iter.Next() { + have = append(have, chunk.IndexEntry{ + RangeValue: iter.RangeValue(), + Value: iter.Value(), + }) + } + return true + }) + + require.NoError(t, err) + require.Len(t, have, len(tc.valuesAfterWrites)) + + for i, value := range tc.valuesAfterWrites { + require.Equal(t, chunk.IndexEntry{ + RangeValue: []byte(value), + Value: []byte(value), + }, have[i]) + } + }) + } +}