From 248a2d74e4a9d93c8d5831b2e4bfec1a9bc4c340 Mon Sep 17 00:00:00 2001 From: Alvaro Alda Date: Wed, 27 Mar 2019 10:51:45 +0100 Subject: [PATCH 1/3] Replace storage prefixes with tables Add rocksdb column families --- balloon/balloon.go | 4 +- balloon/balloon_test.go | 6 +- balloon/cache/lru.go | 8 +- balloon/cache/passthrough.go | 12 +- balloon/cache/passthrough_test.go | 6 +- balloon/history/insert_visitor.go | 18 +- balloon/history/insert_visitor_test.go | 26 +-- balloon/history/tree.go | 6 +- balloon/hyper/insert_test.go | 32 ++-- balloon/hyper/loader.go | 2 +- balloon/hyper/operation.go | 2 +- balloon/hyper/tree.go | 4 +- balloon/hyper/tree_test.go | 12 +- raftwal/fsm.go | 4 +- raftwal/raftrocks/rocksdb_store.go | 10 +- rocksdb/db.go | 1 - storage/badger/badger_store.go | 24 +-- storage/badger/badger_store_test.go | 67 ++++--- storage/bplus/bplus_store.go | 26 +-- storage/bplus/bplus_store_test.go | 65 +++---- storage/pb/backup.pb.go | 95 ++++++++-- storage/pb/backup.proto | 13 +- storage/rocks/rocksdb_store.go | 251 ++++++++++++++++++------- storage/rocks/rocksdb_store_test.go | 67 +++---- storage/store.go | 119 ++++++------ storage/store_test.go | 195 ------------------- 26 files changed, 545 insertions(+), 530 deletions(-) delete mode 100644 storage/store_test.go diff --git a/balloon/balloon.go b/balloon/balloon.go index fcff1765e..93d624f44 100644 --- a/balloon/balloon.go +++ b/balloon/balloon.go @@ -169,7 +169,7 @@ func (b Balloon) Version() uint64 { func (b *Balloon) RefreshVersion() error { // get last stored version - kv, err := b.store.GetLast(storage.HistoryCachePrefix) + kv, err := b.store.GetLast(storage.HistoryCacheTable) if err != nil { if err != storage.ErrKeyNotFound { return err @@ -263,7 +263,7 @@ func (b Balloon) QueryDigestMembership(keyDigest hashing.Digest, version uint64) version = proof.CurrentVersion } - leaf, err = b.store.Get(storage.IndexPrefix, proof.KeyDigest) + leaf, err = b.store.Get(storage.IndexTable, proof.KeyDigest) switch { case err != nil && err != storage.ErrKeyNotFound: return nil, fmt.Errorf("error reading leaf %v data: %v", proof.KeyDigest, err) diff --git a/balloon/balloon_test.go b/balloon/balloon_test.go index d4de35c58..177d20e10 100644 --- a/balloon/balloon_test.go +++ b/balloon/balloon_test.go @@ -217,18 +217,18 @@ func TestTamperAndVerify(t *testing.T) { assert.NoError(t, err) assert.True(t, memProof.Verify(event, snapshot), "The proof should verify correctly") - original, err := store.Get(storage.IndexPrefix, eventDigest) + original, err := store.Get(storage.IndexTable, eventDigest) assert.NoError(t, err) tpBytes := util.Uint64AsBytes(^uint64(0)) assert.NoError(t, store.Mutate( []*storage.Mutation{ - {storage.IndexPrefix, eventDigest, tpBytes}, + {storage.IndexTable, eventDigest, tpBytes}, }, ), "store add returned non nil value") - tampered, _ := store.Get(storage.IndexPrefix, eventDigest) + tampered, _ := store.Get(storage.IndexTable, eventDigest) assert.Equal(t, tpBytes, tampered.Value, "Tamper unsuccessful") assert.NotEqual(t, original.Value, tampered.Value, "Tamper unsuccessful") diff --git a/balloon/cache/lru.go b/balloon/cache/lru.go index 380d555d6..4a28d811b 100644 --- a/balloon/cache/lru.go +++ b/balloon/cache/lru.go @@ -14,16 +14,16 @@ type entry struct { } type LruReadThroughCache struct { - prefix byte + table storage.Table store storage.Store size int items map[[lruKeySize]byte]*list.Element evictList *list.List } -func NewLruReadThroughCache(prefix byte, store storage.Store, cacheSize uint16) *LruReadThroughCache { +func 
NewLruReadThroughCache(table storage.Table, store storage.Store, cacheSize uint16) *LruReadThroughCache { return &LruReadThroughCache{ - prefix: prefix, + table: table, store: store, size: int(cacheSize), items: make(map[[lruKeySize]byte]*list.Element), @@ -36,7 +36,7 @@ func (c LruReadThroughCache) Get(key []byte) ([]byte, bool) { copy(k[:], key) e, ok := c.items[k] if !ok { - pair, err := c.store.Get(c.prefix, key) + pair, err := c.store.Get(c.table, key) if err != nil { return nil, false } diff --git a/balloon/cache/passthrough.go b/balloon/cache/passthrough.go index 74b2205b7..060c7084a 100644 --- a/balloon/cache/passthrough.go +++ b/balloon/cache/passthrough.go @@ -21,19 +21,19 @@ import ( ) type PassThroughCache struct { - prefix byte - store storage.Store + table storage.Table + store storage.Store } -func NewPassThroughCache(prefix byte, store storage.Store) *PassThroughCache { +func NewPassThroughCache(table storage.Table, store storage.Store) *PassThroughCache { return &PassThroughCache{ - prefix: prefix, - store: store, + table: table, + store: store, } } func (c PassThroughCache) Get(key []byte) ([]byte, bool) { - pair, err := c.store.Get(c.prefix, key) + pair, err := c.store.Get(c.table, key) if err != nil { return nil, false } diff --git a/balloon/cache/passthrough_test.go b/balloon/cache/passthrough_test.go index 1a2b6b707..6b3ed42e2 100644 --- a/balloon/cache/passthrough_test.go +++ b/balloon/cache/passthrough_test.go @@ -39,13 +39,13 @@ func TestPassThroughCache(t *testing.T) { store, closeF := storage_utils.OpenBPlusTreeStore() defer closeF() - prefix := byte(0x0) - cache := NewPassThroughCache(prefix, store) + table := storage.IndexTable + cache := NewPassThroughCache(table, store) for i, c := range testCases { if c.cached { err := store.Mutate([]*storage.Mutation{ - {prefix, c.key, c.value}, + {table, c.key, c.value}, }) require.NoError(t, err) } diff --git a/balloon/history/insert_visitor.go b/balloon/history/insert_visitor.go index 30710ac92..a4b1bdb6e 100644 --- a/balloon/history/insert_visitor.go +++ b/balloon/history/insert_visitor.go @@ -25,19 +25,19 @@ import ( ) type insertVisitor struct { - hasher hashing.Hasher - cache cache.ModifiableCache - storagePrefix byte // TODO shall i remove this? + hasher hashing.Hasher + cache cache.ModifiableCache + storageTable storage.Table // TODO shall i remove this? 
mutations []*storage.Mutation } -func newInsertVisitor(hasher hashing.Hasher, cache cache.ModifiableCache, storagePrefix byte) *insertVisitor { +func newInsertVisitor(hasher hashing.Hasher, cache cache.ModifiableCache, storageTable storage.Table) *insertVisitor { return &insertVisitor{ - hasher: hasher, - cache: cache, - storagePrefix: storagePrefix, - mutations: make([]*storage.Mutation, 0), + hasher: hasher, + cache: cache, + storageTable: storageTable, + mutations: make([]*storage.Mutation, 0), } } @@ -76,7 +76,7 @@ func (v *insertVisitor) VisitPutCacheOp(op putCacheOp) hashing.Digest { func (v *insertVisitor) VisitMutateOp(op mutateOp) hashing.Digest { hash := op.operation.Accept(v) - v.mutations = append(v.mutations, storage.NewMutation(v.storagePrefix, op.Position().Bytes(), hash)) + v.mutations = append(v.mutations, storage.NewMutation(v.storageTable, op.Position().Bytes(), hash)) return hash } diff --git a/balloon/history/insert_visitor_test.go b/balloon/history/insert_visitor_test.go index 644e1e6b1..9e5787bed 100644 --- a/balloon/history/insert_visitor_test.go +++ b/balloon/history/insert_visitor_test.go @@ -44,24 +44,24 @@ func TestInsertVisitor(t *testing.T) { ))), expectedMutations: []*storage.Mutation{ { - Prefix: storage.HistoryCachePrefix, - Key: pos(7, 0).Bytes(), - Value: []byte{7}, + Table: storage.HistoryCacheTable, + Key: pos(7, 0).Bytes(), + Value: []byte{7}, }, { - Prefix: storage.HistoryCachePrefix, - Key: pos(6, 1).Bytes(), - Value: []byte{7}, + Table: storage.HistoryCacheTable, + Key: pos(6, 1).Bytes(), + Value: []byte{7}, }, { - Prefix: storage.HistoryCachePrefix, - Key: pos(4, 2).Bytes(), - Value: []byte{7}, + Table: storage.HistoryCacheTable, + Key: pos(4, 2).Bytes(), + Value: []byte{7}, }, { - Prefix: storage.HistoryCachePrefix, - Key: pos(0, 3).Bytes(), - Value: []byte{7}, + Table: storage.HistoryCacheTable, + Key: pos(0, 3).Bytes(), + Value: []byte{7}, }, }, expectedElements: []*cachedElement{ @@ -75,7 +75,7 @@ func TestInsertVisitor(t *testing.T) { for i, c := range testCases { cache := cache.NewFakeCache([]byte{0x0}) - visitor := newInsertVisitor(hashing.NewFakeXorHasher(), cache, storage.HistoryCachePrefix) + visitor := newInsertVisitor(hashing.NewFakeXorHasher(), cache, storage.HistoryCacheTable) c.op.Accept(visitor) diff --git a/balloon/history/tree.go b/balloon/history/tree.go index 7d398b6b3..550f43c9e 100644 --- a/balloon/history/tree.go +++ b/balloon/history/tree.go @@ -33,10 +33,10 @@ type HistoryTree struct { func NewHistoryTree(hasherF func() hashing.Hasher, store storage.Store, cacheSize uint16) *HistoryTree { // create cache for Adding - writeCache := cache.NewLruReadThroughCache(storage.HistoryCachePrefix, store, cacheSize) + writeCache := cache.NewLruReadThroughCache(storage.HistoryCacheTable, store, cacheSize) // create cache for Membership and Incremental - readCache := cache.NewPassThroughCache(storage.HistoryCachePrefix, store) + readCache := cache.NewPassThroughCache(storage.HistoryCacheTable, store) return &HistoryTree{ hasherF: hasherF, @@ -51,7 +51,7 @@ func (t *HistoryTree) Add(eventDigest hashing.Digest, version uint64) (hashing.D log.Debugf("Adding new event digest %x with version %d", eventDigest, version) // build a visitable pruned tree and then visit it to generate the root hash - visitor := newInsertVisitor(t.hasher, t.writeCache, storage.HistoryCachePrefix) + visitor := newInsertVisitor(t.hasher, t.writeCache, storage.HistoryCacheTable) rh := pruneToInsert(version, eventDigest).Accept(visitor) return rh, visitor.Result(), nil 
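// A minimal usage sketch (not part of this patch) of the cache wiring shown
// above in NewHistoryTree: caches are now bound to a storage.Table instead of
// a raw prefix byte. The github.com/bbva/qed import path is an assumption.
package example

import (
	"github.com/bbva/qed/balloon/cache" // assumed module path
	"github.com/bbva/qed/storage"       // assumed module path
)

// buildHistoryCaches builds the same pair of caches the history tree uses:
// an LRU read-through cache for additions and a pass-through cache for
// membership and incremental queries, both reading from the history table.
func buildHistoryCaches(store storage.Store, cacheSize uint16) (*cache.LruReadThroughCache, *cache.PassThroughCache) {
	writeCache := cache.NewLruReadThroughCache(storage.HistoryCacheTable, store, cacheSize)
	readCache := cache.NewPassThroughCache(storage.HistoryCacheTable, store)
	return writeCache, readCache
}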
diff --git a/balloon/hyper/insert_test.go b/balloon/hyper/insert_test.go index 5951c96f3..3ce931f7d 100644 --- a/balloon/hyper/insert_test.go +++ b/balloon/hyper/insert_test.go @@ -340,8 +340,8 @@ func TestInsertInterpretation(t *testing.T) { storedBatches: map[string][]byte{}, expectedMutations: []*storage.Mutation{ { - Prefix: storage.HyperCachePrefix, - Key: pos(0, 4).Bytes(), + Table: storage.HyperCacheTable, + Key: pos(0, 4).Bytes(), Value: []byte{ 0xe0, 0x00, 0x00, 0x00, // bitmap: 11100000 00000000 00000000 00000000 0x00, 0x01, // iBatch 0 -> hash=0x00 (shortcut index=0) @@ -388,8 +388,8 @@ func TestInsertInterpretation(t *testing.T) { }, expectedMutations: []*storage.Mutation{ { - Prefix: storage.HyperCachePrefix, - Key: pos(0, 4).Bytes(), + Table: storage.HyperCacheTable, + Key: pos(0, 4).Bytes(), Value: []byte{ 0xe0, 0x00, 0x00, 0x00, // bitmap: 11100000 00000000 00000000 00000000 0x00, 0x01, // iBatch 0 -> hash=0x00 (shortcut index=0) @@ -437,8 +437,8 @@ func TestInsertInterpretation(t *testing.T) { }, expectedMutations: []*storage.Mutation{ { - Prefix: storage.HyperCachePrefix, - Key: pos(1, 0).Bytes(), + Table: storage.HyperCacheTable, + Key: pos(1, 0).Bytes(), Value: []byte{ 0xe0, 0x00, 0x00, 0x00, // bitmap: 11100000 00000000 00000000 00000000 0x01, 0x01, // iBatch 0 -> hash=0x01 (shortcut index=0) @@ -447,8 +447,8 @@ func TestInsertInterpretation(t *testing.T) { }, }, { - Prefix: storage.HyperCachePrefix, - Key: pos(0, 0).Bytes(), + Table: storage.HyperCacheTable, + Key: pos(0, 0).Bytes(), Value: []byte{ 0xe0, 0x00, 0x00, 0x00, // bitmap: 11100000 00000000 00000000 00000000 0x00, 0x01, // iBatch 0 -> hash=0x00 (shortcut index=0) @@ -457,8 +457,8 @@ func TestInsertInterpretation(t *testing.T) { }, }, { - Prefix: storage.HyperCachePrefix, - Key: pos(0, 4).Bytes(), + Table: storage.HyperCacheTable, + Key: pos(0, 4).Bytes(), Value: []byte{ 0xd1, 0x01, 0x80, 0x00, // bitmap: 11010001 00000001 10000000 00000000 0x01, 0x00, // iBatch 0 -> hash=0x01 @@ -509,8 +509,8 @@ func TestInsertInterpretation(t *testing.T) { }, expectedMutations: []*storage.Mutation{ { - Prefix: storage.HyperCachePrefix, - Key: pos(0, 4).Bytes(), + Table: storage.HyperCacheTable, + Key: pos(0, 4).Bytes(), Value: []byte{ 0xfe, 0x00, 0x00, 0x00, // bitmap: 11111110 00000000 00000000 00000000 0x08, 0x00, // iBatch 0 -> hash=0x08 @@ -566,8 +566,8 @@ func TestInsertInterpretation(t *testing.T) { }, expectedMutations: []*storage.Mutation{ { - Prefix: storage.HyperCachePrefix, - Key: pos(0, 4).Bytes(), + Table: storage.HyperCacheTable, + Key: pos(0, 4).Bytes(), Value: []byte{ 0xfe, 0x1e, 0x00, 0x00, // bitmap: 11111110 00011110 00000000 00000000 0x04, 0x00, // iBatch 0 -> hash=0x08 @@ -622,8 +622,8 @@ func TestInsertInterpretation(t *testing.T) { }, expectedMutations: []*storage.Mutation{ { - Prefix: storage.HyperCachePrefix, - Key: pos(128, 4).Bytes(), + Table: storage.HyperCacheTable, + Key: pos(128, 4).Bytes(), Value: []byte{ 0xe0, 0x00, 0x00, 0x00, // bitmap: 11100000 00000000 00000000 00000000 0x80, 0x01, // iBatch 0 -> hash=0x80 (shortcut index=128) diff --git a/balloon/hyper/loader.go b/balloon/hyper/loader.go index 5b0def252..2b322ecb2 100644 --- a/balloon/hyper/loader.go +++ b/balloon/hyper/loader.go @@ -59,7 +59,7 @@ func (l defaultBatchLoader) loadBatchFromCache(pos position) *batchNode { } func (l defaultBatchLoader) loadBatchFromStore(pos position) *batchNode { - kv, err := l.store.Get(storage.HyperCachePrefix, pos.Bytes()) + kv, err := l.store.Get(storage.HyperCacheTable, pos.Bytes()) if err != nil { 
if err == storage.ErrKeyNotFound { return newEmptyBatchNode(len(pos.Index)) diff --git a/balloon/hyper/operation.go b/balloon/hyper/operation.go index f376b10a4..5f0083a1e 100644 --- a/balloon/hyper/operation.go +++ b/balloon/hyper/operation.go @@ -140,7 +140,7 @@ func mutateBatch(pos position, batch *batchNode) *operation { Pos: pos, Interpret: func(ops *operationsStack, c *pruningContext) hashing.Digest { hash := ops.Pop().Interpret(ops, c) - c.Mutations = append(c.Mutations, storage.NewMutation(storage.HyperCachePrefix, pos.Bytes(), batch.Serialize())) + c.Mutations = append(c.Mutations, storage.NewMutation(storage.HyperCacheTable, pos.Bytes(), batch.Serialize())) return hash }, } diff --git a/balloon/hyper/tree.go b/balloon/hyper/tree.go index ce8efe32f..08c399496 100644 --- a/balloon/hyper/tree.go +++ b/balloon/hyper/tree.go @@ -94,7 +94,7 @@ func (t *HyperTree) Add(eventDigest hashing.Digest, version uint64) (hashing.Dig rh := ops.Pop().Interpret(ops, ctx) // create a mutation for the new leaf - leafMutation := storage.NewMutation(storage.IndexPrefix, eventDigest, versionAsBytes) + leafMutation := storage.NewMutation(storage.IndexTable, eventDigest, versionAsBytes) // collect mutations mutations := append(ctx.Mutations, leafMutation) @@ -134,7 +134,7 @@ func (t *HyperTree) RebuildCache() { end := make([]byte, 2+t.hasher.Len()/8) start[1] = byte(t.cacheHeightLimit) end[1] = byte(t.cacheHeightLimit + 1) - nodes, err := t.store.GetRange(storage.HyperCachePrefix, start, end) + nodes, err := t.store.GetRange(storage.HyperCacheTable, start, end) if err != nil { log.Fatalf("Oops, something went wrong: %v", err) } diff --git a/balloon/hyper/tree_test.go b/balloon/hyper/tree_test.go index 050dc9e56..b88a9a410 100644 --- a/balloon/hyper/tree_test.go +++ b/balloon/hyper/tree_test.go @@ -122,7 +122,7 @@ func TestProveMembership(t *testing.T) { require.NoErrorf(t, err, "This should not fail for index %d", i) } - leaf, err := store.Get(storage.IndexPrefix, searchedDigest) + leaf, err := store.Get(storage.IndexTable, searchedDigest) require.NoErrorf(t, err, "No leaf with digest %v", err) proof, err := tree.QueryMembership(leaf.Key, leaf.Value) @@ -161,7 +161,7 @@ func TestAddAndVerify(t *testing.T) { require.NoErrorf(t, err, "Add operation should not fail for index %d", i) tree.store.Mutate(mutations) - leaf, err := store.Get(storage.IndexPrefix, key) + leaf, err := store.Get(storage.IndexTable, key) require.NoErrorf(t, err, "No leaf with key %d: %v", key, err) proof, err := tree.QueryMembership(leaf.Key, leaf.Value) @@ -202,8 +202,8 @@ func TestDeterministicAdd(t *testing.T) { } // check index store equality - reader11 := store1.GetAll(storage.IndexPrefix) - reader21 := store2.GetAll(storage.IndexPrefix) + reader11 := store1.GetAll(storage.IndexTable) + reader21 := store2.GetAll(storage.IndexTable) defer reader11.Close() defer reader21.Close() buff11 := make([]*storage.KVPair, 0) @@ -227,8 +227,8 @@ func TestDeterministicAdd(t *testing.T) { require.Equalf(t, buff11, buff21, "The stored indexes should be equal") // check cache store equality - reader12 := store1.GetAll(storage.HyperCachePrefix) - reader22 := store2.GetAll(storage.HyperCachePrefix) + reader12 := store1.GetAll(storage.HyperCacheTable) + reader22 := store2.GetAll(storage.HyperCacheTable) defer reader12.Close() defer reader22.Close() buff12 := make([]*storage.KVPair, 0) diff --git a/raftwal/fsm.go b/raftwal/fsm.go index c19367253..8d80e5517 100644 --- a/raftwal/fsm.go +++ b/raftwal/fsm.go @@ -61,7 +61,7 @@ type BalloonFSM struct { func 
loadState(s storage.ManagedStore) (*fsmState, error) { var state fsmState - kvstate, err := s.Get(storage.FSMStatePrefix, []byte{0xab}) + kvstate, err := s.Get(storage.FSMStateTable, []byte{0xab}) if err == storage.ErrKeyNotFound { log.Infof("Unable to find previous state: assuming a clean instance") return &fsmState{0, 0, 0}, nil @@ -255,7 +255,7 @@ func (fsm *BalloonFSM) applyAdd(event []byte, state *fsmState) *fsmAddResponse { return &fsmAddResponse{error: err} } - mutations = append(mutations, storage.NewMutation(storage.FSMStatePrefix, []byte{0xab}, stateBuff.Bytes())) + mutations = append(mutations, storage.NewMutation(storage.FSMStateTable, []byte{0xab}, stateBuff.Bytes())) err = fsm.store.Mutate(mutations) if err != nil { return &fsmAddResponse{error: err} diff --git a/raftwal/raftrocks/rocksdb_store.go b/raftwal/raftrocks/rocksdb_store.go index c51e97151..6bb254a8f 100644 --- a/raftwal/raftrocks/rocksdb_store.go +++ b/raftwal/raftrocks/rocksdb_store.go @@ -49,7 +49,6 @@ type RocksDBStore struct { wo *rocksdb.WriteOptions stableCFHandle *rocksdb.ColumnFamilyHandle logCFHandle *rocksdb.ColumnFamilyHandle - cfHandles rocksdb.ColumnFamilyHandles // global options globalOpts *rocksdb.Options @@ -176,12 +175,15 @@ func New(options Options) (*RocksDBStore, error) { // Close is used to gracefully close the DB connection. func (s *RocksDBStore) Close() error { + if s.stableCFHandle != nil { + s.stableCFHandle.Destroy() + } + if s.logCFHandle != nil { + s.logCFHandle.Destroy() + } if s.db != nil { s.db.Close() } - for _, cfh := range s.cfHandles { - cfh.Destroy() - } if s.stableBbto != nil { s.stableBbto.Destroy() } diff --git a/rocksdb/db.go b/rocksdb/db.go index 63eef90dd..9cdb6e0e9 100644 --- a/rocksdb/db.go +++ b/rocksdb/db.go @@ -217,7 +217,6 @@ func (db *DB) Close() error { C.rocksdb_close(db.c) db.c = nil } - db.opts.Destroy() return nil } diff --git a/storage/badger/badger_store.go b/storage/badger/badger_store.go index b975d99f2..e419e5169 100644 --- a/storage/badger/badger_store.go +++ b/storage/badger/badger_store.go @@ -122,7 +122,7 @@ func NewBadgerStoreOpts(opts *Options) (*BadgerStore, error) { func (s BadgerStore) Mutate(mutations []*storage.Mutation) error { return s.db.Update(func(txn *b.Txn) error { for _, m := range mutations { - key := append([]byte{m.Prefix}, m.Key...) + key := append([]byte{m.Table.Prefix()}, m.Key...) err := txn.Set(key, m.Value) if err != nil { return err @@ -132,8 +132,9 @@ func (s BadgerStore) Mutate(mutations []*storage.Mutation) error { }) } -func (s BadgerStore) GetRange(prefix byte, start, end []byte) (storage.KVRange, error) { +func (s BadgerStore) GetRange(table storage.Table, start, end []byte) (storage.KVRange, error) { result := make(storage.KVRange, 0) + prefix := table.Prefix() startKey := append([]byte{prefix}, start...) endKey := append([]byte{prefix}, end...) err := s.db.View(func(txn *b.Txn) error { @@ -163,11 +164,11 @@ func (s BadgerStore) GetRange(prefix byte, start, end []byte) (storage.KVRange, return result, nil } -func (s BadgerStore) Get(prefix byte, key []byte) (*storage.KVPair, error) { +func (s BadgerStore) Get(table storage.Table, key []byte) (*storage.KVPair, error) { result := new(storage.KVPair) result.Key = key err := s.db.View(func(txn *b.Txn) error { - k := append([]byte{prefix}, key...) + k := append([]byte{table.Prefix()}, key...) 
item, err := txn.Get(k) if err != nil { return err @@ -189,7 +190,7 @@ func (s BadgerStore) Get(prefix byte, key []byte) (*storage.KVPair, error) { } } -func (s BadgerStore) GetLast(prefix byte) (*storage.KVPair, error) { +func (s BadgerStore) GetLast(table storage.Table) (*storage.KVPair, error) { result := new(storage.KVPair) err := s.db.View(func(txn *b.Txn) error { var err error @@ -200,6 +201,7 @@ func (s BadgerStore) GetLast(prefix byte) (*storage.KVPair, error) { defer it.Close() // we are using a reversed iterator so we need to seek for // the last possible key for history prefix + prefix := table.Prefix() it.Seek([]byte{prefix, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}) if it.ValidForPrefix([]byte{prefix}) { item := it.Item() @@ -227,12 +229,12 @@ type BadgerKVPairReader struct { it *b.Iterator } -func NewBadgerKVPairReader(prefix byte, txn *b.Txn) *BadgerKVPairReader { +func NewBadgerKVPairReader(table storage.Table, txn *b.Txn) *BadgerKVPairReader { opts := b.DefaultIteratorOptions opts.PrefetchSize = 10 it := txn.NewIterator(opts) - it.Seek([]byte{prefix}) - return &BadgerKVPairReader{prefix, txn, it} + it.Seek([]byte{table.Prefix()}) + return &BadgerKVPairReader{table.Prefix(), txn, it} } func (r *BadgerKVPairReader) Read(buffer []*storage.KVPair) (n int, err error) { @@ -257,8 +259,8 @@ func (r *BadgerKVPairReader) Close() { r.txn.Discard() } -func (s BadgerStore) GetAll(prefix byte) storage.KVPairReader { - return NewBadgerKVPairReader(prefix, s.db.NewTransaction(false)) +func (s BadgerStore) GetAll(table storage.Table) storage.KVPairReader { + return NewBadgerKVPairReader(table, s.db.NewTransaction(false)) } func (s BadgerStore) Close() error { @@ -328,7 +330,7 @@ func (s *BadgerStore) Load(r io.Reader) error { return s.db.Load(r) } -// Take a snapshot of the store, and returns and id +// Snapshot takes a snapshot of the store, and returns and id // to be used in the back up process. The state of the // snapshot is stored in the store instance. // In badger the id corresponds to the last version stored. 
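// A minimal sketch (not part of this patch) of how callers now address a
// store through typed tables rather than prefix bytes. In this Badger store
// each Table still maps to a single-byte key prefix via Table.Prefix(), while
// the RocksDB store further down in the patch maps each Table to its own
// column family. The github.com/bbva/qed import path is an assumption.
package example

import (
	"github.com/bbva/qed/storage" // assumed module path
)

// writeAndReadBack stores one version entry in the index table and reads it
// back, using the table-based Store interface introduced by this patch.
func writeAndReadBack(store storage.Store, eventDigest, version []byte) ([]byte, error) {
	err := store.Mutate([]*storage.Mutation{
		storage.NewMutation(storage.IndexTable, eventDigest, version),
	})
	if err != nil {
		return nil, err
	}
	kv, err := store.Get(storage.IndexTable, eventDigest)
	if err != nil {
		return nil, err // storage.ErrKeyNotFound if the digest is unknown
	}
	return kv.Value, nil
}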
diff --git a/storage/badger/badger_store_test.go b/storage/badger/badger_store_test.go index 19ff08bb3..0c28f8466 100644 --- a/storage/badger/badger_store_test.go +++ b/storage/badger/badger_store_test.go @@ -33,22 +33,22 @@ import ( func TestMutate(t *testing.T) { store, closeF := openBadgerStore(t) defer closeF() - prefix := byte(0x0) tests := []struct { testname string + table storage.Table key, value []byte expectedError error }{ - {"Mutate Key=Value", []byte("Key"), []byte("Value"), nil}, + {"Mutate Key=Value", storage.IndexTable, []byte("Key"), []byte("Value"), nil}, } for _, test := range tests { err := store.Mutate([]*storage.Mutation{ - {prefix, test.key, test.value}, + {test.table, test.key, test.value}, }) require.Equalf(t, test.expectedError, err, "Error mutating in test: %s", test.testname) - _, err = store.Get(prefix, test.key) + _, err = store.Get(test.table, test.key) require.Equalf(t, test.expectedError, err, "Error getting key in test: %s", test.testname) } } @@ -59,25 +59,25 @@ func TestGetExistentKey(t *testing.T) { defer closeF() testCases := []struct { - prefix byte + table storage.Table key, value []byte expectedError error }{ - {byte(0x0), []byte("Key1"), []byte("Value1"), nil}, - {byte(0x0), []byte("Key2"), []byte("Value2"), nil}, - {byte(0x1), []byte("Key3"), []byte("Value3"), nil}, - {byte(0x1), []byte("Key4"), []byte("Value4"), storage.ErrKeyNotFound}, + {storage.IndexTable, []byte("Key1"), []byte("Value1"), nil}, + {storage.IndexTable, []byte("Key2"), []byte("Value2"), nil}, + {storage.HyperCacheTable, []byte("Key3"), []byte("Value3"), nil}, + {storage.HyperCacheTable, []byte("Key4"), []byte("Value4"), storage.ErrKeyNotFound}, } for _, test := range testCases { if test.expectedError == nil { err := store.Mutate([]*storage.Mutation{ - {test.prefix, test.key, test.value}, + {test.table, test.key, test.value}, }) require.NoError(t, err) } - stored, err := store.Get(test.prefix, test.key) + stored, err := store.Get(test.table, test.key) if test.expectedError == nil { require.NoError(t, err) require.Equalf(t, stored.Key, test.key, "The stored key does not match the original: expected %d, actual %d", test.key, stored.Key) @@ -106,15 +106,15 @@ func TestGetRange(t *testing.T) { {0, 20, 10}, } - prefix := byte(0x0) + table := storage.IndexTable for i := 10; i < 50; i++ { store.Mutate([]*storage.Mutation{ - {prefix, []byte{byte(i)}, []byte("Value")}, + {table, []byte{byte(i)}, []byte("Value")}, }) } for _, test := range testCases { - slice, err := store.GetRange(prefix, []byte{test.start}, []byte{test.end}) + slice, err := store.GetRange(table, []byte{test.start}, []byte{test.end}) require.NoError(t, err) require.Equalf(t, len(slice), test.size, "Slice length invalid: expected %d, actual %d", test.size, len(slice)) } @@ -123,7 +123,7 @@ func TestGetRange(t *testing.T) { func TestGetAll(t *testing.T) { - prefix := storage.HyperCachePrefix + table := storage.HyperCacheTable numElems := uint16(1000) testCases := []struct { batchSize int @@ -142,12 +142,12 @@ func TestGetAll(t *testing.T) { for i := uint16(0); i < numElems; i++ { key := util.Uint16AsBytes(i) store.Mutate([]*storage.Mutation{ - {prefix, key, key}, + {table, key, key}, }) } for i, c := range testCases { - reader := store.GetAll(storage.HyperCachePrefix) + reader := store.GetAll(table) numBatches := 0 var lastBatchLen int for { @@ -172,18 +172,18 @@ func TestGetLast(t *testing.T) { // insert numElems := uint64(20) - prefixes := [][]byte{{storage.IndexPrefix}, {storage.HistoryCachePrefix}, 
{storage.HyperCachePrefix}} - for _, prefix := range prefixes { + tables := []storage.Table{storage.IndexTable, storage.HistoryCacheTable, storage.HyperCacheTable} + for _, table := range tables { for i := uint64(0); i < numElems; i++ { key := util.Uint64AsBytes(i) store.Mutate([]*storage.Mutation{ - {prefix[0], key, key}, + {table, key, key}, }) } } - // get last element for history prefix - kv, err := store.GetLast(storage.HistoryCachePrefix) + // get last element for history table + kv, err := store.GetLast(storage.HistoryCacheTable) require.NoError(t, err) require.Equalf(t, util.Uint64AsBytes(numElems-1), kv.Key, "The key should match the last inserted element") require.Equalf(t, util.Uint64AsBytes(numElems-1), kv.Value, "The value should match the last inserted element") @@ -195,12 +195,12 @@ func TestBackupLoad(t *testing.T) { // insert numElems := uint64(20) - prefixes := [][]byte{{storage.IndexPrefix}, {storage.HistoryCachePrefix}, {storage.HyperCachePrefix}} - for _, prefix := range prefixes { + tables := []storage.Table{storage.IndexTable, storage.HistoryCacheTable, storage.HyperCacheTable} + for _, table := range tables { for i := uint64(0); i < numElems; i++ { key := util.Uint64AsBytes(i) store.Mutate([]*storage.Mutation{ - {prefix[0], key, key}, + {table, key, key}, }) } } @@ -227,12 +227,11 @@ func TestBackupLoad(t *testing.T) { func BenchmarkMutate(b *testing.B) { store, closeF := openBadgerStore(b) defer closeF() - prefix := byte(0x0) b.N = 10000 b.ResetTimer() for i := 0; i < b.N; i++ { store.Mutate([]*storage.Mutation{ - {prefix, rand.Bytes(128), []byte("Value")}, + {storage.IndexTable, rand.Bytes(128), []byte("Value")}, }) } @@ -241,7 +240,6 @@ func BenchmarkMutate(b *testing.B) { func BenchmarkGet(b *testing.B) { store, closeF := openBadgerStore(b) defer closeF() - prefix := byte(0x0) N := 10000 b.N = N var key []byte @@ -251,11 +249,11 @@ func BenchmarkGet(b *testing.B) { if i == 10 { key = rand.Bytes(128) store.Mutate([]*storage.Mutation{ - {prefix, key, []byte("Value")}, + {storage.IndexTable, key, []byte("Value")}, }) } else { store.Mutate([]*storage.Mutation{ - {prefix, rand.Bytes(128), []byte("Value")}, + {storage.IndexTable, rand.Bytes(128), []byte("Value")}, }) } } @@ -263,7 +261,7 @@ func BenchmarkGet(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - store.Get(prefix, key) + store.Get(storage.IndexTable, key) } } @@ -271,13 +269,12 @@ func BenchmarkGet(b *testing.B) { func BenchmarkGetRangeInLargeTree(b *testing.B) { store, closeF := openBadgerStore(b) defer closeF() - prefix := byte(0x0) N := 1000000 // populate storage for i := 0; i < N; i++ { store.Mutate([]*storage.Mutation{ - {prefix, []byte{byte(i)}, []byte("Value")}, + {storage.IndexTable, []byte{byte(i)}, []byte("Value")}, }) } @@ -286,14 +283,14 @@ func BenchmarkGetRangeInLargeTree(b *testing.B) { b.Run("Small range", func(b *testing.B) { b.N = 10000 for i := 0; i < b.N; i++ { - store.GetRange(prefix, []byte{10}, []byte{10}) + store.GetRange(storage.IndexTable, []byte{10}, []byte{10}) } }) b.Run("Large range", func(b *testing.B) { b.N = 10000 for i := 0; i < b.N; i++ { - store.GetRange(prefix, []byte{10}, []byte{35}) + store.GetRange(storage.IndexTable, []byte{10}, []byte{35}) } }) diff --git a/storage/bplus/bplus_store.go b/storage/bplus/bplus_store.go index f2422bdad..a10c854da 100644 --- a/storage/bplus/bplus_store.go +++ b/storage/bplus/bplus_store.go @@ -41,16 +41,16 @@ func (p KVItem) Less(b btree.Item) bool { func (s *BPlusTreeStore) Mutate(mutations []*storage.Mutation) error { for _, 
m := range mutations { - key := append([]byte{m.Prefix}, m.Key...) + key := append([]byte{m.Table.Prefix()}, m.Key...) s.db.ReplaceOrInsert(KVItem{key, m.Value}) } return nil } -func (s BPlusTreeStore) GetRange(prefix byte, start, end []byte) (storage.KVRange, error) { +func (s BPlusTreeStore) GetRange(table storage.Table, start, end []byte) (storage.KVRange, error) { result := make(storage.KVRange, 0) - startKey := append([]byte{prefix}, start...) - endKey := append([]byte{prefix}, end...) + startKey := append([]byte{table.Prefix()}, start...) + endKey := append([]byte{table.Prefix()}, end...) s.db.AscendGreaterOrEqual(KVItem{startKey, nil}, func(i btree.Item) bool { key := i.(KVItem).Key if bytes.Compare(key, endKey) > 0 { @@ -62,10 +62,10 @@ func (s BPlusTreeStore) GetRange(prefix byte, start, end []byte) (storage.KVRang return result, nil } -func (s BPlusTreeStore) Get(prefix byte, key []byte) (*storage.KVPair, error) { +func (s BPlusTreeStore) Get(table storage.Table, key []byte) (*storage.KVPair, error) { result := new(storage.KVPair) result.Key = key - k := append([]byte{prefix}, key...) + k := append([]byte{table.Prefix()}, key...) item := s.db.Get(KVItem{k, nil}) if item != nil { result.Value = item.(KVItem).Value @@ -75,9 +75,9 @@ func (s BPlusTreeStore) Get(prefix byte, key []byte) (*storage.KVPair, error) { } } -func (s BPlusTreeStore) GetLast(prefix byte) (*storage.KVPair, error) { +func (s BPlusTreeStore) GetLast(table storage.Table) (*storage.KVPair, error) { result := new(storage.KVPair) - s.db.DescendGreaterThan(KVItem{[]byte{prefix}, nil}, func(i btree.Item) bool { + s.db.DescendGreaterThan(KVItem{[]byte{table.Prefix()}, nil}, func(i btree.Item) bool { item := i.(KVItem) result.Key = item.Key[1:] result.Value = item.Value @@ -89,8 +89,8 @@ func (s BPlusTreeStore) GetLast(prefix byte) (*storage.KVPair, error) { return result, nil } -func (s BPlusTreeStore) GetAll(prefix byte) storage.KVPairReader { - return NewBPlusKVPairReader(prefix, s.db) +func (s BPlusTreeStore) GetAll(table storage.Table) storage.KVPairReader { + return NewBPlusKVPairReader(table, s.db) } type BPlusKVPairReader struct { @@ -99,11 +99,11 @@ type BPlusKVPairReader struct { lastKey []byte } -func NewBPlusKVPairReader(prefix byte, db *btree.BTree) *BPlusKVPairReader { +func NewBPlusKVPairReader(table storage.Table, db *btree.BTree) *BPlusKVPairReader { return &BPlusKVPairReader{ - prefix: prefix, + prefix: table.Prefix(), db: db, - lastKey: []byte{prefix}, + lastKey: []byte{table.Prefix()}, } } diff --git a/storage/bplus/bplus_store_test.go b/storage/bplus/bplus_store_test.go index f604d2e00..55c59f6aa 100644 --- a/storage/bplus/bplus_store_test.go +++ b/storage/bplus/bplus_store_test.go @@ -29,22 +29,26 @@ import ( func TestMutate(t *testing.T) { store, closeF := openBPlusTreeStore() defer closeF() - prefix := byte(0x0) tests := []struct { testname string + table storage.Table key, value []byte expectedError error }{ - {"Mutate Key=Value", []byte("Key"), []byte("Value"), nil}, + {"Mutate Key=Value", storage.IndexTable, []byte("Key"), []byte("Value"), nil}, } for _, test := range tests { err := store.Mutate([]*storage.Mutation{ - {prefix, test.key, test.value}, + { + Table: test.table, + Key: test.key, + Value: test.value, + }, }) require.Equalf(t, test.expectedError, err, "Error mutating in test: %s", test.testname) - _, err = store.Get(prefix, test.key) + _, err = store.Get(test.table, test.key) require.Equalf(t, test.expectedError, err, "Error getting key in test: %s", test.testname) } } @@ -55,25 
+59,25 @@ func TestGetExistentKey(t *testing.T) { defer closeF() testCases := []struct { - prefix byte + table storage.Table key, value []byte expectedError error }{ - {byte(0x0), []byte("Key1"), []byte("Value1"), nil}, - {byte(0x0), []byte("Key2"), []byte("Value2"), nil}, - {byte(0x1), []byte("Key3"), []byte("Value3"), nil}, - {byte(0x1), []byte("Key4"), []byte("Value4"), storage.ErrKeyNotFound}, + {storage.IndexTable, []byte("Key1"), []byte("Value1"), nil}, + {storage.IndexTable, []byte("Key2"), []byte("Value2"), nil}, + {storage.HyperCacheTable, []byte("Key3"), []byte("Value3"), nil}, + {storage.HyperCacheTable, []byte("Key4"), []byte("Value4"), storage.ErrKeyNotFound}, } for _, test := range testCases { if test.expectedError == nil { err := store.Mutate([]*storage.Mutation{ - {test.prefix, test.key, test.value}, + {test.table, test.key, test.value}, }) require.NoError(t, err) } - stored, err := store.Get(test.prefix, test.key) + stored, err := store.Get(test.table, test.key) if test.expectedError == nil { require.NoError(t, err) require.Equalf(t, stored.Key, test.key, "The stored key does not match the original: expected %d, actual %d", test.key, stored.Key) @@ -100,15 +104,15 @@ func TestGetRange(t *testing.T) { {0, 20, 10}, } - prefix := byte(0x0) + table := storage.IndexTable for i := 10; i < 50; i++ { store.Mutate([]*storage.Mutation{ - {prefix, []byte{byte(i)}, []byte("Value")}, + {table, []byte{byte(i)}, []byte("Value")}, }) } for _, test := range testCases { - slice, err := store.GetRange(prefix, []byte{test.start}, []byte{test.end}) + slice, err := store.GetRange(table, []byte{test.start}, []byte{test.end}) require.NoError(t, err) require.Equalf(t, len(slice), test.size, "Slice length invalid: expected %d, actual %d", test.size, len(slice)) } @@ -117,7 +121,7 @@ func TestGetRange(t *testing.T) { func TestGetAll(t *testing.T) { - prefix := storage.HyperCachePrefix + table := storage.HyperCacheTable numElems := uint16(1000) testCases := []struct { batchSize int @@ -136,12 +140,12 @@ func TestGetAll(t *testing.T) { for i := uint16(0); i < numElems; i++ { key := util.Uint16AsBytes(i) store.Mutate([]*storage.Mutation{ - {prefix, key, key}, + {table, key, key}, }) } for i, c := range testCases { - reader := store.GetAll(storage.HyperCachePrefix) + reader := store.GetAll(table) numBatches := 0 var lastBatchLen int for { @@ -166,18 +170,18 @@ func TestGetLast(t *testing.T) { // insert numElems := uint64(20) - prefixes := [][]byte{{storage.IndexPrefix}, {storage.HistoryCachePrefix}, {storage.HyperCachePrefix}} - for _, prefix := range prefixes { + tables := []storage.Table{storage.IndexTable, storage.HistoryCacheTable, storage.HyperCacheTable} + for _, table := range tables { for i := uint64(0); i < numElems; i++ { key := util.Uint64AsBytes(i) store.Mutate([]*storage.Mutation{ - {prefix[0], key, key}, + {table, key, key}, }) } } - // get last element for history prefix - kv, err := store.GetLast(storage.HistoryCachePrefix) + // get last element for history table + kv, err := store.GetLast(storage.HistoryCacheTable) require.NoError(t, err) require.Equalf(t, util.Uint64AsBytes(numElems-1), kv.Key, "The key should match the last inserted element") require.Equalf(t, util.Uint64AsBytes(numElems-1), kv.Value, "The value should match the last inserted element") @@ -186,12 +190,11 @@ func TestGetLast(t *testing.T) { func BenchmarkMutate(b *testing.B) { store, closeF := openBPlusTreeStore() defer closeF() - prefix := byte(0x0) b.N = 10000 b.ResetTimer() for i := 0; i < b.N; i++ { 
store.Mutate([]*storage.Mutation{ - {prefix, rand.Bytes(128), []byte("Value")}, + {storage.IndexTable, rand.Bytes(128), []byte("Value")}, }) } } @@ -199,7 +202,6 @@ func BenchmarkMutate(b *testing.B) { func BenchmarkGet(b *testing.B) { store, closeF := openBPlusTreeStore() defer closeF() - prefix := byte(0x0) N := 10000 b.N = N var key []byte @@ -209,11 +211,11 @@ func BenchmarkGet(b *testing.B) { if i == 10 { key = rand.Bytes(128) store.Mutate([]*storage.Mutation{ - {prefix, key, []byte("Value")}, + {storage.IndexTable, key, []byte("Value")}, }) } else { store.Mutate([]*storage.Mutation{ - {prefix, rand.Bytes(128), []byte("Value")}, + {storage.IndexTable, rand.Bytes(128), []byte("Value")}, }) } } @@ -221,7 +223,7 @@ func BenchmarkGet(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - store.Get(prefix, key) + store.Get(storage.IndexTable, key) } } @@ -229,13 +231,12 @@ func BenchmarkGet(b *testing.B) { func BenchmarkGetRangeInLargeTree(b *testing.B) { store, closeF := openBPlusTreeStore() defer closeF() - prefix := byte(0x0) N := 1000000 // populate storage for i := 0; i < N; i++ { store.Mutate([]*storage.Mutation{ - {prefix, []byte{byte(i)}, []byte("Value")}, + {storage.IndexTable, []byte{byte(i)}, []byte("Value")}, }) } @@ -244,14 +245,14 @@ func BenchmarkGetRangeInLargeTree(b *testing.B) { b.Run("Small range", func(b *testing.B) { b.N = 10000 for i := 0; i < b.N; i++ { - store.GetRange(prefix, []byte{10}, []byte{10}) + store.GetRange(storage.IndexTable, []byte{10}, []byte{10}) } }) b.Run("Large range", func(b *testing.B) { b.N = 10000 for i := 0; i < b.N; i++ { - store.GetRange(prefix, []byte{10}, []byte{35}) + store.GetRange(storage.IndexTable, []byte{10}, []byte{35}) } }) diff --git a/storage/pb/backup.pb.go b/storage/pb/backup.pb.go index 02dfbe169..edf5fb899 100644 --- a/storage/pb/backup.pb.go +++ b/storage/pb/backup.pb.go @@ -29,9 +29,40 @@ var _ = math.Inf // proto package needs to be updated. 
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package +type Table int32 + +const ( + Table_DEFAULT Table = 0 + Table_INDEX Table = 1 + Table_HYPER Table = 2 + Table_HISTORY Table = 3 + Table_FSM Table = 4 +) + +var Table_name = map[int32]string{ + 0: "DEFAULT", + 1: "INDEX", + 2: "HYPER", + 3: "HISTORY", + 4: "FSM", +} +var Table_value = map[string]int32{ + "DEFAULT": 0, + "INDEX": 1, + "HYPER": 2, + "HISTORY": 3, + "FSM": 4, +} + +func (x Table) String() string { + return proto.EnumName(Table_name, int32(x)) +} +func (Table) EnumDescriptor() ([]byte, []int) { return fileDescriptorBackup, []int{0} } + type KVPair struct { - Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` - Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + Table Table `protobuf:"varint,1,opt,name=table,proto3,enum=pb.Table" json:"table,omitempty"` + Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` + Value []byte `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"` } func (m *KVPair) Reset() { *m = KVPair{} } @@ -39,6 +70,13 @@ func (m *KVPair) String() string { return proto.CompactTextString(m) func (*KVPair) ProtoMessage() {} func (*KVPair) Descriptor() ([]byte, []int) { return fileDescriptorBackup, []int{0} } +func (m *KVPair) GetTable() Table { + if m != nil { + return m.Table + } + return Table_DEFAULT +} + func (m *KVPair) GetKey() []byte { if m != nil { return m.Key @@ -55,6 +93,7 @@ func (m *KVPair) GetValue() []byte { func init() { proto.RegisterType((*KVPair)(nil), "pb.KVPair") + proto.RegisterEnum("pb.Table", Table_name, Table_value) } func (m *KVPair) Marshal() (dAtA []byte, err error) { size := m.Size() @@ -71,14 +110,19 @@ func (m *KVPair) MarshalTo(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Table != 0 { + dAtA[i] = 0x8 + i++ + i = encodeVarintBackup(dAtA, i, uint64(m.Table)) + } if len(m.Key) > 0 { - dAtA[i] = 0xa + dAtA[i] = 0x12 i++ i = encodeVarintBackup(dAtA, i, uint64(len(m.Key))) i += copy(dAtA[i:], m.Key) } if len(m.Value) > 0 { - dAtA[i] = 0x12 + dAtA[i] = 0x1a i++ i = encodeVarintBackup(dAtA, i, uint64(len(m.Value))) i += copy(dAtA[i:], m.Value) @@ -98,6 +142,9 @@ func encodeVarintBackup(dAtA []byte, offset int, v uint64) int { func (m *KVPair) Size() (n int) { var l int _ = l + if m.Table != 0 { + n += 1 + sovBackup(uint64(m.Table)) + } l = len(m.Key) if l > 0 { n += 1 + l + sovBackup(uint64(l)) @@ -152,6 +199,25 @@ func (m *KVPair) Unmarshal(dAtA []byte) error { } switch fieldNum { case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Table", wireType) + } + m.Table = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBackup + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Table |= (Table(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) } @@ -182,7 +248,7 @@ func (m *KVPair) Unmarshal(dAtA []byte) error { m.Key = []byte{} } iNdEx = postIndex - case 2: + case 3: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) } @@ -342,12 +408,17 @@ var ( func init() { proto.RegisterFile("backup.proto", fileDescriptorBackup) } var fileDescriptorBackup = []byte{ - // 107 bytes of a gzipped FileDescriptorProto + // 192 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x49, 0x4a, 0x4c, 
0xce, - 0x2e, 0x2d, 0xd0, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2a, 0x48, 0x52, 0x32, 0xe0, 0x62, - 0xf3, 0x0e, 0x0b, 0x48, 0xcc, 0x2c, 0x12, 0x12, 0xe0, 0x62, 0xce, 0x4e, 0xad, 0x94, 0x60, 0x54, - 0x60, 0xd4, 0xe0, 0x09, 0x02, 0x31, 0x85, 0x44, 0xb8, 0x58, 0xcb, 0x12, 0x73, 0x4a, 0x53, 0x25, - 0x98, 0xc0, 0x62, 0x10, 0x8e, 0x93, 0xc0, 0x89, 0x47, 0x72, 0x8c, 0x17, 0x1e, 0xc9, 0x31, 0x3e, - 0x78, 0x24, 0xc7, 0x38, 0xe3, 0xb1, 0x1c, 0x43, 0x12, 0x1b, 0xd8, 0x38, 0x63, 0x40, 0x00, 0x00, - 0x00, 0xff, 0xff, 0x4b, 0x58, 0x23, 0x5a, 0x5e, 0x00, 0x00, 0x00, + 0x2e, 0x2d, 0xd0, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2a, 0x48, 0x52, 0x0a, 0xe4, 0x62, + 0xf3, 0x0e, 0x0b, 0x48, 0xcc, 0x2c, 0x12, 0x92, 0xe7, 0x62, 0x2d, 0x49, 0x4c, 0xca, 0x49, 0x95, + 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x33, 0xe2, 0xd4, 0x2b, 0x48, 0xd2, 0x0b, 0x01, 0x09, 0x04, 0x41, + 0xc4, 0x85, 0x04, 0xb8, 0x98, 0xb3, 0x53, 0x2b, 0x25, 0x98, 0x14, 0x18, 0x35, 0x78, 0x82, 0x40, + 0x4c, 0x21, 0x11, 0x2e, 0xd6, 0xb2, 0xc4, 0x9c, 0xd2, 0x54, 0x09, 0x66, 0xb0, 0x18, 0x84, 0xa3, + 0xe5, 0xc0, 0xc5, 0x0a, 0xd6, 0x27, 0xc4, 0xcd, 0xc5, 0xee, 0xe2, 0xea, 0xe6, 0x18, 0xea, 0x13, + 0x22, 0xc0, 0x20, 0xc4, 0xc9, 0xc5, 0xea, 0xe9, 0xe7, 0xe2, 0x1a, 0x21, 0xc0, 0x08, 0x62, 0x7a, + 0x44, 0x06, 0xb8, 0x06, 0x09, 0x30, 0x81, 0x94, 0x78, 0x78, 0x06, 0x87, 0xf8, 0x07, 0x45, 0x0a, + 0x30, 0x0b, 0xb1, 0x73, 0x31, 0xbb, 0x05, 0xfb, 0x0a, 0xb0, 0x38, 0x09, 0x9c, 0x78, 0x24, 0xc7, + 0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83, 0x47, 0x72, 0x8c, 0x33, 0x1e, 0xcb, 0x31, 0x24, 0xb1, 0x81, + 0x5d, 0x6c, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x42, 0x5f, 0x2b, 0x45, 0xc1, 0x00, 0x00, 0x00, } diff --git a/storage/pb/backup.proto b/storage/pb/backup.proto index dd3651138..0cd76f763 100644 --- a/storage/pb/backup.proto +++ b/storage/pb/backup.proto @@ -18,7 +18,16 @@ syntax = "proto3"; package pb; +enum Table { + DEFAULT = 0; + INDEX = 1; + HYPER = 2; + HISTORY = 3; + FSM = 4; +} + message KVPair { - bytes key = 1; - bytes value = 2; + Table table = 1; + bytes key = 2; + bytes value = 3; } \ No newline at end of file diff --git a/storage/rocks/rocksdb_store.go b/storage/rocks/rocksdb_store.go index 2ceb2e627..3d07d1fde 100644 --- a/storage/rocks/rocksdb_store.go +++ b/storage/rocks/rocksdb_store.go @@ -42,6 +42,18 @@ type RocksDBStore struct { // inside checkPointPath folder checkpoints map[uint64]string + // column family handlers + cfHandles rocksdb.ColumnFamilyHandles + + // global options + globalOpts *rocksdb.Options + defaultTableOpts *rocksdb.Options + indexTableOpts *rocksdb.Options + hyperCacheTableOpts *rocksdb.Options + historyCacheTableOpts *rocksdb.Options + fsmStateTableOpts *rocksdb.Options + + // read/write options ro *rocksdb.ReadOptions wo *rocksdb.WriteOptions } @@ -56,26 +68,52 @@ func NewRocksDBStore(path string) (*RocksDBStore, error) { } func NewRocksDBStoreOpts(opts *Options) (*RocksDBStore, error) { - rocksdbOpts := rocksdb.NewDefaultOptions() - rocksdbOpts.SetCreateIfMissing(true) - rocksdbOpts.IncreaseParallelism(4) - rocksdbOpts.SetMaxWriteBufferNumber(5) - rocksdbOpts.SetMinWriteBufferNumberToMerge(2) + cfNames := []string{ + storage.DefaultTable.String(), + storage.IndexTable.String(), + storage.HyperCacheTable.String(), + storage.HistoryCacheTable.String(), + storage.FSMStateTable.String(), + } + + // global options + globalOpts := rocksdb.NewDefaultOptions() + globalOpts.SetCreateIfMissing(true) + globalOpts.SetCreateIfMissingColumnFamilies(true) var stats *rocksdb.Statistics if opts.EnableStatistics { stats = rocksdb.NewStatistics() - 
rocksdbOpts.SetStatistics(stats) + globalOpts.SetStatistics(stats) + } + + defaultTableOpts := rocksdb.NewDefaultOptions() + indexTableOpts := getIndexTableOpts() + hyperCacheTableOpts := getHyperCacheTableOpts() + historyCacheTableOpts := getHistoryCacheTableOpts() + fsmStateTableOpts := getFsmStateTableOpts() + + cfOpts := []*rocksdb.Options{ + defaultTableOpts, + indexTableOpts, + hyperCacheTableOpts, + historyCacheTableOpts, + fsmStateTableOpts, } - blockOpts := rocksdb.NewDefaultBlockBasedTableOptions() - blockOpts.SetFilterPolicy(rocksdb.NewBloomFilterPolicy(10)) - rocksdbOpts.SetBlockBasedTableFactory(blockOpts) + // rocksdbOpts.IncreaseParallelism(4) + // rocksdbOpts.SetMaxWriteBufferNumber(5) + // rocksdbOpts.SetMinWriteBufferNumberToMerge(2) + + // blockOpts := rocksdb.NewDefaultBlockBasedTableOptions() + // blockOpts.SetFilterPolicy(rocksdb.NewBloomFilterPolicy(10)) + // rocksdbOpts.SetBlockBasedTableFactory(blockOpts) - db, err := rocksdb.OpenDB(opts.Path, rocksdbOpts) + db, cfHandles, err := rocksdb.OpenDBColumnFamilies(opts.Path, globalOpts, cfNames, cfOpts) if err != nil { return nil, err } + checkPointPath := opts.Path + "/checkpoints" err = os.MkdirAll(checkPointPath, 0755) if err != nil { @@ -83,12 +121,19 @@ func NewRocksDBStoreOpts(opts *Options) (*RocksDBStore, error) { } store := &RocksDBStore{ - db: db, - stats: stats, - checkPointPath: checkPointPath, - checkpoints: make(map[uint64]string), - wo: rocksdb.NewDefaultWriteOptions(), - ro: rocksdb.NewDefaultReadOptions(), + db: db, + stats: stats, + cfHandles: cfHandles, + checkPointPath: checkPointPath, + checkpoints: make(map[uint64]string), + globalOpts: globalOpts, + defaultTableOpts: defaultTableOpts, + indexTableOpts: indexTableOpts, + hyperCacheTableOpts: hyperCacheTableOpts, + historyCacheTableOpts: historyCacheTableOpts, + fsmStateTableOpts: fsmStateTableOpts, + wo: rocksdb.NewDefaultWriteOptions(), + ro: rocksdb.NewDefaultReadOptions(), } if rms == nil && stats != nil { @@ -98,22 +143,48 @@ func NewRocksDBStoreOpts(opts *Options) (*RocksDBStore, error) { return store, nil } -func (s RocksDBStore) Mutate(mutations []*storage.Mutation) error { +func getIndexTableOpts() *rocksdb.Options { + bbto := rocksdb.NewDefaultBlockBasedTableOptions() + opts := rocksdb.NewDefaultOptions() + opts.SetBlockBasedTableFactory(bbto) + return opts +} + +func getHyperCacheTableOpts() *rocksdb.Options { + bbto := rocksdb.NewDefaultBlockBasedTableOptions() + opts := rocksdb.NewDefaultOptions() + opts.SetBlockBasedTableFactory(bbto) + return opts +} + +func getHistoryCacheTableOpts() *rocksdb.Options { + bbto := rocksdb.NewDefaultBlockBasedTableOptions() + opts := rocksdb.NewDefaultOptions() + opts.SetBlockBasedTableFactory(bbto) + return opts +} + +func getFsmStateTableOpts() *rocksdb.Options { + bbto := rocksdb.NewDefaultBlockBasedTableOptions() + opts := rocksdb.NewDefaultOptions() + opts.SetBlockBasedTableFactory(bbto) + return opts +} + +func (s *RocksDBStore) Mutate(mutations []*storage.Mutation) error { batch := rocksdb.NewWriteBatch() defer batch.Destroy() for _, m := range mutations { - key := append([]byte{m.Prefix}, m.Key...) - batch.Put(key, m.Value) + batch.PutCF(s.cfHandles[m.Table], m.Key, m.Value) } err := s.db.Write(s.wo, batch) return err } -func (s RocksDBStore) Get(prefix byte, key []byte) (*storage.KVPair, error) { +func (s *RocksDBStore) Get(table storage.Table, key []byte) (*storage.KVPair, error) { result := new(storage.KVPair) result.Key = key - k := append([]byte{prefix}, key...) 
- v, err := s.db.GetBytes(s.ro, k) + v, err := s.db.GetBytesCF(s.ro, s.cfHandles[table], key) if err != nil { return nil, err } @@ -124,41 +195,39 @@ func (s RocksDBStore) Get(prefix byte, key []byte) (*storage.KVPair, error) { return result, nil } -func (s RocksDBStore) GetRange(prefix byte, start, end []byte) (storage.KVRange, error) { +func (s *RocksDBStore) GetRange(table storage.Table, start, end []byte) (storage.KVRange, error) { result := make(storage.KVRange, 0) - startKey := append([]byte{prefix}, start...) - endKey := append([]byte{prefix}, end...) - it := s.db.NewIterator(s.ro) + it := s.db.NewIteratorCF(s.ro, s.cfHandles[table]) defer it.Close() - for it.Seek(startKey); it.Valid(); it.Next() { + for it.Seek(start); it.Valid(); it.Next() { keySlice := it.Key() key := make([]byte, keySlice.Size()) copy(key, keySlice.Data()) keySlice.Free() - if bytes.Compare(key, endKey) > 0 { + if bytes.Compare(key, end) > 0 { break } valueSlice := it.Value() value := make([]byte, valueSlice.Size()) copy(value, valueSlice.Data()) - result = append(result, storage.KVPair{Key: key[1:], Value: value}) + result = append(result, storage.KVPair{Key: key, Value: value}) valueSlice.Free() } return result, nil } -func (s RocksDBStore) GetLast(prefix byte) (*storage.KVPair, error) { - it := s.db.NewIterator(s.ro) +func (s *RocksDBStore) GetLast(table storage.Table) (*storage.KVPair, error) { + it := s.db.NewIteratorCF(s.ro, s.cfHandles[table]) defer it.Close() - it.SeekForPrev([]byte{prefix, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}) - if it.ValidForPrefix([]byte{prefix}) { + it.SeekForPrev([]byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}) + if it.Valid() { result := new(storage.KVPair) keySlice := it.Key() key := make([]byte, keySlice.Size()) copy(key, keySlice.Data()) keySlice.Free() - result.Key = key[1:] + result.Key = key valueSlice := it.Value() value := make([]byte, valueSlice.Size()) copy(value, valueSlice.Data()) @@ -170,20 +239,19 @@ func (s RocksDBStore) GetLast(prefix byte) (*storage.KVPair, error) { } type RocksDBKVPairReader struct { - prefix byte - it *rocksdb.Iterator + it *rocksdb.Iterator } -func NewRocksDBKVPairReader(prefix byte, db *rocksdb.DB) *RocksDBKVPairReader { +func NewRocksDBKVPairReader(cfHandle *rocksdb.ColumnFamilyHandle, db *rocksdb.DB) *RocksDBKVPairReader { opts := rocksdb.NewDefaultReadOptions() opts.SetFillCache(false) - it := db.NewIterator(opts) - it.Seek([]byte{prefix}) - return &RocksDBKVPairReader{prefix, it} + it := db.NewIteratorCF(opts, cfHandle) + it.SeekToFirst() + return &RocksDBKVPairReader{it} } func (r *RocksDBKVPairReader) Read(buffer []*storage.KVPair) (n int, err error) { - for n = 0; r.it.ValidForPrefix([]byte{r.prefix}) && n < len(buffer); r.it.Next() { + for n = 0; r.it.Valid() && n < len(buffer); r.it.Next() { keySlice := r.it.Key() valueSlice := r.it.Value() key := make([]byte, keySlice.Size()) @@ -192,7 +260,7 @@ func (r *RocksDBKVPairReader) Read(buffer []*storage.KVPair) (n int, err error) copy(value, valueSlice.Data()) keySlice.Free() valueSlice.Free() - buffer[n] = &storage.KVPair{Key: key[1:], Value: value} + buffer[n] = &storage.KVPair{Key: key, Value: value} n++ } return n, err @@ -202,18 +270,49 @@ func (r *RocksDBKVPairReader) Close() { r.it.Close() } -func (s RocksDBStore) GetAll(prefix byte) storage.KVPairReader { - return NewRocksDBKVPairReader(prefix, s.db) +func (s *RocksDBStore) GetAll(table storage.Table) storage.KVPairReader { + return NewRocksDBKVPairReader(s.cfHandles[table], s.db) } -func (s 
RocksDBStore) Close() error { - s.db.Close() - s.ro.Destroy() - s.wo.Destroy() +func (s *RocksDBStore) Close() error { + + for _, cf := range s.cfHandles { + cf.Destroy() + } + + if s.db != nil { + s.db.Close() + } + + if s.stats != nil { + s.stats.Destroy() + } + if s.globalOpts != nil { + s.globalOpts.Destroy() + } + if s.defaultTableOpts != nil { + s.defaultTableOpts.Destroy() + } + if s.indexTableOpts != nil { + s.indexTableOpts.Destroy() + } + if s.hyperCacheTableOpts != nil { + s.hyperCacheTableOpts.Destroy() + } + if s.fsmStateTableOpts != nil { + s.fsmStateTableOpts.Destroy() + } + if s.ro != nil { + s.ro.Destroy() + } + if s.wo != nil { + s.wo.Destroy() + } + return nil } -// Take a snapshot of the store, and returns and id +// Snapshot takes a snapshot of the store, and returns and id // to be used in the back up process. The state of the // snapshot is stored in the store instance. func (s *RocksDBStore) Snapshot() (uint64, error) { @@ -250,28 +349,41 @@ func (s *RocksDBStore) Backup(w io.Writer, id uint64) error { // open a new iterator and dump every key ro := rocksdb.NewDefaultReadOptions() ro.SetFillCache(false) - it := checkDB.NewIterator(ro) - defer it.Close() - - for it.SeekToFirst(); it.Valid(); it.Next() { - keySlice := it.Key() - valueSlice := it.Value() - keyData := keySlice.Data() - valueData := valueSlice.Data() - key := append(keyData[:0:0], keyData...) // See https://github.com/go101/go101/wiki - value := append(valueData[:0:0], valueData...) - keySlice.Free() - valueSlice.Free() - entry := &pb.KVPair{ - Key: key, - Value: value, + tables := []storage.Table{ + storage.DefaultTable, + storage.IndexTable, + storage.HyperCacheTable, + storage.HistoryCacheTable, + storage.FSMStateTable, + } + for _, table := range tables { + + it := checkDB.NewIteratorCF(ro, s.cfHandles[table]) + defer it.Close() + + for it.SeekToFirst(); it.Valid(); it.Next() { + keySlice := it.Key() + valueSlice := it.Value() + keyData := keySlice.Data() + valueData := valueSlice.Data() + key := append(keyData[:0:0], keyData...) // See https://github.com/go101/go101/wiki + value := append(valueData[:0:0], valueData...) 
+ keySlice.Free() + valueSlice.Free() + + entry := &pb.KVPair{ + Table: pb.Table(table), + Key: key, + Value: value, + } + + // write entries to disk + if err := writeTo(entry, w); err != nil { + return err + } } - // write entries to disk - if err := writeTo(entry, w); err != nil { - return err - } } // remove checkpoint from list @@ -318,7 +430,8 @@ func (s *RocksDBStore) Load(r io.Reader) error { if err = kv.Unmarshal(unmarshalBuf[:data]); err != nil { return err } - batch.Put(kv.Key, kv.Value) + table := storage.Table(kv.GetTable()) + batch.PutCF(s.cfHandles[table], kv.Key, kv.Value) if batch.Count() == 1000 { s.db.Write(wo, batch) diff --git a/storage/rocks/rocksdb_store_test.go b/storage/rocks/rocksdb_store_test.go index 6f33e87e4..3146d099f 100644 --- a/storage/rocks/rocksdb_store_test.go +++ b/storage/rocks/rocksdb_store_test.go @@ -36,22 +36,22 @@ import ( func TestMutate(t *testing.T) { store, closeF := openRocksDBStore(t) defer closeF() - prefix := byte(0x0) tests := []struct { testname string + table storage.Table key, value []byte expectedError error }{ - {"Mutate Key=Value", []byte("Key"), []byte("Value"), nil}, + {"Mutate Key=Value", storage.IndexTable, []byte("Key"), []byte("Value"), nil}, } for _, test := range tests { err := store.Mutate([]*storage.Mutation{ - {prefix, test.key, test.value}, + {Table: test.table, Key: test.key, Value: test.value}, }) require.Equalf(t, test.expectedError, err, "Error mutating in test: %s", test.testname) - _, err = store.Get(prefix, test.key) + _, err = store.Get(test.table, test.key) require.Equalf(t, test.expectedError, err, "Error getting key in test: %s", test.testname) } } @@ -61,25 +61,29 @@ func TestGetExistentKey(t *testing.T) { defer closeF() testCases := []struct { - prefix byte + table storage.Table key, value []byte expectedError error }{ - {byte(0x0), []byte("Key1"), []byte("Value1"), nil}, - {byte(0x0), []byte("Key2"), []byte("Value2"), nil}, - {byte(0x1), []byte("Key3"), []byte("Value3"), nil}, - {byte(0x1), []byte("Key4"), []byte("Value4"), storage.ErrKeyNotFound}, + {storage.IndexTable, []byte("Key1"), []byte("Value1"), nil}, + {storage.IndexTable, []byte("Key2"), []byte("Value2"), nil}, + {storage.HyperCacheTable, []byte("Key3"), []byte("Value3"), nil}, + {storage.HyperCacheTable, []byte("Key4"), []byte("Value4"), storage.ErrKeyNotFound}, } for _, test := range testCases { if test.expectedError == nil { err := store.Mutate([]*storage.Mutation{ - {test.prefix, test.key, test.value}, + { + Table: test.table, + Key: test.key, + Value: test.value, + }, }) require.NoError(t, err) } - stored, err := store.Get(test.prefix, test.key) + stored, err := store.Get(test.table, test.key) if test.expectedError == nil { require.NoError(t, err) require.Equalf(t, stored.Key, test.key, "The stored key does not match the original: expected %d, actual %d", test.key, stored.Key) @@ -108,15 +112,15 @@ func TestGetRange(t *testing.T) { {0, 20, 10}, } - prefix := byte(0x0) + table := storage.IndexTable for i := 10; i < 50; i++ { store.Mutate([]*storage.Mutation{ - {prefix, []byte{byte(i)}, []byte("Value")}, + {table, []byte{byte(i)}, []byte("Value")}, }) } for _, test := range testCases { - slice, err := store.GetRange(prefix, []byte{test.start}, []byte{test.end}) + slice, err := store.GetRange(table, []byte{test.start}, []byte{test.end}) require.NoError(t, err) require.Equalf(t, len(slice), test.size, "Slice length invalid: expected %d, actual %d", test.size, len(slice)) } @@ -125,7 +129,7 @@ func TestGetRange(t *testing.T) { func TestGetAll(t 
*testing.T) { - prefix := storage.HyperCachePrefix + table := storage.HyperCacheTable numElems := uint16(1000) testCases := []struct { batchSize int @@ -144,12 +148,12 @@ func TestGetAll(t *testing.T) { for i := uint16(0); i < numElems; i++ { key := util.Uint16AsBytes(i) store.Mutate([]*storage.Mutation{ - {prefix, key, key}, + {table, key, key}, }) } for i, c := range testCases { - reader := store.GetAll(storage.HyperCachePrefix) + reader := store.GetAll(table) numBatches := 0 var lastBatchLen int for { @@ -174,18 +178,18 @@ func TestGetLast(t *testing.T) { // insert numElems := uint64(20) - prefixes := [][]byte{{storage.IndexPrefix}, {storage.HistoryCachePrefix}, {storage.HyperCachePrefix}} - for _, prefix := range prefixes { + tables := []storage.Table{storage.IndexTable, storage.HistoryCacheTable, storage.HyperCacheTable} + for _, table := range tables { for i := uint64(0); i < numElems; i++ { key := util.Uint64AsBytes(i) store.Mutate([]*storage.Mutation{ - {prefix[0], key, key}, + {table, key, key}, }) } } - // get last element for history prefix - kv, err := store.GetLast(storage.HistoryCachePrefix) + // get last element for history table + kv, err := store.GetLast(storage.HistoryCacheTable) require.NoError(t, err) require.Equalf(t, util.Uint64AsBytes(numElems-1), kv.Key, "The key should match the last inserted element") require.Equalf(t, util.Uint64AsBytes(numElems-1), kv.Value, "The value should match the last inserted element") @@ -198,12 +202,12 @@ func TestBackupLoad(t *testing.T) { // insert numElems := uint64(20) - prefixes := [][]byte{{storage.IndexPrefix}, {storage.HistoryCachePrefix}, {storage.HyperCachePrefix}} - for _, prefix := range prefixes { + tables := []storage.Table{storage.IndexTable, storage.HistoryCacheTable, storage.HyperCacheTable} + for _, table := range tables { for i := uint64(0); i < numElems; i++ { key := util.Uint64AsBytes(i) store.Mutate([]*storage.Mutation{ - {Prefix: prefix[0], Key: key, Value: key}, + {Table: table, Key: key, Value: key}, }) } } @@ -220,8 +224,8 @@ func TestBackupLoad(t *testing.T) { require.NoError(t, restore.Load(ioBuf)) // check elements - for _, prefix := range prefixes { - reader := store.GetAll(prefix[0]) + for _, table := range tables { + reader := store.GetAll(table) for { entries := make([]*storage.KVPair, 1000) n, _ := reader.Read(entries) @@ -229,7 +233,7 @@ func TestBackupLoad(t *testing.T) { break } for i := 0; i < n; i++ { - kv, err := restore.Get(prefix[0], entries[i].Key) + kv, err := restore.Get(table, entries[i].Key) require.NoError(t, err) require.Equal(t, entries[i].Value, kv.Value, "The values should match") } @@ -248,16 +252,15 @@ func BenchmarkMutate(b *testing.B) { http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) go http.ListenAndServe(":2112", nil) - prefix := byte(0x0) b.N = 10000000 b.ResetTimer() for i := 0; i < b.N; i++ { store.Mutate([]*storage.Mutation{ { - Prefix: prefix, - Key: rand.Bytes(128), - Value: []byte("Value"), + Table: storage.IndexTable, + Key: rand.Bytes(128), + Value: []byte("Value"), }, }) } diff --git a/storage/store.go b/storage/store.go index b65609019..0fdf43953 100644 --- a/storage/store.go +++ b/storage/store.go @@ -17,29 +17,78 @@ package storage import ( - "bytes" "errors" "io" - "sort" ) +// Table groups related key-value pairs under a +// consistent space. +type Table uint32 + const ( - IndexPrefix = byte(0x0) - HyperCachePrefix = byte(0x1) - HistoryCachePrefix = byte(0x2) - FSMStatePrefix = byte(0x3) + // DefaultTable is mandatory but not used. 
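+ // RocksDB always opens a default column family, so the table has to be
+ // declared here even though QED never writes to it.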
+ DefaultTable Table = iota + // IndexTable contains maps between event hashes and versions. + // H(event) -> version + IndexTable + // HyperCacheTable contains cached batches of the hyper tree. + // Position -> Batch + HyperCacheTable + // HistoryCacheTable contains cached hashes of the history tree. + // Position -> Hash + HistoryCacheTable + // FSMStateTable contains the current state of the FSM (index, term, version...). + // key -> state + FSMStateTable ) +// String returns a string representation of the table. +func (t Table) String() string { + var s string + switch t { + case DefaultTable: + s = "default" + case IndexTable: + s = "index" + case HyperCacheTable: + s = "hyper" + case HistoryCacheTable: + s = "history" + case FSMStateTable: + s = "fsm" + } + return s +} + +// Prefix returns the byte prefix associated with this table. +// This method exists for backward compatibility purposes. +func (t Table) Prefix() byte { + var prefix byte + switch t { + case IndexTable: + prefix = byte(0x0) + case HyperCacheTable: + prefix = byte(0x1) + case HistoryCacheTable: + prefix = byte(0x2) + case FSMStateTable: + prefix = byte(0x3) + default: + prefix = byte(0x4) + } + return prefix +} + var ( ErrKeyNotFound = errors.New("key not found") ) type Store interface { Mutate(mutations []*Mutation) error - GetRange(prefix byte, start, end []byte) (KVRange, error) - Get(prefix byte, key []byte) (*KVPair, error) - GetAll(prefix byte) KVPairReader - GetLast(prefix byte) (*KVPair, error) + GetRange(table Table, start, end []byte) (KVRange, error) + Get(table Table, key []byte) (*KVPair, error) + GetAll(table Table) KVPairReader + GetLast(table Table) (*KVPair, error) Close() error } @@ -51,12 +100,16 @@ type ManagedStore interface { } type Mutation struct { - Prefix byte + Table Table Key, Value []byte } -func NewMutation(prefix byte, key, value []byte) *Mutation { - return &Mutation{prefix, key, value} +func NewMutation(table Table, key, value []byte) *Mutation { + return &Mutation{ + Table: table, + Key: key, + Value: value, + } } type KVPair struct { @@ -77,43 +130,3 @@ type KVRange []KVPair func NewKVRange() KVRange { return make(KVRange, 0) } - -func (r KVRange) InsertSorted(p KVPair) KVRange { - - if len(r) == 0 { - r = append(r, p) - return r - } - - index := sort.Search(len(r), func(i int) bool { - return bytes.Compare(r[i].Key, p.Key) > 0 - }) - - if index > 0 && bytes.Equal(r[index-1].Key, p.Key) { - return r - } - - r = append(r, p) - copy(r[index+1:], r[index:]) - r[index] = p - return r -} - -func (r KVRange) Split(key []byte) (left, right KVRange) { - // the smallest index i where r[i] >= index - index := sort.Search(len(r), func(i int) bool { - return bytes.Compare(r[i].Key, key) >= 0 - }) - return r[:index], r[index:] -} - -func (r KVRange) Get(key []byte) KVPair { - index := sort.Search(len(r), func(i int) bool { - return bytes.Compare(r[i].Key, key) >= 0 - }) - if index < len(r) && bytes.Equal(r[index].Key, key) { - return r[index] - } else { - panic("This should never happen") - } -} diff --git a/storage/store_test.go b/storage/store_test.go deleted file mode 100644 index d06e6b11d..000000000 --- a/storage/store_test.go +++ /dev/null @@ -1,195 +0,0 @@ -/* - Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. 
- You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package storage - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestInsertSorted(t *testing.T) { - - tests := []struct { - originalRange KVRange - item KVPair - expectedRange KVRange - }{ - - { - originalRange: KVRange{}, - item: NewKVPair([]byte{0x01}, []byte{0x01}), - expectedRange: KVRange{ - NewKVPair([]byte{0x01}, []byte{0x01}), - }, - }, - - { - originalRange: KVRange{ - NewKVPair([]byte{0x01}, []byte{0x01}), - NewKVPair([]byte{0x08}, []byte{0x08}), - }, - item: NewKVPair([]byte{0x00}, []byte{0x00}), - expectedRange: KVRange{ - NewKVPair([]byte{0x00}, []byte{0x00}), - NewKVPair([]byte{0x01}, []byte{0x01}), - NewKVPair([]byte{0x08}, []byte{0x08}), - }, - }, - - { - originalRange: KVRange{ - NewKVPair([]byte{0x01}, []byte{0x01}), - NewKVPair([]byte{0x08}, []byte{0x08}), - }, - item: NewKVPair([]byte{0x01}, []byte{0x01}), - expectedRange: KVRange{ - NewKVPair([]byte{0x01}, []byte{0x01}), - NewKVPair([]byte{0x08}, []byte{0x08}), - }, - }, - - { - originalRange: KVRange{ - NewKVPair([]byte{0x01}, []byte{0x01}), - NewKVPair([]byte{0x08}, []byte{0x08}), - }, - item: NewKVPair([]byte{0x08}, []byte{0x08}), - expectedRange: KVRange{ - NewKVPair([]byte{0x01}, []byte{0x01}), - NewKVPair([]byte{0x08}, []byte{0x08}), - }, - }, - - { - originalRange: KVRange{ - NewKVPair([]byte{0x00}, []byte{0x00}), - NewKVPair([]byte{0x01}, []byte{0x01}), - NewKVPair([]byte{0x08}, []byte{0x08}), - }, - item: NewKVPair([]byte{0x04}, []byte{0x04}), - expectedRange: KVRange{ - NewKVPair([]byte{0x00}, []byte{0x00}), - NewKVPair([]byte{0x01}, []byte{0x01}), - NewKVPair([]byte{0x04}, []byte{0x04}), - NewKVPair([]byte{0x08}, []byte{0x08}), - }, - }, - - { - originalRange: KVRange{ - NewKVPair([]byte{0x00}, []byte{0x00}), - NewKVPair([]byte{0x01}, []byte{0x01}), - NewKVPair([]byte{0x04}, []byte{0x04}), - NewKVPair([]byte{0x08}, []byte{0x08}), - }, - item: NewKVPair([]byte{0x09}, []byte{0x09}), - expectedRange: KVRange{ - NewKVPair([]byte{0x00}, []byte{0x00}), - NewKVPair([]byte{0x01}, []byte{0x01}), - NewKVPair([]byte{0x04}, []byte{0x04}), - NewKVPair([]byte{0x08}, []byte{0x08}), - NewKVPair([]byte{0x09}, []byte{0x09}), - }, - }, - } - - for i, test := range tests { - kvrange := test.originalRange.InsertSorted(test.item) - require.Equalf(t, test.expectedRange, kvrange, "Error sorting in test: %d, value: %x", i, test.item) - } - -} - -func TestSplit(t *testing.T) { - - kvrange := KVRange{ - NewKVPair([]byte{0x00}, []byte{0x00}), - NewKVPair([]byte{0x01}, []byte{0x01}), - NewKVPair([]byte{0x02}, []byte{0x02}), - } - - testCases := []struct { - key []byte - expectedLeft, expectedRight KVRange - }{ - { - []byte{0x00}, - KVRange{}, - KVRange{ - NewKVPair([]byte{0x00}, []byte{0x00}), - NewKVPair([]byte{0x01}, []byte{0x01}), - NewKVPair([]byte{0x02}, []byte{0x02}), - }, - }, - { - []byte{0x01}, - KVRange{ - NewKVPair([]byte{0x00}, []byte{0x00}), - }, - KVRange{ - NewKVPair([]byte{0x01}, []byte{0x01}), - NewKVPair([]byte{0x02}, []byte{0x02}), - }, - }, - { - []byte{0x02}, - KVRange{ - NewKVPair([]byte{0x00}, []byte{0x00}), - NewKVPair([]byte{0x01}, []byte{0x01}), - }, - KVRange{ - 
NewKVPair([]byte{0x02}, []byte{0x02}), - }, - }, - { - []byte{0x03}, - KVRange{ - NewKVPair([]byte{0x00}, []byte{0x00}), - NewKVPair([]byte{0x01}, []byte{0x01}), - NewKVPair([]byte{0x02}, []byte{0x02}), - }, - KVRange{}, - }, - } - - for i, c := range testCases { - left, right := kvrange.Split(c.key) - require.Equal(t, c.expectedLeft, left, "Error spliting test: %d, value: %x", i, c.key) - require.Equal(t, c.expectedRight, right, "Error spliting test: %d, value: %x", i, c.key) - } -} - -func TestGet(t *testing.T) { - - kvrange := NewKVRange() - for i := 0; i < 10; i++ { - kvpair := NewKVPair([]byte{byte(i)}, []byte{byte(i)}) - kvrange = kvrange.InsertSorted(kvpair) - } - - tests := []struct { - key, expectedValue []byte - }{ - {[]byte{0x4}, []byte{0x4}}, - } - - for i, test := range tests { - kvpair := kvrange.Get(test.key) - require.Equalf(t, test.expectedValue, kvpair.Value, "Get error in test: %d, value: %x", i, test.key) - } -} From 6e313945507add5d5985e2efc181793fda41e612 Mon Sep 17 00:00:00 2001 From: Alvaro Alda Date: Wed, 27 Mar 2019 16:47:21 +0100 Subject: [PATCH 2/3] Tune some rocksdb options per column family --- raftwal/fsm.go | 4 +- rocksdb/options.go | 65 +++++++++ storage/rocks/rocksdb_store.go | 121 +++++++++------- storage/rocks/rocksdb_store_bench_test.go | 162 ++++++++++++++++++++++ storage/rocks/rocksdb_store_test.go | 28 ---- storage/store.go | 3 + 6 files changed, 304 insertions(+), 79 deletions(-) create mode 100644 storage/rocks/rocksdb_store_bench_test.go diff --git a/raftwal/fsm.go b/raftwal/fsm.go index 8d80e5517..a68ad8e8b 100644 --- a/raftwal/fsm.go +++ b/raftwal/fsm.go @@ -61,7 +61,7 @@ type BalloonFSM struct { func loadState(s storage.ManagedStore) (*fsmState, error) { var state fsmState - kvstate, err := s.Get(storage.FSMStateTable, []byte{0xab}) + kvstate, err := s.Get(storage.FSMStateTable, storage.FSMStateTableKey) if err == storage.ErrKeyNotFound { log.Infof("Unable to find previous state: assuming a clean instance") return &fsmState{0, 0, 0}, nil @@ -255,7 +255,7 @@ func (fsm *BalloonFSM) applyAdd(event []byte, state *fsmState) *fsmAddResponse { return &fsmAddResponse{error: err} } - mutations = append(mutations, storage.NewMutation(storage.FSMStateTable, []byte{0xab}, stateBuff.Bytes())) + mutations = append(mutations, storage.NewMutation(storage.FSMStateTable, storage.FSMStateTableKey, stateBuff.Bytes())) err = fsm.store.Mutate(mutations) if err != nil { return &fsmAddResponse{error: err} diff --git a/rocksdb/options.go b/rocksdb/options.go index 56b9448f6..4a23fe9df 100644 --- a/rocksdb/options.go +++ b/rocksdb/options.go @@ -89,6 +89,34 @@ func (o *Options) SetMinWriteBufferNumberToMerge(value int) { C.rocksdb_options_set_min_write_buffer_number_to_merge(o.c, C.int(value)) } +// SetMaxOpenFiles sets the number of open files that can be used by the DB. +// +// You may need to increase this if your database has a large working set +// (budget one open file per 2MB of working set). +// Default: 1000 +func (o *Options) SetMaxOpenFiles(value int) { + C.rocksdb_options_set_max_open_files(o.c, C.int(value)) +} + +// SetMaxFileOpeningThreads sets the maximum number of file opening threads. +// If max_open_files is -1, DB will open all files on db.Open(). You can +// use this option to increase the number of threads used to open the files. +// Default: 16 +func (o *Options) SetMaxFileOpeningThreads(value int) { + C.rocksdb_options_set_max_file_opening_threads(o.c, C.int(value)) +} + +// SetMaxTotalWalSize sets the maximum total wal size in bytes. 
+// Once write-ahead logs exceed this size, we will start forcing the flush of +// column families whose memtables are backed by the oldest live WAL file +// (i.e. the ones that are causing all the space amplification). If set to 0 +// (default), we will dynamically choose the WAL size limit to be +// [sum of all write_buffer_size * max_write_buffer_number] * 4 +// Default: 0 +func (o *Options) SetMaxTotalWalSize(value uint64) { + C.rocksdb_options_set_max_total_wal_size(o.c, C.uint64_t(value)) +} + // SetBlockBasedTableFactory sets the block based table factory. func (o *Options) SetBlockBasedTableFactory(value *BlockBasedTableOptions) { o.bbto = value @@ -316,6 +344,43 @@ func (o *Options) SetMaxBackgroundFlushes(value int) { C.rocksdb_options_set_max_background_flushes(o.c, C.int(value)) } +// SetMaxLogFileSize sets the maximal size of the info log file. +// +// If the log file is larger than `max_log_file_size`, a new info log +// file will be created. +// If max_log_file_size == 0, all logs will be written to one log file. +// Default: 0 +func (o *Options) SetMaxLogFileSize(value int) { + C.rocksdb_options_set_max_log_file_size(o.c, C.size_t(value)) +} + +// SetLogFileTimeToRoll sets the time for the info log file to roll (in seconds). +// +// If specified with non-zero value, log file will be rolled +// if it has been active longer than `log_file_time_to_roll`. +// Default: 0 (disabled) +func (o *Options) SetLogFileTimeToRoll(value int) { + C.rocksdb_options_set_log_file_time_to_roll(o.c, C.size_t(value)) +} + +// SetKeepLogFileNum sets the maximal info log files to be kept. +// Default: 1000 +func (o *Options) SetKeepLogFileNum(value int) { + C.rocksdb_options_set_keep_log_file_num(o.c, C.size_t(value)) +} + +// SetAllowMmapReads enable/disable mmap reads for reading sst tables. +// Default: false +func (o *Options) SetAllowMmapReads(value bool) { + C.rocksdb_options_set_allow_mmap_reads(o.c, boolToUchar(value)) +} + +// SetAllowMmapWrites enable/disable mmap writes for writing sst tables. +// Default: false +func (o *Options) SetAllowMmapWrites(value bool) { + C.rocksdb_options_set_allow_mmap_writes(o.c, boolToUchar(value)) +} + // SetStatistics sets a statistics object to pass to the DB. 
func (o *Options) SetStatistics(s *Statistics) { C.rocksdb_options_set_statistics(o.c, s.c) diff --git a/storage/rocks/rocksdb_store.go b/storage/rocks/rocksdb_store.go index 3d07d1fde..4eb6cc867 100644 --- a/storage/rocks/rocksdb_store.go +++ b/storage/rocks/rocksdb_store.go @@ -46,12 +46,9 @@ type RocksDBStore struct { cfHandles rocksdb.ColumnFamilyHandles // global options - globalOpts *rocksdb.Options - defaultTableOpts *rocksdb.Options - indexTableOpts *rocksdb.Options - hyperCacheTableOpts *rocksdb.Options - historyCacheTableOpts *rocksdb.Options - fsmStateTableOpts *rocksdb.Options + globalOpts *rocksdb.Options + // per column family options + cfOpts []*rocksdb.Options // read/write options ro *rocksdb.ReadOptions @@ -81,34 +78,22 @@ func NewRocksDBStoreOpts(opts *Options) (*RocksDBStore, error) { globalOpts := rocksdb.NewDefaultOptions() globalOpts.SetCreateIfMissing(true) globalOpts.SetCreateIfMissingColumnFamilies(true) + globalOpts.IncreaseParallelism(4) var stats *rocksdb.Statistics if opts.EnableStatistics { stats = rocksdb.NewStatistics() globalOpts.SetStatistics(stats) } - defaultTableOpts := rocksdb.NewDefaultOptions() - indexTableOpts := getIndexTableOpts() - hyperCacheTableOpts := getHyperCacheTableOpts() - historyCacheTableOpts := getHistoryCacheTableOpts() - fsmStateTableOpts := getFsmStateTableOpts() - + // Per column family options cfOpts := []*rocksdb.Options{ - defaultTableOpts, - indexTableOpts, - hyperCacheTableOpts, - historyCacheTableOpts, - fsmStateTableOpts, + rocksdb.NewDefaultOptions(), + getIndexTableOpts(), + getHyperCacheTableOpts(), + getHistoryCacheTableOpts(), + getFsmStateTableOpts(), } - // rocksdbOpts.IncreaseParallelism(4) - // rocksdbOpts.SetMaxWriteBufferNumber(5) - // rocksdbOpts.SetMinWriteBufferNumberToMerge(2) - - // blockOpts := rocksdb.NewDefaultBlockBasedTableOptions() - // blockOpts.SetFilterPolicy(rocksdb.NewBloomFilterPolicy(10)) - // rocksdbOpts.SetBlockBasedTableFactory(blockOpts) - db, cfHandles, err := rocksdb.OpenDBColumnFamilies(opts.Path, globalOpts, cfNames, cfOpts) if err != nil { return nil, err @@ -121,19 +106,15 @@ func NewRocksDBStoreOpts(opts *Options) (*RocksDBStore, error) { } store := &RocksDBStore{ - db: db, - stats: stats, - cfHandles: cfHandles, - checkPointPath: checkPointPath, - checkpoints: make(map[uint64]string), - globalOpts: globalOpts, - defaultTableOpts: defaultTableOpts, - indexTableOpts: indexTableOpts, - hyperCacheTableOpts: hyperCacheTableOpts, - historyCacheTableOpts: historyCacheTableOpts, - fsmStateTableOpts: fsmStateTableOpts, - wo: rocksdb.NewDefaultWriteOptions(), - ro: rocksdb.NewDefaultReadOptions(), + db: db, + stats: stats, + cfHandles: cfHandles, + checkPointPath: checkPointPath, + checkpoints: make(map[uint64]string), + globalOpts: globalOpts, + cfOpts: cfOpts, + wo: rocksdb.NewDefaultWriteOptions(), + ro: rocksdb.NewDefaultReadOptions(), } if rms == nil && stats != nil { @@ -144,30 +125,79 @@ func NewRocksDBStoreOpts(opts *Options) (*RocksDBStore, error) { } func getIndexTableOpts() *rocksdb.Options { + // index table is append-only so we have to optimize for + // read amplification + + // TODO change this!!! 
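+ // A bloom filter at 10 bits per key lets point lookups skip SST files
+ // that cannot contain the requested key, keeping read amplification low
+ // for this read-heavy, append-only table.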
+ bbto := rocksdb.NewDefaultBlockBasedTableOptions() + bbto.SetFilterPolicy(rocksdb.NewBloomFilterPolicy(10)) opts := rocksdb.NewDefaultOptions() opts.SetBlockBasedTableFactory(bbto) + opts.SetCompression(rocksdb.SnappyCompression) + // in normal mode, by default, we try to minimize space amplification, + // so we set: + // + // L0 size = 64MBytes * 2 (min_write_buffer_number_to_merge) * \ + // 8 (level0_file_num_compaction_trigger) + // = 1GBytes + // L1 size close to L0, 1GBytes, max_bytes_for_level_base = 1GBytes, + // max_bytes_for_level_multiplier = 2 + // L2 size is 2G, L3 is 4G, L4 is 8G, L5 16G... + // + opts.SetWriteBufferSize(64 * 1024 * 1024) + opts.SetMaxWriteBufferNumber(5) + opts.SetMinWriteBufferNumberToMerge(2) + opts.SetLevel0FileNumCompactionTrigger(8) + // MaxBytesForLevelBase is the total size of L1, should be close to + // the size of L0 + opts.SetMaxBytesForLevelBase(1 * 1024 * 1024 * 1024) + opts.SetMaxBytesForLevelMultiplier(2) + // files in L1 will have TargetFileSizeBase bytes + opts.SetTargetFileSizeBase(64 * 1024 * 1024) + opts.SetTargetFileSizeMultiplier(10) + // io parallelism + opts.SetMaxBackgroundCompactions(2) + opts.SetMaxBackgroundFlushes(2) return opts } func getHyperCacheTableOpts() *rocksdb.Options { bbto := rocksdb.NewDefaultBlockBasedTableOptions() + bbto.SetFilterPolicy(rocksdb.NewBloomFilterPolicy(10)) opts := rocksdb.NewDefaultOptions() opts.SetBlockBasedTableFactory(bbto) + opts.SetCompression(rocksdb.SnappyCompression) return opts } func getHistoryCacheTableOpts() *rocksdb.Options { bbto := rocksdb.NewDefaultBlockBasedTableOptions() + bbto.SetFilterPolicy(rocksdb.NewBloomFilterPolicy(10)) opts := rocksdb.NewDefaultOptions() opts.SetBlockBasedTableFactory(bbto) + opts.SetCompression(rocksdb.SnappyCompression) return opts } func getFsmStateTableOpts() *rocksdb.Options { + // FSM state contains only one key that is updated on every + // add event operation. We should try to reduce write and + // space amplification by keeping a lower number of levels. bbto := rocksdb.NewDefaultBlockBasedTableOptions() opts := rocksdb.NewDefaultOptions() opts.SetBlockBasedTableFactory(bbto) + opts.SetCompression(rocksdb.SnappyCompression) + // we try to reduce write and space amplification, so we: + // * set a low size for the in-memory write buffers + // * reduce the number of write buffers + // * activate merging before flushing + // * set parallelism to 1 + opts.SetWriteBufferSize(4 * 1024 * 1024) + opts.SetMaxWriteBufferNumber(3) + opts.SetMinWriteBufferNumberToMerge(2) + opts.SetMaxBackgroundCompactions(1) + opts.SetMaxBackgroundFlushes(1) return opts } @@ -290,18 +320,11 @@ func (s *RocksDBStore) Close() error { if s.globalOpts != nil { s.globalOpts.Destroy() } - if s.defaultTableOpts != nil { - s.defaultTableOpts.Destroy() - } - if s.indexTableOpts != nil { - s.indexTableOpts.Destroy() - } - if s.hyperCacheTableOpts != nil { - s.hyperCacheTableOpts.Destroy() - } - if s.fsmStateTableOpts != nil { - s.fsmStateTableOpts.Destroy() + + for _, opt := range s.cfOpts { + opt.Destroy() } + if s.ro != nil { s.ro.Destroy() } diff --git a/storage/rocks/rocksdb_store_bench_test.go b/storage/rocks/rocksdb_store_bench_test.go new file mode 100644 index 000000000..ef2c4ce29 --- /dev/null +++ b/storage/rocks/rocksdb_store_bench_test.go @@ -0,0 +1,162 @@ +/* + Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package rocks + +import ( + "net/http" + "testing" + + "github.com/bbva/qed/storage" + "github.com/bbva/qed/testutils/rand" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +func BenchmarkMutateOnlyIndex(b *testing.B) { + store, closeF := openRocksDBStore(b) + defer closeF() + + reg := prometheus.NewRegistry() + reg.MustRegister(PrometheusCollectors()...) + http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) + go http.ListenAndServe(":2112", nil) + + b.N = 10000000 + b.ResetTimer() + + for i := 0; i < b.N; i++ { + store.Mutate([]*storage.Mutation{ + { + Table: storage.IndexTable, + Key: rand.Bytes(32), + Value: rand.Bytes(8), + }, + }) + } + +} + +func BenchmarkMutateOnlyHyper(b *testing.B) { + store, closeF := openRocksDBStore(b) + defer closeF() + + reg := prometheus.NewRegistry() + reg.MustRegister(PrometheusCollectors()...) + http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) + go http.ListenAndServe(":2112", nil) + + b.N = 10000000 + b.ResetTimer() + + for i := 0; i < b.N; i++ { + store.Mutate([]*storage.Mutation{ + { + Table: storage.HyperCacheTable, + Key: rand.Bytes(34), + Value: rand.Bytes(1024), + }, + }) + } + +} + +func BenchmarkMutateOnlyHistory(b *testing.B) { + store, closeF := openRocksDBStore(b) + defer closeF() + + reg := prometheus.NewRegistry() + reg.MustRegister(PrometheusCollectors()...) + http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) + go http.ListenAndServe(":2112", nil) + + b.N = 10000000 + b.ResetTimer() + + for i := 0; i < b.N; i++ { + store.Mutate([]*storage.Mutation{ + { + Table: storage.HistoryCacheTable, + Key: rand.Bytes(34), + Value: rand.Bytes(32), + }, + }) + } + +} + +func BenchmarkMutateOnlyFSMState(b *testing.B) { + store, closeF := openRocksDBStore(b) + defer closeF() + + reg := prometheus.NewRegistry() + reg.MustRegister(PrometheusCollectors()...) + http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) + go http.ListenAndServe(":2112", nil) + + b.N = 1000000 + b.ResetTimer() + + for i := 0; i < b.N; i++ { + store.Mutate([]*storage.Mutation{ + { + Table: storage.FSMStateTable, + Key: storage.FSMStateTableKey, + Value: rand.Bytes(128), + }, + }) + } + +} + +func BenchmarkMutateAllTables(b *testing.B) { + store, closeF := openRocksDBStore(b) + defer closeF() + + reg := prometheus.NewRegistry() + reg.MustRegister(PrometheusCollectors()...) 
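+ // expose the registered collectors on :2112 while the benchmark runs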
+ http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) + go http.ListenAndServe(":2112", nil) + + b.N = 10000000 + b.ResetTimer() + + for i := 0; i < b.N; i++ { + store.Mutate([]*storage.Mutation{ + { + Table: storage.IndexTable, + Key: rand.Bytes(128), + Value: []byte("Value"), + }, + { + Table: storage.HyperCacheTable, + Key: rand.Bytes(128), + Value: []byte("Value"), + }, + { + Table: storage.HistoryCacheTable, + Key: rand.Bytes(128), + Value: []byte("Value"), + }, + { + Table: storage.FSMStateTable, + Key: storage.FSMStateTableKey, + Value: rand.Bytes(128), + }, + }) + } + +} diff --git a/storage/rocks/rocksdb_store_test.go b/storage/rocks/rocksdb_store_test.go index 3146d099f..0eb9c5182 100644 --- a/storage/rocks/rocksdb_store_test.go +++ b/storage/rocks/rocksdb_store_test.go @@ -19,16 +19,12 @@ import ( "bytes" "fmt" "io/ioutil" - "net/http" "os" "path/filepath" "testing" "github.com/bbva/qed/storage" - "github.com/bbva/qed/testutils/rand" "github.com/bbva/qed/util" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -243,30 +239,6 @@ func TestBackupLoad(t *testing.T) { } -func BenchmarkMutate(b *testing.B) { - store, closeF := openRocksDBStore(b) - defer closeF() - - reg := prometheus.NewRegistry() - reg.MustRegister(PrometheusCollectors()...) - http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) - go http.ListenAndServe(":2112", nil) - - b.N = 10000000 - b.ResetTimer() - - for i := 0; i < b.N; i++ { - store.Mutate([]*storage.Mutation{ - { - Table: storage.IndexTable, - Key: rand.Bytes(128), - Value: []byte("Value"), - }, - }) - } - -} - func openRocksDBStore(t require.TestingT) (*RocksDBStore, func()) { path := mustTempDir() store, err := NewRocksDBStore(filepath.Join(path, "rockdsdb_store_test.db")) diff --git a/storage/store.go b/storage/store.go index 0fdf43953..19860224e 100644 --- a/storage/store.go +++ b/storage/store.go @@ -42,6 +42,9 @@ const ( FSMStateTable ) +// FSMStateTableKey single key to persist fsm state. +var FSMStateTableKey = []byte{0xab} + // String returns a string representation of the table. func (t Table) String() string { var s string From d4a05f8497a018a596ca070b610ef6231c0a7303 Mon Sep 17 00:00:00 2001 From: Alvaro Alda Date: Wed, 27 Mar 2019 16:52:05 +0100 Subject: [PATCH 3/3] Remove repeated method --- rocksdb/options.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/rocksdb/options.go b/rocksdb/options.go index 4a23fe9df..49d2f4b10 100644 --- a/rocksdb/options.go +++ b/rocksdb/options.go @@ -106,17 +106,6 @@ func (o *Options) SetMaxFileOpeningThreads(value int) { C.rocksdb_options_set_max_file_opening_threads(o.c, C.int(value)) } -// SetMaxTotalWalSize sets the maximum total wal size in bytes. -// Once write-ahead logs exceed this size, we will start forcing the flush of -// column families whose memtables are backed by the oldest live WAL file -// (i.e. the ones that are causing all the space amplification). If set to 0 -// (default), we will dynamically choose the WAL size limit to be -// [sum of all write_buffer_size * max_write_buffer_number] * 4 -// Default: 0 -func (o *Options) SetMaxTotalWalSize(value uint64) { - C.rocksdb_options_set_max_total_wal_size(o.c, C.uint64_t(value)) -} - // SetBlockBasedTableFactory sets the block based table factory. func (o *Options) SetBlockBasedTableFactory(value *BlockBasedTableOptions) { o.bbto = value
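
For context, this is roughly how a caller addresses the store once prefixes become tables. It is a minimal sketch built only from the API introduced in these patches (NewRocksDBStore, Mutate, NewMutation, Get, IndexTable); the on-disk path and the key/value literals are placeholders, not values taken from the patch.

package main

import (
	"log"

	"github.com/bbva/qed/storage"
	"github.com/bbva/qed/storage/rocks"
)

func main() {
	// placeholder path; any writable directory works
	store, err := rocks.NewRocksDBStore("/tmp/qed-example.db")
	if err != nil {
		log.Fatal(err)
	}
	defer store.Close()

	// every mutation is tagged with the table it targets
	err = store.Mutate([]*storage.Mutation{
		storage.NewMutation(storage.IndexTable, []byte("event-digest"), []byte("version-bytes")),
	})
	if err != nil {
		log.Fatal(err)
	}

	// reads address the same table explicitly instead of a raw byte prefix
	kv, err := store.Get(storage.IndexTable, []byte("event-digest"))
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("key=%q value=%q", kv.Key, kv.Value)
}

In the RocksDB store each table is backed by its own column family handle, while engines without column families can keep the old layout through Table.Prefix(), which is what makes the change backward compatible.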