Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

abstract bbolt from osquery extension code #1652

Merged
merged 8 commits into from
Mar 20, 2024
2 changes: 1 addition & 1 deletion cmd/launcher/internal/record_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (mw *metadataWriter) recordMetadata(metadata *metadata) error {
return nil
}

func (mw *metadataWriter) getServerDataValue(store types.GetterSetterDeleterIteratorUpdater, key string) string {
func (mw *metadataWriter) getServerDataValue(store types.KVStore, key string) string {
val, err := store.Get([]byte(key))
if err != nil {
mw.slogger.Log(context.TODO(), slog.LevelDebug,
Expand Down
68 changes: 67 additions & 1 deletion ee/agent/storage/bbolt/keyvalue_store_bbolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package agentbbolt

import (
"context"
"encoding/binary"
"fmt"
"log/slog"

Expand Down Expand Up @@ -137,12 +138,14 @@ func (s *bboltKeyValueStore) DeleteAll() error {
})
}

// ForEach provides a read-only iterator for all key-value pairs stored within s.bucketName
// this allows bboltKeyValueStore to adhere to the types.Iterator interface
func (s *bboltKeyValueStore) ForEach(fn func(k, v []byte) error) error {
if s == nil || s.db == nil {
return NoDbError{}
}

return s.db.Update(func(tx *bbolt.Tx) error {
return s.db.View(func(tx *bbolt.Tx) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

existing oversight, this should be read-only and in a View transaction

b := tx.Bucket([]byte(s.bucketName))
if b == nil {
return NewNoBucketError(s.bucketName)
Expand Down Expand Up @@ -225,3 +228,66 @@ func (s *bboltKeyValueStore) Update(kvPairs map[string]string) ([]string, error)

return deletedKeys, nil
}

func (s *bboltKeyValueStore) Count() int {
if s == nil || s.db == nil {
s.slogger.Log(context.TODO(), slog.LevelError, "unable to count uninitialized bbolt storage db")
return 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Safe to return 0 instead of 0, err in the error cases?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was on the fence about this, it seems safe for current uses but wondering if we'll regret this for the interface long term. I will update

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pushed!

}

var len int
if err := s.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(s.bucketName))
if b == nil {
return NewNoBucketError(s.bucketName)
}

len = b.Stats().KeyN
return nil
}); err != nil {
s.slogger.Log(context.TODO(), slog.LevelError,
"err counting from bucket",
"err", err,
)
return 0
}

return len
}

// AppendValues utlizes bbolts NextSequence functionality to add ordered values
// after generating the next autoincrementing key for each
func (s *bboltKeyValueStore) AppendValues(values ...[]byte) error {
if s == nil || s.db == nil {
return fmt.Errorf("unable to append values into uninitialized bbolt db store")
}

return s.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(s.bucketName))
if b == nil {
return NewNoBucketError(s.bucketName)
}

for _, value := range values {
key, err := b.NextSequence()
if err != nil {
return fmt.Errorf("generating key: %w", err)
}

b.Put(byteKeyFromUint64(key), value)
if err != nil {
RebeccaMahany marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("adding ordered value: %w", err)
}
}

return nil
})
}

func byteKeyFromUint64(k uint64) []byte {
// Adapted from Bolt docs
// 8 bytes in a uint64
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, k)
return b
}
101 changes: 86 additions & 15 deletions ee/agent/storage/ci/keyvalue_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,7 @@ func Test_Delete(t *testing.T) {
require.NoError(t, err)
}

// There should be no records, count and verify
var recordCount int
rowFn := func(k, v []byte) error {
recordCount = recordCount + 1
return nil
}
s.ForEach(rowFn)
assert.Equal(t, tt.expectedRecordCount, recordCount)
assert.Equal(t, tt.expectedRecordCount, s.Count())
}
})
}
Expand Down Expand Up @@ -189,13 +182,7 @@ func Test_DeleteAll(t *testing.T) {
require.NoError(t, s.DeleteAll())

// There should be no records, count and verify
var recordCount int
rowFn := func(k, v []byte) error {
recordCount = recordCount + 1
return nil
}
s.ForEach(rowFn)
assert.Equal(t, 0, recordCount)
assert.Equal(t, 0, s.Count())
}
})
}
Expand Down Expand Up @@ -377,6 +364,90 @@ func Test_ForEach(t *testing.T) {
}
}

func Test_Count(t *testing.T) {
t.Parallel()

tests := []struct {
name string
sets map[string]string
expectedCount int
}{
{
name: "empty",
sets: map[string]string{},
expectedCount: 0,
},
{
name: "one value",
sets: map[string]string{"key1": "value1"},
expectedCount: 1,
},
{
name: "multiple values",
sets: map[string]string{"key1": "value1", "key2": "value2", "key3": "value3", "key4": "value4"},
expectedCount: 4,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

for _, s := range getStores(t) {
_, err := s.Update(tt.sets)
require.NoError(t, err)
assert.Equal(t, tt.expectedCount, s.Count())
}
})
}
}

func Test_AppendValues(t *testing.T) {
t.Parallel()

tests := []struct {
name string
sets [][]byte
expectedCount int
}{
{
name: "empty",
sets: [][]byte{},
expectedCount: 0,
},
{
name: "one value",
sets: [][]byte{[]byte("one")},
expectedCount: 1,
},
{
name: "multiple values",
sets: [][]byte{[]byte("one"), []byte("two"), []byte("three"), []byte("four"), []byte("five")},
expectedCount: 5,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

for _, s := range getStores(t) {
err := s.AppendValues(tt.sets...)
require.NoError(t, err)
// check the count to ensure the tests below will endure the expected number of iterations
require.Equal(t, tt.expectedCount, s.Count())
idx := 0
// now we expect to be able to iterate over these in the same order that we appended them
s.ForEach(func(k, v []byte) error {
require.Equal(t, tt.sets[idx], v)
idx++
return nil
})
}
})
}
}

func getKeyValueRows(store types.KVStore, bucketName string) ([]map[string]string, error) {
results := make([]map[string]string, 0)

Expand Down
73 changes: 59 additions & 14 deletions ee/agent/storage/inmemory/keyvalue_store_in_memory.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
package inmemory

import (
"encoding/binary"
"errors"
"fmt"
"sync"
)

type inMemoryKeyValueStore struct {
mu sync.RWMutex
items map[string][]byte
mu sync.RWMutex
items map[string][]byte
order []string
sequence uint64
}

func NewStore() *inMemoryKeyValueStore {
s := &inMemoryKeyValueStore{
items: make(map[string][]byte),
order: make([]string, 0),
}

return s
Expand Down Expand Up @@ -42,7 +47,14 @@ func (s *inMemoryKeyValueStore) Set(key, value []byte) error {

s.mu.Lock()
defer s.mu.Unlock()
s.items[string(key)] = value

if _, exists := s.items[string(key)]; !exists {
s.order = append(s.order, string(key))
}

s.items[string(key)] = make([]byte, len(value))
copy(s.items[string(key)], value)

return nil
}

Expand All @@ -55,7 +67,14 @@ func (s *inMemoryKeyValueStore) Delete(keys ...[]byte) error {
defer s.mu.Unlock()
for _, key := range keys {
delete(s.items, string(key))
for i, k := range s.order {
if k == string(key) {
s.order = append(s.order[:i], s.order[i+1:]...)
break
}
}
}

return nil
}

Expand All @@ -66,7 +85,9 @@ func (s *inMemoryKeyValueStore) DeleteAll() error {

s.mu.Lock()
defer s.mu.Unlock()
clear(s.items)
s.items = make(map[string][]byte)
s.order = make([]string, 0)

return nil
}

Expand All @@ -77,44 +98,68 @@ func (s *inMemoryKeyValueStore) ForEach(fn func(k, v []byte) error) error {

s.mu.Lock()
defer s.mu.Unlock()
for k, v := range s.items {
if err := fn([]byte(k), v); err != nil {
for _, key := range s.order {
if err := fn([]byte(key), s.items[key]); err != nil {
return err
}
}
return nil
}

// Update adheres to the Updater interface for bulk replacing data in a key/value store.
// Note that this method internally defers all mutating operations to the existing Set/Delete
// functions, so the mutex is not locked here
func (s *inMemoryKeyValueStore) Update(kvPairs map[string]string) ([]string, error) {
if s == nil {
return nil, errors.New("store is nil")
}

s.mu.Lock()
defer s.mu.Unlock()

s.items = make(map[string][]byte)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure how this was ever returning deleted keys


for key, value := range kvPairs {
if key == "" {
return nil, errors.New("key is blank")
}

s.items[key] = []byte(value)
s.Set([]byte(key), []byte(value))
}

var deletedKeys []string
deletedKeys := make([]string, 0)

for key := range s.items {
if _, ok := kvPairs[key]; ok {
continue
}

delete(s.items, key)
s.Delete([]byte(key))

// Remember which keys we're deleting
deletedKeys = append(deletedKeys, key)
}

return deletedKeys, nil
}

func (s *inMemoryKeyValueStore) Count() int {
s.mu.Lock()
defer s.mu.Unlock()

return len(s.items)
}

func (s *inMemoryKeyValueStore) AppendValues(values ...[]byte) error {
if s == nil {
return fmt.Errorf("unable to append values into uninitialized inmemory db store")
}

for _, value := range values {
s.Set(s.nextSequenceKey(), value)
}

return nil
}

func (s *inMemoryKeyValueStore) nextSequenceKey() []byte {
s.sequence++
RebeccaMahany marked this conversation as resolved.
Show resolved Hide resolved
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, s.sequence)
return b
}
Loading
Loading