Skip to content

Commit

Permalink
introduce new KVPool to aid in concurrent usage
Browse files Browse the repository at this point in the history
The KVPool addresses the use case where an application
wants to support N concurrent actors on a single
ForestDB KVStore at one time.  ForestDB requries that
these actors operate on separate fdb handles and this
KVPool attempts to make this pattern simpler.

Users invoke NewKVPool, with parameters used
to create the *File and child *KVStore objects as well
as the number N of objects to pool.  Only references
to the *KVStore are kept in the pool, access to the
parent *File is done through the KVStore.File()
method.

Once you have a KVPool, clients get a KVStore by using
the Get() method.  The MUST return it using the
Return() method when done.  The client MUST call
Close() in order to free all resources.  The call
to Close() may block forever in the event that
KVStores acquired through Get() were not Return()'d.
Change-Id: Icacc4f3fd0daf7957bb82c5a6f4b1facf3461acc
Reviewed-on: http://review.couchbase.org/55706
Reviewed-by: Steve Yen <steve.yen@gmail.com>
Tested-by: Marty Schoch <marty.schoch@gmail.com>
  • Loading branch information
mschoch committed Oct 2, 2015
1 parent 4e7b1ad commit a758b1d
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 0 deletions.
5 changes: 5 additions & 0 deletions forestdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ type KVStore struct {
db *C.fdb_kvs_handle
}

// File returns the File containing this KVStore
func (k *KVStore) File() *File {
return k.f
}

// Close the KVStore and release related resources.
func (k *KVStore) Close() error {
Log.Tracef("fdb_kvs_close call k:%p db:%v", k, k.db)
Expand Down
63 changes: 63 additions & 0 deletions forestdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
package forestdb

import (
"encoding/binary"
"fmt"
"os"
"sync"
"testing"
)

Expand Down Expand Up @@ -267,3 +269,64 @@ func TestForestDBCompactUpto(t *testing.T) {
t.Errorf("expected commit marker seqnum 10, got %v", cm[0].GetSeqNum())
}
}

func TestForestDBConcurrent(t *testing.T) {
numWriters := 2
numReaders := 4
numOps := 100000
testValue := []byte{0, 0, 0, 0, 0, 0, 0, 0}
defer os.RemoveAll("test")

fdbConfig := DefaultConfig()
kvConfig := DefaultKVStoreConfig()

// create a pool, each worker its own file/kvstore
kvpool, err := NewKVPool("test", fdbConfig, "default", kvConfig, numReaders+numWriters)
if err != nil {
t.Fatal(err)
}
defer kvpool.Close()

var wg sync.WaitGroup
// start writers
for i := 0; i < numWriters; i++ {
kvs, err := kvpool.Get()
if err != nil {
t.Fatal(err)
}
db := kvs.File()
wg.Add(1)
go func(base int, db *File, kvs *KVStore) {
defer wg.Done()
defer kvpool.Return(kvs)
for n := 0; n < numOps; n++ {
key := make([]byte, 4)
binary.BigEndian.PutUint32(key, uint32(base*numOps+n))
if err := kvs.SetKV(key, testValue); err != nil {
t.Fatalf("writer err: %v", err)
}
}
}(i, db, kvs)
}
// start readers
for i := 0; i < numReaders; i++ {
kvs, err := kvpool.Get()
if err != nil {
t.Fatal(err)
}
db := kvs.File()
wg.Add(1)
go func(base int, db *File, kvs *KVStore) {
defer wg.Done()
defer kvpool.Return(kvs)
for n := 0; n < numOps; n++ {
key := make([]byte, 4)
binary.BigEndian.PutUint32(key, uint32(base*numOps+n))
if _, err := kvs.GetKV(key); err != nil && err != RESULT_KEY_NOT_FOUND {
t.Fatalf("reader err: %v", err)
}
}
}(i, db, kvs)
}
wg.Wait()
}
85 changes: 85 additions & 0 deletions pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package forestdb

import (
"fmt"
"sync"
)

// KVPool is a structure representing a pool of KVStores
// inside a file. Each has been opened with it's own
// File handle, so they can be used concurrently safely.
type KVPool struct {
closedMutex sync.RWMutex
closed bool
stores chan *KVStore
}

var PoolClosed = fmt.Errorf("pool already closed")

func NewKVPool(filename string, config *Config, kvstore string, kvconfig *KVStoreConfig, size int) (*KVPool, error) {
rv := KVPool{}
rv.stores = make(chan *KVStore, size)
for i := 0; i < size; i++ {
db, err := Open(filename, config)
if err != nil {
// close everything else we've already opened
rv.Close() // ignore errors closing? and return open error?
return nil, err
}
kvs, err := db.OpenKVStore(kvstore, kvconfig)
if err != nil {
// close the db file we just opened
db.Close()
// close everything else we've already opened
rv.Close() // ignore errors closing? and return open error?
return nil, err
}
rv.stores <- kvs
}
return &rv, nil
}

func (p *KVPool) Get() (*KVStore, error) {
rv, ok := <-p.stores
if !ok {
return nil, PoolClosed
}
return rv, nil
}

func (p *KVPool) Return(kvs *KVStore) error {
p.closedMutex.RLock()
defer p.closedMutex.RUnlock()
if !p.closed {
p.stores <- kvs
return nil
}
return PoolClosed
}

func (p *KVPool) Close() (rverr error) {
p.closedMutex.Lock()
if !p.closed {
close(p.stores)
}
p.closed = true
p.closedMutex.Unlock()

for kvs := range p.stores {
err := kvs.Close()
if err != nil {
if rverr == nil {
rverr = err
}
// keep going try to close file
}
db := kvs.File()
err = db.Close()
if err != nil {
if rverr == nil {
rverr = err
}
}
}
return
}
49 changes: 49 additions & 0 deletions pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package forestdb

import (
"os"
"testing"
)

func TestPool(t *testing.T) {
defer os.RemoveAll("test")

// create a pool of 10 forestdb clients for file: test kvstore: default
fdbConfig := DefaultConfig()
kvConfig := DefaultKVStoreConfig()
kvpool, err := NewKVPool("test", fdbConfig, "default", kvConfig, 10)
if err != nil {
t.Fatal(err)
}

// get from the pool
kvs, err := kvpool.Get()
if err != nil {
t.Fatal(err)
}

// return to pool
err = kvpool.Return(kvs)
if err != nil {
t.Fatal(err)
}

// close the pool
err = kvpool.Close()
if err != nil {
t.Fatal(err)
}

// try to get after closing
_, err = kvpool.Get()
if err != PoolClosed {
t.Errorf("expected %v, got %v when calling Get on closed pool", PoolClosed, err)
}

// try to return after closing
err = kvpool.Return(kvs)
if err != PoolClosed {
t.Errorf("expected %v, got %v when calling Return on closed pool", PoolClosed, err)
}

}

0 comments on commit a758b1d

Please sign in to comment.