Skip to content

Commit

Permalink
add KV transaction interfaces and new transaction wrapping key-value …
Browse files Browse the repository at this point in the history
…store (#4)
  • Loading branch information
jessepeterson committed Sep 10, 2024
1 parent d3a9d66 commit b79e666
Show file tree
Hide file tree
Showing 14 changed files with 926 additions and 3 deletions.
3 changes: 0 additions & 3 deletions storage/kv/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,3 @@ type CRUDBucket interface {
ROBucket
RWBucket
}

// Bucket is an alias for any commonly used key-value store.
type Bucket = CRUDBucket
3 changes: 3 additions & 0 deletions storage/kv/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,6 @@ type KeysPrefixTraversingBucket interface {
KeysTraverser
KeysPrefixTraverser
}

// Bucket is an alias for any commonly used key-value store.
type Bucket = KeysPrefixTraversingBucket
77 changes: 77 additions & 0 deletions storage/kv/kvtxn/bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package kvtxn

import (
"context"

"github.com/micromdm/nanolib/storage/kv"
)

// Get retrieves value at key.
// A previously staged key may be returned.
func (b *KVTxn) Get(ctx context.Context, key string) ([]byte, error) {
if !b.hasOp(key) {
b.keyLock.RLock(key)
defer b.keyLock.RUnlock(key)
}
if !b.autoCommit {
b.stageLock.RLock()
defer b.stageLock.RUnlock()
if value, del, found := b.stageGet(key); found {
if del {
// found a stage operation that deleted this key
return nil, kv.ErrKeyNotFound
}
return value, nil
}
}
// fallback to underlying store
return b.store.Get(ctx, key)
}

// Set sets key to value in the staged operations.
// This change may be auto-commited.
func (b *KVTxn) Set(ctx context.Context, key string, value []byte) error {
if !b.hasOp(key) {
b.keyLock.Lock(key)
}
b.stageLock.Lock()
defer b.stageLock.Unlock()
b.stageSet(key, value)
if b.autoCommit {
return b.stageCommit(ctx)
}
return nil
}

// Has checks that key can be found.
// A previously staged key may be returned.
func (b *KVTxn) Has(ctx context.Context, key string) (bool, error) {
if !b.hasOp(key) {
b.keyLock.RLock(key)
defer b.keyLock.RUnlock(key)
}
if !b.autoCommit {
b.stageLock.RLock()
defer b.stageLock.RUnlock()
if has, found := b.stageHas(key); found {
return has, nil
}
}
// fallback to underlying store
return b.store.Has(ctx, key)
}

// Delete deletes key in the staged operations.
// This change may be auto-commited.
func (b *KVTxn) Delete(ctx context.Context, key string) error {
if !b.hasOp(key) {
b.keyLock.Lock(key)
}
b.stageLock.Lock()
defer b.stageLock.Unlock()
b.stageDelete(key)
if b.autoCommit {
return b.stageCommit(ctx)
}
return nil
}
69 changes: 69 additions & 0 deletions storage/kv/kvtxn/keys.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package kvtxn

import (
"context"
)

// Keys returns all keys in the underlying key-value store merging with the operations stage.
// The returned keys have no ordering guaratees.
// The keys channel should be closed if cancel was provided and closed.
// Beware of deadlocks with underlying implementations.
// Note that key-based stage locks are not consulted.
func (b *KVTxn) Keys(ctx context.Context, cancel <-chan struct{}) <-chan string {
return b.keysWithStagedKeys(b.store.Keys(ctx, cancel), cancel)
}

// Keys returns all keys starting with prefix in the underlying key-value store merging with the operations stage.
// The returned keys have no ordering guaratees.
// The keys channel should be closed if cancel was provided and closed.
// Beware of deadlocks with underlying implementations.
// Note that key-based stage locks are not consulted.
func (b *KVTxn) KeysPrefix(ctx context.Context, prefix string, cancel <-chan struct{}) <-chan string {
return b.keysWithStagedKeys(b.store.KeysPrefix(ctx, prefix, cancel), cancel)
}

// stageKeys returns a slice of all staged keys.
// Keys that have a delete operation are not included if noDel is true.
func (b *KVTxn) stageKeys(skipDeleted bool) []string {
var r []string
for k, v := range b.stageKeyOps {
if v.del && skipDeleted {
continue
}
r = append(r, k)
}
return r
}

// keysWithStagedKeys returns a merged set from inKeys and keys from staged operations.
func (b *KVTxn) keysWithStagedKeys(inKeys <-chan string, cancel <-chan struct{}) <-chan string {
r := make(chan string)
go func() {
defer close(r)
for k := range inKeys {
b.stageLock.RLock()
_, found := b.stageHas(k)
b.stageLock.RUnlock()
if found {
// skip this key, it's in the stage
continue
}
select {
case <-cancel:
return
case r <- k:
}
}
b.stageLock.RLock()
// retreive all of our staged keys (minus the staged deletions)
for _, k := range b.stageKeys(true) {
select {
case <-cancel:
return
case r <- k:
}
}
b.stageLock.RUnlock()
}()
return r
}
127 changes: 127 additions & 0 deletions storage/kv/kvtxn/kvtxn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Package kvtxn provides an in-memory transactional wrapper for KV stores.
// Note that underlying KV stores are assumed to not support
// multi-operation atomicity. Thus this wrapper cannot guarantee
// transaction atomicity, either.
package kvtxn

import (
"context"
"sync"

"github.com/micromdm/nanolib/storage/kv"
)

// KeyLockManager works like sync.RWMutex but supports per-key locking.
type KeyLockManager interface {
RLock(key string)
RUnlock(key string)
Lock(key string)
Unlock(key string)
}

// keyOp is a staged operation for a key.
type keyOp struct {
value []byte
del bool // if true this operation signifies a deletion (of a key)
}

// KVTxn is a key-value store wrapper that supports in-memory transactions.
// Note the underlying KV store can still be inconsistent—this wrapper
// does NOT gauarantee any commit atomicity.
//
// KVTxn maintains an in-memory "stage" for write operations
// per-transaction. These staged operations can be rolled-back or
// committed.
// The store uses key-based mutexes for the duration of transactions
// to try to maintain consistency.
type KVTxn struct {
store kv.KeysPrefixTraversingBucket
stageLock sync.RWMutex
stageKeyOps map[string]keyOp
keyLock KeyLockManager
autoCommit bool
}

// New creates a new in-memory transacting key-value store that wraps store.
// Note that a single in-memory lock manager is created so transaction
// locking will only be scoped to this newly created store.
func New(store kv.KeysPrefixTraversingBucket) *KVTxn {
// create a new store with auto-commit on.
return new(store, NewInmemLockManager(), true)
}

// new is a helper for creating KVTxns that wraps store.
func new(store kv.KeysPrefixTraversingBucket, keyLock KeyLockManager, autoCommit bool) *KVTxn {
if store == nil {
panic("nil store")
}
if keyLock == nil {
panic("nil key lock manager")
}
return &KVTxn{
store: store,
stageKeyOps: make(map[string]keyOp),
keyLock: keyLock,
autoCommit: autoCommit,
}
}

// stageGet retreives a key from the staged key operations.
func (b *KVTxn) stageGet(key string) (value []byte, del bool, found bool) {
keyOp, ok := b.stageKeyOps[key]
return keyOp.value, keyOp.del, ok
}

// stageSet sets a value for key in the staged key operations.
func (b *KVTxn) stageSet(key string, value []byte) {
b.stageKeyOps[key] = keyOp{value: value}
}

// stageHas checks that a key can be found in the staged key operations.
func (b *KVTxn) stageHas(key string) (has, found bool) {
keyOp, ok := b.stageKeyOps[key]
return !keyOp.del, ok
}

// stageDelete stages a key deletion in the staged key operations.
func (b *KVTxn) stageDelete(key string) {
b.stageKeyOps[key] = keyOp{del: true}
}

// stageReset resets the staged operations.
func (b *KVTxn) stageReset() {
for k := range b.stageKeyOps {
// make sure we unlock any keys in the stage
b.keyLock.Unlock(k)
}
b.stageKeyOps = make(map[string]keyOp)
}

// hasOp checks if there is an operation staged for key.
// A read lock is obtained for the stage lookup.
func (b *KVTxn) hasOp(key string) (ok bool) {
b.stageLock.RLock()
_, ok = b.stageKeyOps[key]
b.stageLock.RUnlock()
return
}

// stageCommit commits (sends) the staged operations to the wrapped KV store.
func (b *KVTxn) stageCommit(ctx context.Context) error {
var err error
for key, op := range b.stageKeyOps {
if err == nil {
if op.del {
err = b.store.Delete(ctx, key)
} else {
err = b.store.Set(ctx, key, op.value)
}
}
b.keyLock.Unlock(key)
// if we had no error, remove the operation
if err == nil {
delete(b.stageKeyOps, key)
}
}
return err
}
19 changes: 19 additions & 0 deletions storage/kv/kvtxn/kvtxn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package kvtxn

import (
"context"
"testing"

"github.com/micromdm/nanolib/storage/kv/kvmap"
"github.com/micromdm/nanolib/storage/kv/test"
)

func TestKVTxn(t *testing.T) {
b := New(kvmap.New())
ctx := context.Background()
test.TestBucketSimple(t, ctx, b)
test.TestKeysTraversing(t, ctx, b)
test.TestTxnSimple(t, ctx, b)
b = New(kvmap.New()) // clear test data
t.Run("TestKVTxnKeys", func(t *testing.T) { test.TestKVTxnKeys(t, ctx, b) })
}
Loading

0 comments on commit b79e666

Please sign in to comment.