Skip to content

Commit

Permalink
Add support for watching nil prefix in subscribe API (#1246)
Browse files Browse the repository at this point in the history
This PR adds support for watching empty prefixes (all keys) in subscribe API.

To subscribe to all changes in badger, user can run
```go
db.Subscribe(ctx, handler, nil)
```
or 
```go
db.Subscribe(ctx, handler, []byte{})
```
  • Loading branch information
Ibrahim Jarif authored Mar 13, 2020
1 parent b13b927 commit c6c1e5e
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 13 deletions.
6 changes: 2 additions & 4 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1580,9 +1580,7 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, prefixes
if cb == nil {
return ErrNilCallback
}
if len(prefixes) == 0 {
return ErrNoPrefixes
}

c := y.NewCloser(1)
recvCh, id := db.pub.newSubscriber(c, prefixes...)
slurp := func(batch *pb.KVList) error {
Expand Down Expand Up @@ -1616,7 +1614,7 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, prefixes
err := slurp(batch)
if err != nil {
c.Done()
// Delete the subsriber if there is an error by the callback.
// Delete the subscriber if there is an error by the callback.
db.pub.deleteSubscriber(id)
return err
}
Expand Down
3 changes: 0 additions & 3 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,6 @@ var (
// ErrNilCallback is returned when subscriber's callback is nil.
ErrNilCallback = errors.New("Callback cannot be nil")

// ErrNoPrefixes is returned when subscriber doesn't provide any prefix.
ErrNoPrefixes = errors.New("At least one key prefix is required")

// ErrEncryptionKeyMismatch is returned when the storage key is not
// matched with the key previously given.
ErrEncryptionKeyMismatch = errors.New("Encryption key mismatch")
Expand Down
7 changes: 7 additions & 0 deletions trie/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ func (t *Trie) Add(prefix []byte, id uint64) {
func (t *Trie) Get(key []byte) map[uint64]struct{} {
out := make(map[uint64]struct{})
node := t.root
// If root has ids that means we have subscribers for "nil/[]byte{}"
// prefix. Add them to the list.
if len(node.ids) > 0 {
for _, i := range node.ids {
out[i] = struct{}{}
}
}
for _, val := range key {
child, ok := node.children[val]
if !ok {
Expand Down
27 changes: 21 additions & 6 deletions trie/trie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,38 @@ func TestGet(t *testing.T) {
trie.Add([]byte("hel"), 20)
trie.Add([]byte("he"), 20)
trie.Add([]byte("badger"), 30)

trie.Add(nil, 10)
require.Equal(t, map[uint64]struct{}{10: {}}, trie.Get([]byte("A")))

ids := trie.Get([]byte("hel"))
require.Equal(t, 1, len(ids))
require.Equal(t, 2, len(ids))
require.Equal(t, map[uint64]struct{}{10: {}, 20: {}}, ids)

require.Equal(t, map[uint64]struct{}{20: {}}, ids)
ids = trie.Get([]byte("badger"))
require.Equal(t, 1, len(ids))
require.Equal(t, map[uint64]struct{}{30: {}}, ids)
require.Equal(t, 2, len(ids))
require.Equal(t, map[uint64]struct{}{10: {}, 30: {}}, ids)

ids = trie.Get([]byte("hello"))
require.Equal(t, 4, len(ids))
require.Equal(t, map[uint64]struct{}{1: {}, 3: {}, 4: {}, 20: {}}, ids)
require.Equal(t, 5, len(ids))
require.Equal(t, map[uint64]struct{}{10: {}, 1: {}, 3: {}, 4: {}, 20: {}}, ids)

trie.Add([]byte{}, 11)
require.Equal(t, map[uint64]struct{}{10: {}, 11: {}}, trie.Get([]byte("A")))

}

func TestTrieDelete(t *testing.T) {
trie := NewTrie()
trie.Add([]byte("hello"), 1)
trie.Add([]byte("hello"), 3)
trie.Add([]byte("hello"), 4)
trie.Add(nil, 5)

trie.Delete([]byte("hello"), 4)

require.Equal(t, map[uint64]struct{}{5: {}, 1: {}, 3: {}}, trie.Get([]byte("hello")))

trie.Delete(nil, 5)
require.Equal(t, map[uint64]struct{}{1: {}, 3: {}}, trie.Get([]byte("hello")))
}

0 comments on commit c6c1e5e

Please sign in to comment.