From a758b1d390ed8d8cf52ccd28de4cf7f3fb2b4767 Mon Sep 17 00:00:00 2001 From: Marty Schoch Date: Thu, 1 Oct 2015 17:44:22 -0400 Subject: [PATCH] introduce new KVPool to aid in concurrent usage The KVPool addresses the use case where an application wants to support N concurrent actors on a single ForestDB KVStore at one time. ForestDB requries that these actors operate on separate fdb handles and this KVPool attempts to make this pattern simpler. Users invoke NewKVPool, with parameters used to create the *File and child *KVStore objects as well as the number N of objects to pool. Only references to the *KVStore are kept in the pool, access to the parent *File is done through the KVStore.File() method. Once you have a KVPool, clients get a KVStore by using the Get() method. The MUST return it using the Return() method when done. The client MUST call Close() in order to free all resources. The call to Close() may block forever in the event that KVStores acquired through Get() were not Return()'d. Change-Id: Icacc4f3fd0daf7957bb82c5a6f4b1facf3461acc Reviewed-on: http://review.couchbase.org/55706 Reviewed-by: Steve Yen Tested-by: Marty Schoch --- forestdb.go | 5 +++ forestdb_test.go | 63 +++++++++++++++++++++++++++++++++++ pool.go | 85 ++++++++++++++++++++++++++++++++++++++++++++++++ pool_test.go | 49 ++++++++++++++++++++++++++++ 4 files changed, 202 insertions(+) create mode 100644 pool.go create mode 100644 pool_test.go diff --git a/forestdb.go b/forestdb.go index 25c1cad..b4de182 100644 --- a/forestdb.go +++ b/forestdb.go @@ -20,6 +20,11 @@ type KVStore struct { db *C.fdb_kvs_handle } +// File returns the File containing this KVStore +func (k *KVStore) File() *File { + return k.f +} + // Close the KVStore and release related resources. func (k *KVStore) Close() error { Log.Tracef("fdb_kvs_close call k:%p db:%v", k, k.db) diff --git a/forestdb_test.go b/forestdb_test.go index 71810d5..4b986fe 100644 --- a/forestdb_test.go +++ b/forestdb_test.go @@ -9,8 +9,10 @@ package forestdb import ( + "encoding/binary" "fmt" "os" + "sync" "testing" ) @@ -267,3 +269,64 @@ func TestForestDBCompactUpto(t *testing.T) { t.Errorf("expected commit marker seqnum 10, got %v", cm[0].GetSeqNum()) } } + +func TestForestDBConcurrent(t *testing.T) { + numWriters := 2 + numReaders := 4 + numOps := 100000 + testValue := []byte{0, 0, 0, 0, 0, 0, 0, 0} + defer os.RemoveAll("test") + + fdbConfig := DefaultConfig() + kvConfig := DefaultKVStoreConfig() + + // create a pool, each worker its own file/kvstore + kvpool, err := NewKVPool("test", fdbConfig, "default", kvConfig, numReaders+numWriters) + if err != nil { + t.Fatal(err) + } + defer kvpool.Close() + + var wg sync.WaitGroup + // start writers + for i := 0; i < numWriters; i++ { + kvs, err := kvpool.Get() + if err != nil { + t.Fatal(err) + } + db := kvs.File() + wg.Add(1) + go func(base int, db *File, kvs *KVStore) { + defer wg.Done() + defer kvpool.Return(kvs) + for n := 0; n < numOps; n++ { + key := make([]byte, 4) + binary.BigEndian.PutUint32(key, uint32(base*numOps+n)) + if err := kvs.SetKV(key, testValue); err != nil { + t.Fatalf("writer err: %v", err) + } + } + }(i, db, kvs) + } + // start readers + for i := 0; i < numReaders; i++ { + kvs, err := kvpool.Get() + if err != nil { + t.Fatal(err) + } + db := kvs.File() + wg.Add(1) + go func(base int, db *File, kvs *KVStore) { + defer wg.Done() + defer kvpool.Return(kvs) + for n := 0; n < numOps; n++ { + key := make([]byte, 4) + binary.BigEndian.PutUint32(key, uint32(base*numOps+n)) + if _, err := kvs.GetKV(key); err != nil && err != RESULT_KEY_NOT_FOUND { + t.Fatalf("reader err: %v", err) + } + } + }(i, db, kvs) + } + wg.Wait() +} diff --git a/pool.go b/pool.go new file mode 100644 index 0000000..099658a --- /dev/null +++ b/pool.go @@ -0,0 +1,85 @@ +package forestdb + +import ( + "fmt" + "sync" +) + +// KVPool is a structure representing a pool of KVStores +// inside a file. Each has been opened with it's own +// File handle, so they can be used concurrently safely. +type KVPool struct { + closedMutex sync.RWMutex + closed bool + stores chan *KVStore +} + +var PoolClosed = fmt.Errorf("pool already closed") + +func NewKVPool(filename string, config *Config, kvstore string, kvconfig *KVStoreConfig, size int) (*KVPool, error) { + rv := KVPool{} + rv.stores = make(chan *KVStore, size) + for i := 0; i < size; i++ { + db, err := Open(filename, config) + if err != nil { + // close everything else we've already opened + rv.Close() // ignore errors closing? and return open error? + return nil, err + } + kvs, err := db.OpenKVStore(kvstore, kvconfig) + if err != nil { + // close the db file we just opened + db.Close() + // close everything else we've already opened + rv.Close() // ignore errors closing? and return open error? + return nil, err + } + rv.stores <- kvs + } + return &rv, nil +} + +func (p *KVPool) Get() (*KVStore, error) { + rv, ok := <-p.stores + if !ok { + return nil, PoolClosed + } + return rv, nil +} + +func (p *KVPool) Return(kvs *KVStore) error { + p.closedMutex.RLock() + defer p.closedMutex.RUnlock() + if !p.closed { + p.stores <- kvs + return nil + } + return PoolClosed +} + +func (p *KVPool) Close() (rverr error) { + p.closedMutex.Lock() + if !p.closed { + close(p.stores) + } + p.closed = true + p.closedMutex.Unlock() + + for kvs := range p.stores { + err := kvs.Close() + if err != nil { + if rverr == nil { + rverr = err + } + // keep going try to close file + } + db := kvs.File() + err = db.Close() + if err != nil { + if rverr == nil { + rverr = err + } + } + } + return +} diff --git a/pool_test.go b/pool_test.go new file mode 100644 index 0000000..5bf59eb --- /dev/null +++ b/pool_test.go @@ -0,0 +1,49 @@ +package forestdb + +import ( + "os" + "testing" +) + +func TestPool(t *testing.T) { + defer os.RemoveAll("test") + + // create a pool of 10 forestdb clients for file: test kvstore: default + fdbConfig := DefaultConfig() + kvConfig := DefaultKVStoreConfig() + kvpool, err := NewKVPool("test", fdbConfig, "default", kvConfig, 10) + if err != nil { + t.Fatal(err) + } + + // get from the pool + kvs, err := kvpool.Get() + if err != nil { + t.Fatal(err) + } + + // return to pool + err = kvpool.Return(kvs) + if err != nil { + t.Fatal(err) + } + + // close the pool + err = kvpool.Close() + if err != nil { + t.Fatal(err) + } + + // try to get after closing + _, err = kvpool.Get() + if err != PoolClosed { + t.Errorf("expected %v, got %v when calling Get on closed pool", PoolClosed, err) + } + + // try to return after closing + err = kvpool.Return(kvs) + if err != PoolClosed { + t.Errorf("expected %v, got %v when calling Return on closed pool", PoolClosed, err) + } + +}