Skip to content

Commit

Permalink
Merge "remove existing lscc state cache"
Browse files Browse the repository at this point in the history
  • Loading branch information
manish-sethi authored and Gerrit Code Review committed Nov 11, 2019
2 parents a90f67d + ef5ff5d commit 31534f0
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 139 deletions.
70 changes: 1 addition & 69 deletions core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
var logger = flogging.MustGetLogger("statecouchdb")

const (
// lsccCacheSize denotes the number of entries allowed in the lsccStateCache
lsccCacheSize = 50
// savepointDocID is used as a key for maintaining savepoint (maintained in metadatadb for a channel)
savepointDocID = "statedb_savepoint"
// fabricInternalDBName is used to create a db in couch that would be used for internal data such as the version of the data format
Expand Down Expand Up @@ -171,59 +169,9 @@ type VersionedDB struct {
committedDataCache *versionsCache // Used as a local cache during bulk processing of a block.
verCacheLock sync.RWMutex
mux sync.RWMutex
lsccStateCache *lsccStateCache
redoLogger *redoLogger
}

type lsccStateCache struct {
cache map[string]*statedb.VersionedValue
rwMutex sync.RWMutex
}

func (l *lsccStateCache) getState(key string) *statedb.VersionedValue {
l.rwMutex.RLock()
defer l.rwMutex.RUnlock()

if versionedValue, ok := l.cache[key]; ok {
logger.Debugf("key:[%s] found in the lsccStateCache", key)
return versionedValue
}
return nil
}

func (l *lsccStateCache) updateState(key string, value *statedb.VersionedValue) {
l.rwMutex.Lock()
defer l.rwMutex.Unlock()

if _, ok := l.cache[key]; ok {
logger.Debugf("key:[%s] is updated in lsccStateCache", key)
l.cache[key] = value
}
}

func (l *lsccStateCache) setState(key string, value *statedb.VersionedValue) {
l.rwMutex.Lock()
defer l.rwMutex.Unlock()

if l.isCacheFull() {
l.evictARandomEntry()
}

logger.Debugf("key:[%s] is stored in lsccStateCache", key)
l.cache[key] = value
}

func (l *lsccStateCache) isCacheFull() bool {
return len(l.cache) == lsccCacheSize
}

func (l *lsccStateCache) evictARandomEntry() {
for key := range l.cache {
delete(l.cache, key)
return
}
}

// newVersionedDB constructs an instance of VersionedDB
func newVersionedDB(couchInstance *couchdb.CouchInstance, redoLogger *redoLogger, dbName string) (*VersionedDB, error) {
// CreateCouchDatabase creates a CouchDB database object, as well as the underlying database if it does not exist
Expand All @@ -241,10 +189,7 @@ func newVersionedDB(couchInstance *couchdb.CouchInstance, redoLogger *redoLogger
chainName: chainName,
namespaceDBs: namespaceDBMap,
committedDataCache: newVersionCache(),
lsccStateCache: &lsccStateCache{
cache: make(map[string]*statedb.VersionedValue),
},
redoLogger: redoLogger,
redoLogger: redoLogger,
}
logger.Debugf("chain [%s]: checking for redolog record", chainName)
redologRecord, err := redoLogger.load()
Expand Down Expand Up @@ -398,11 +343,6 @@ func (vdb *VersionedDB) BytesKeySupported() bool {
// GetState implements method in VersionedDB interface
func (vdb *VersionedDB) GetState(namespace string, key string) (*statedb.VersionedValue, error) {
logger.Debugf("GetState(). ns=%s, key=%s", namespace, key)
if namespace == "lscc" {
if value := vdb.lsccStateCache.getState(key); value != nil {
return value, nil
}
}

db, err := vdb.getNamespaceDBHandle(namespace)
if err != nil {
Expand All @@ -420,10 +360,6 @@ func (vdb *VersionedDB) GetState(namespace string, key string) (*statedb.Version
return nil, err
}

if namespace == "lscc" {
vdb.lsccStateCache.setState(key, kv.VersionedValue)
}

return kv.VersionedValue, nil
}

Expand Down Expand Up @@ -668,10 +604,6 @@ func (vdb *VersionedDB) applyUpdates(updates *statedb.UpdateBatch, height *versi
return err
}

lsccUpdates := updates.GetUpdates("lscc")
for key, value := range lsccUpdates {
vdb.lsccStateCache.updateState(key, value)
}
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"fmt"
"io/ioutil"
"os"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -660,75 +659,6 @@ func TestPaginatedQueryValidation(t *testing.T) {
assert.Error(t, err, "An should have been thrown for an invalid options")
}

func TestLSCCStateCache(t *testing.T) {
env := NewTestVDBEnv(t)
defer env.Cleanup()

db, err := env.DBProvider.GetDBHandle("testinit")
assert.NoError(t, err)
db.Open()
defer db.Close()

// Scenario 1: Storing two keys in the lscc name space.
// Checking whether the cache is populated correctly during
// GetState()
batch := statedb.NewUpdateBatch()
batch.Put("lscc", "key1", []byte("value1"), version.NewHeight(1, 1))
batch.Put("lscc", "key2", []byte("value2"), version.NewHeight(1, 1))

savePoint := version.NewHeight(1, 1)
db.ApplyUpdates(batch, savePoint)

// cache should not contain key1 and key2
assert.Nil(t, db.(*VersionedDB).lsccStateCache.getState("key1"))
assert.Nil(t, db.(*VersionedDB).lsccStateCache.getState("key2"))

// GetState() populates the cache
valueFromDB, err := db.GetState("lscc", "key1")
assert.NoError(t, err)
valueFromCache := db.(*VersionedDB).lsccStateCache.getState("key1")
assert.Equal(t, valueFromCache, valueFromDB)

// Scenario 2: updates an existing key in lscc namespace. Note that the
// key in lsccStateCache should be updated
batch = statedb.NewUpdateBatch()
batch.Put("lscc", "key1", []byte("new-value1"), version.NewHeight(1, 2))
savePoint = version.NewHeight(1, 2)
db.ApplyUpdates(batch, savePoint)

valueFromCache = db.(*VersionedDB).lsccStateCache.getState("key1")
expectedValue := &statedb.VersionedValue{Value: []byte("new-value1"), Version: version.NewHeight(1, 2)}
assert.Equal(t, expectedValue, valueFromCache)

// Scenario 3: adds LsccCacheSize number of keys in lscc namespace.
// Read all keys in lscc namespace to make the cache full. This is to
// test the eviction.
batch = statedb.NewUpdateBatch()
for i := 0; i < lsccCacheSize; i++ {
batch.Put("lscc", "key"+strconv.Itoa(i), []byte("value"+strconv.Itoa(i)), version.NewHeight(1, 3))
}
savePoint = version.NewHeight(1, 3)
db.ApplyUpdates(batch, savePoint)

for i := 0; i < lsccCacheSize; i++ {
_, err := db.GetState("lscc", "key"+strconv.Itoa(i))
assert.NoError(t, err)
}
assert.Equal(t, true, db.(*VersionedDB).lsccStateCache.isCacheFull())

batch = statedb.NewUpdateBatch()
batch.Put("lscc", "key50", []byte("value1"), version.NewHeight(1, 4))
savePoint = version.NewHeight(1, 4)
db.ApplyUpdates(batch, savePoint)

// GetState() populates the cache after a eviction
valueFromDB, err = db.GetState("lscc", "key50")
assert.NoError(t, err)
valueFromCache = db.(*VersionedDB).lsccStateCache.getState("key50")
assert.Equal(t, valueFromCache, valueFromDB)
assert.Equal(t, true, db.(*VersionedDB).lsccStateCache.isCacheFull())
}

func TestApplyUpdatesWithNilHeight(t *testing.T) {
env := NewTestVDBEnv(t)
defer env.Cleanup()
Expand Down

0 comments on commit 31534f0

Please sign in to comment.