Skip to content

Commit

Permalink
abstract bbolt from osquery extension code (#1652)
Browse files Browse the repository at this point in the history
  • Loading branch information
zackattack01 authored Mar 20, 2024
1 parent ff7e932 commit fac5f38
Show file tree
Hide file tree
Showing 8 changed files with 415 additions and 254 deletions.
2 changes: 1 addition & 1 deletion cmd/launcher/internal/record_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (mw *metadataWriter) recordMetadata(metadata *metadata) error {
return nil
}

func (mw *metadataWriter) getServerDataValue(store types.GetterSetterDeleterIteratorUpdater, key string) string {
func (mw *metadataWriter) getServerDataValue(store types.KVStore, key string) string {
val, err := store.Get([]byte(key))
if err != nil {
mw.slogger.Log(context.TODO(), slog.LevelDebug,
Expand Down
67 changes: 66 additions & 1 deletion ee/agent/storage/bbolt/keyvalue_store_bbolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package agentbbolt

import (
"context"
"encoding/binary"
"fmt"
"log/slog"

Expand Down Expand Up @@ -137,12 +138,14 @@ func (s *bboltKeyValueStore) DeleteAll() error {
})
}

// ForEach provides a read-only iterator for all key-value pairs stored within s.bucketName
// this allows bboltKeyValueStore to adhere to the types.Iterator interface
func (s *bboltKeyValueStore) ForEach(fn func(k, v []byte) error) error {
if s == nil || s.db == nil {
return NoDbError{}
}

return s.db.Update(func(tx *bbolt.Tx) error {
return s.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(s.bucketName))
if b == nil {
return NewNoBucketError(s.bucketName)
Expand Down Expand Up @@ -225,3 +228,65 @@ func (s *bboltKeyValueStore) Update(kvPairs map[string]string) ([]string, error)

return deletedKeys, nil
}

func (s *bboltKeyValueStore) Count() (int, error) {
if s == nil || s.db == nil {
s.slogger.Log(context.TODO(), slog.LevelError, "unable to count uninitialized bbolt storage db")
return 0, NoDbError{}
}

var len int
if err := s.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(s.bucketName))
if b == nil {
return NewNoBucketError(s.bucketName)
}

len = b.Stats().KeyN
return nil
}); err != nil {
s.slogger.Log(context.TODO(), slog.LevelError,
"err counting from bucket",
"err", err,
)
return 0, err
}

return len, nil
}

// AppendValues utlizes bbolts NextSequence functionality to add ordered values
// after generating the next autoincrementing key for each
func (s *bboltKeyValueStore) AppendValues(values ...[]byte) error {
if s == nil || s.db == nil {
return fmt.Errorf("unable to append values into uninitialized bbolt db store")
}

return s.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(s.bucketName))
if b == nil {
return NewNoBucketError(s.bucketName)
}

for _, value := range values {
key, err := b.NextSequence()
if err != nil {
return fmt.Errorf("generating key: %w", err)
}

if err = b.Put(byteKeyFromUint64(key), value); err != nil {
return fmt.Errorf("adding ordered value: %w", err)
}
}

return nil
})
}

func byteKeyFromUint64(k uint64) []byte {
// Adapted from Bolt docs
// 8 bytes in a uint64
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, k)
return b
}
109 changes: 94 additions & 15 deletions ee/agent/storage/ci/keyvalue_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,9 @@ func Test_Delete(t *testing.T) {
require.NoError(t, err)
}

// There should be no records, count and verify
var recordCount int
rowFn := func(k, v []byte) error {
recordCount = recordCount + 1
return nil
}
s.ForEach(rowFn)
assert.Equal(t, tt.expectedRecordCount, recordCount)
totalCount, err := s.Count()
require.NoError(t, err)
assert.Equal(t, tt.expectedRecordCount, totalCount)
}
})
}
Expand Down Expand Up @@ -189,13 +184,9 @@ func Test_DeleteAll(t *testing.T) {
require.NoError(t, s.DeleteAll())

// There should be no records, count and verify
var recordCount int
rowFn := func(k, v []byte) error {
recordCount = recordCount + 1
return nil
}
s.ForEach(rowFn)
assert.Equal(t, 0, recordCount)
totalCount, err := s.Count()
require.NoError(t, err)
assert.Equal(t, 0, totalCount)
}
})
}
Expand Down Expand Up @@ -377,6 +368,94 @@ func Test_ForEach(t *testing.T) {
}
}

func Test_Count(t *testing.T) {
t.Parallel()

tests := []struct {
name string
sets map[string]string
expectedCount int
}{
{
name: "empty",
sets: map[string]string{},
expectedCount: 0,
},
{
name: "one value",
sets: map[string]string{"key1": "value1"},
expectedCount: 1,
},
{
name: "multiple values",
sets: map[string]string{"key1": "value1", "key2": "value2", "key3": "value3", "key4": "value4"},
expectedCount: 4,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

for _, s := range getStores(t) {
_, err := s.Update(tt.sets)
require.NoError(t, err)
totalCount, err := s.Count()
require.NoError(t, err)
assert.Equal(t, tt.expectedCount, totalCount)
}
})
}
}

func Test_AppendValues(t *testing.T) {
t.Parallel()

tests := []struct {
name string
sets [][]byte
expectedCount int
}{
{
name: "empty",
sets: [][]byte{},
expectedCount: 0,
},
{
name: "one value",
sets: [][]byte{[]byte("one")},
expectedCount: 1,
},
{
name: "multiple values",
sets: [][]byte{[]byte("one"), []byte("two"), []byte("three"), []byte("four"), []byte("five")},
expectedCount: 5,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

for _, s := range getStores(t) {
err := s.AppendValues(tt.sets...)
require.NoError(t, err)
// check the count to ensure the tests below will endure the expected number of iterations
totalCount, err := s.Count()
require.NoError(t, err)
require.Equal(t, tt.expectedCount, totalCount)
idx := 0
// now we expect to be able to iterate over these in the same order that we appended them
s.ForEach(func(k, v []byte) error {
require.Equal(t, tt.sets[idx], v)
idx++
return nil
})
}
})
}
}

func getKeyValueRows(store types.KVStore, bucketName string) ([]map[string]string, error) {
results := make([]map[string]string, 0)

Expand Down
76 changes: 62 additions & 14 deletions ee/agent/storage/inmemory/keyvalue_store_in_memory.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
package inmemory

import (
"encoding/binary"
"errors"
"fmt"
"sync"
)

type inMemoryKeyValueStore struct {
mu sync.RWMutex
items map[string][]byte
mu sync.RWMutex
items map[string][]byte
order []string
sequence uint64
}

func NewStore() *inMemoryKeyValueStore {
s := &inMemoryKeyValueStore{
items: make(map[string][]byte),
order: make([]string, 0),
}

return s
Expand Down Expand Up @@ -42,7 +47,14 @@ func (s *inMemoryKeyValueStore) Set(key, value []byte) error {

s.mu.Lock()
defer s.mu.Unlock()
s.items[string(key)] = value

if _, exists := s.items[string(key)]; !exists {
s.order = append(s.order, string(key))
}

s.items[string(key)] = make([]byte, len(value))
copy(s.items[string(key)], value)

return nil
}

Expand All @@ -55,7 +67,14 @@ func (s *inMemoryKeyValueStore) Delete(keys ...[]byte) error {
defer s.mu.Unlock()
for _, key := range keys {
delete(s.items, string(key))
for i, k := range s.order {
if k == string(key) {
s.order = append(s.order[:i], s.order[i+1:]...)
break
}
}
}

return nil
}

Expand All @@ -66,7 +85,9 @@ func (s *inMemoryKeyValueStore) DeleteAll() error {

s.mu.Lock()
defer s.mu.Unlock()
clear(s.items)
s.items = make(map[string][]byte)
s.order = make([]string, 0)

return nil
}

Expand All @@ -77,44 +98,71 @@ func (s *inMemoryKeyValueStore) ForEach(fn func(k, v []byte) error) error {

s.mu.Lock()
defer s.mu.Unlock()
for k, v := range s.items {
if err := fn([]byte(k), v); err != nil {
for _, key := range s.order {
if err := fn([]byte(key), s.items[key]); err != nil {
return err
}
}
return nil
}

// Update adheres to the Updater interface for bulk replacing data in a key/value store.
// Note that this method internally defers all mutating operations to the existing Set/Delete
// functions, so the mutex is not locked here
func (s *inMemoryKeyValueStore) Update(kvPairs map[string]string) ([]string, error) {
if s == nil {
return nil, errors.New("store is nil")
}

s.mu.Lock()
defer s.mu.Unlock()

s.items = make(map[string][]byte)

for key, value := range kvPairs {
if key == "" {
return nil, errors.New("key is blank")
}

s.items[key] = []byte(value)
s.Set([]byte(key), []byte(value))
}

var deletedKeys []string
deletedKeys := make([]string, 0)

for key := range s.items {
if _, ok := kvPairs[key]; ok {
continue
}

delete(s.items, key)
s.Delete([]byte(key))

// Remember which keys we're deleting
deletedKeys = append(deletedKeys, key)
}

return deletedKeys, nil
}

func (s *inMemoryKeyValueStore) Count() (int, error) {
s.mu.Lock()
defer s.mu.Unlock()

return len(s.items), nil
}

func (s *inMemoryKeyValueStore) AppendValues(values ...[]byte) error {
if s == nil {
return fmt.Errorf("unable to append values into uninitialized inmemory db store")
}

for _, value := range values {
s.Set(s.nextSequenceKey(), value)
}

return nil
}

func (s *inMemoryKeyValueStore) nextSequenceKey() []byte {
s.mu.Lock()
defer s.mu.Unlock()

s.sequence++
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, s.sequence)
return b
}
Loading

0 comments on commit fac5f38

Please sign in to comment.