Skip to content

Commit

Permalink
Fix Prefix and LowerBound on non-unique indexes
Browse files Browse the repository at this point in the history
Prefix and LowerBound searches did not properly deal with non-unique
indexes. In these indexes the keys are encoded as <secondary><primary><secondary len>,
and prefix searching needs to make sure that the secondary key length of each result
is equal to or longer than the search key, as otherwise we might match into the primary
key.

For example, if the object is struct{A, B string}, with A being the primary key, and we have
{"a", "a"}, then the secondary index is keyed as "aa<1>". A prefix search with "aa"
must not match, since the secondary index contains only an object with key "a".

Fix this by using a special iteration on non-unique indexes that checks the length
of the secondary key and ignores any other matches that are due to matching into the
primary key.

Another issue with non-unique indexes was due to having no separator between the
secondary and primary key, leading to the primary key having an effect on the iteration
order. Fix this by adding '\0' as a separator.

Signed-off-by: Jussi Maki <jussi.maki@isovalent.com>
  • Loading branch information
joamaki committed Oct 4, 2024
1 parent 3ff3512 commit cf6e1bb
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 33 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ var TagsIndex = statedb.Index[*MyObject, Tag]{
}
```

Please note that a non-unique index may contain multiple keys that all point to the
same object, so you may see duplicates when doing a Prefix or LowerBound search on it!

With the indexes now defined, we can construct a table.

### Setting up a table
Expand Down Expand Up @@ -365,6 +368,8 @@ for obj, revision := range objs { ... }
```go
// Prefix does a prefix search on an index. Here it returns an iterator
// for all objects that have a tag that starts with "h".
// NOTE: If an object has multiple tags that start with "h", this will return
// that object multiple times!
objs, watch = myObjects.Prefix(txn, TagsIndex.Query("h"))
for obj := range objs {
...
Expand Down
12 changes: 9 additions & 3 deletions any_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type AnyTable struct {

// All returns a sequence of objects and their revisions obtained by iterating
// the table's primary index within the given read transaction.
func (t AnyTable) All(txn ReadTxn) iter.Seq2[any, Revision] {
indexTxn := txn.getTxn().mustIndexReadTxn(t.Meta, PrimaryIndexPos)
return anySeq(indexTxn.Iterator())
// NOTE(review): this second return is unreachable as written; it looks like the
// pre-change and post-change lines of the diff shown together.
return partSeq[any](indexTxn.Iterator())
}

func (t AnyTable) UnmarshalYAML(data []byte) (any, error) {
Expand Down Expand Up @@ -41,13 +41,19 @@ func (t AnyTable) Delete(txn WriteTxn, obj any) (old any, hadOld bool, err error
// Prefix returns a sequence of objects, with their revisions, whose key in the
// table's primary index starts with key.
//
// On a unique index the prefix iterator is yielded as is. On a non-unique index
// the results go through nonUniqueSeq in non-exact mode, which drops entries
// whose secondary key is shorter than the search key, i.e. entries that only
// matched because the search key ran into the primary key part.
func (t AnyTable) Prefix(txn ReadTxn, key string) iter.Seq2[any, Revision] {
	indexTxn := txn.getTxn().mustIndexReadTxn(t.Meta, PrimaryIndexPos)
	searchKey := []byte(key)
	// The watch channel returned by the prefix search is not exposed by this method.
	it, _ := indexTxn.Prefix(searchKey)
	if indexTxn.unique {
		return partSeq[any](it)
	}
	return nonUniqueSeq[any](it, false, searchKey)
}

// LowerBound returns a sequence of objects, with their revisions, produced by a
// lower-bound search for key on the table's primary index.
//
// On a unique index the lower-bound iterator is yielded as is. On a non-unique
// index the results go through nonUniqueLowerBoundSeq, which compares the search
// key against the secondary key part of each composite key rather than against
// the whole key.
func (t AnyTable) LowerBound(txn ReadTxn, key string) iter.Seq2[any, Revision] {
	indexTxn := txn.getTxn().mustIndexReadTxn(t.Meta, PrimaryIndexPos)
	searchKey := []byte(key)
	it := indexTxn.LowerBound(searchKey)
	if indexTxn.unique {
		return partSeq[any](it)
	}
	return nonUniqueLowerBoundSeq[any](it, searchKey)
}

func (t AnyTable) TableHeader() []string {
Expand Down
60 changes: 38 additions & 22 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package statedb

import (
"bytes"
"fmt"
"iter"
"slices"
Expand Down Expand Up @@ -71,24 +72,6 @@ func partSeq[Obj any](iter *part.Iterator[object]) iter.Seq2[Obj, Revision] {
}
}

// anySeq adapts a part Iterator into an untyped sequence of (object, revision) pairs.
func anySeq(iter *part.Iterator[object]) iter.Seq2[any, Revision] {
	return func(yield func(any, Revision) bool) {
		// Every run of the sequence walks its own clone of the iterator, so the
		// sequence can be ranged over repeatedly, each time from the start.
		cursor := iter.Clone()
		for _, entry, more := cursor.Next(); more; _, entry, more = cursor.Next() {
			if !yield(entry.data, entry.revision) {
				return
			}
		}
	}
}

// nonUniqueSeq returns a sequence of objects for a non-unique index.
// Non-unique indexes work by concatenating the secondary key with the
// primary key and then prefix searching for the items:
Expand All @@ -107,7 +90,7 @@ func anySeq(iter *part.Iterator[object]) iter.Seq2[any, Revision] {
// aaaa_ccc4
//
// We yield "aaaa_bbb4", skip "aaa_abab3" and yield "aaaa_ccc4".
func nonUniqueSeq[Obj any](iter *part.Iterator[object], searchKey []byte) iter.Seq2[Obj, Revision] {
func nonUniqueSeq[Obj any](iter *part.Iterator[object], exact bool, searchKey []byte) iter.Seq2[Obj, Revision] {
return func(yield func(Obj, Revision) bool) {
// Clone the iterator to allow multiple iterations over the sequence.
it := iter.Clone()
Expand All @@ -119,9 +102,13 @@ func nonUniqueSeq[Obj any](iter *part.Iterator[object], searchKey []byte) iter.S

_, secondary := decodeNonUniqueKey(key)

// The secondary key doesn't match the search key. Since the primary
// key length can vary, we need to continue the prefix search.
if len(secondary) != len(searchKey) {
// The secondary key is shorter than what we're looking for, e.g.
// we match into the primary key. Keep searching for matching secondary
// keys.
switch {
case exact && len(secondary) != len(searchKey):
continue
case !exact && len(secondary) < len(searchKey):
continue
}

Expand All @@ -132,6 +119,35 @@ func nonUniqueSeq[Obj any](iter *part.Iterator[object], searchKey []byte) iter.S
}
}

// nonUniqueLowerBoundSeq returns a sequence of objects for a LowerBound search
// on a non-unique index.
//
// Keys of a non-unique index are composites of the secondary key, the primary
// key and the secondary key length. The LowerBound seek on the underlying tree
// compares against the whole composite key, so it may position the iterator on
// an entry that qualifies only because of bytes in its primary key part. Leading
// entries whose secondary key sorts before searchKey are therefore skipped. Once
// the first entry with a secondary key greater than or equal to searchKey has
// been seen, it and every following entry are yielded without further checks.
func nonUniqueLowerBoundSeq[Obj any](iter *part.Iterator[object], searchKey []byte) iter.Seq2[Obj, Revision] {
	return func(yield func(Obj, Revision) bool) {
		// Walk a private clone and leave the captured iterator untouched, so that
		// ranging over the sequence again starts from the same position instead
		// of continuing from where the previous run stopped.
		it := iter.Clone()
		matched := false
		for {
			key, iobj, ok := it.Next()
			if !ok {
				return
			}
			if !matched {
				// Compare only the secondary key part; comparing the whole
				// composite key could match partially into the primary key.
				_, secondary := decodeNonUniqueKey(key)
				if bytes.Compare(secondary, searchKey) < 0 {
					continue
				}
				matched = true
			}
			if !yield(iobj.data.(Obj), iobj.revision) {
				return
			}
		}
	}
}

// iterator adapts the "any" object iterator to a typed object.
type iterator[Obj any] struct {
iter interface{ Next() ([]byte, object, bool) }
Expand Down
12 changes: 9 additions & 3 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,10 @@ func (t *genTable[Obj]) LowerBoundWatch(txn ReadTxn, q Query[Obj]) (iter.Seq2[Ob
// we watch the whole table for changes.
watch := indexTxn.RootWatch()
iter := indexTxn.LowerBound(q.key)
return partSeq[Obj](iter), watch
if indexTxn.unique {
return partSeq[Obj](iter), watch
}
return nonUniqueLowerBoundSeq[Obj](iter, q.key), watch
}

func (t *genTable[Obj]) Prefix(txn ReadTxn, q Query[Obj]) iter.Seq2[Obj, Revision] {
Expand All @@ -329,7 +332,10 @@ func (t *genTable[Obj]) Prefix(txn ReadTxn, q Query[Obj]) iter.Seq2[Obj, Revisio
// PrefixWatch searches the index selected by the query for keys starting with
// the query key. It returns the matching objects with their revisions, along
// with the watch channel returned by the index prefix search.
//
// On a unique index the prefix iterator is yielded as is. On a non-unique index
// the results go through nonUniqueSeq in non-exact mode, which drops entries
// whose secondary key is shorter than the query key, i.e. entries that only
// matched because the query key ran into the primary key part.
func (t *genTable[Obj]) PrefixWatch(txn ReadTxn, q Query[Obj]) (iter.Seq2[Obj, Revision], <-chan struct{}) {
	indexTxn := txn.getTxn().mustIndexReadTxn(t, t.indexPos(q.index))
	it, watch := indexTxn.Prefix(q.key)
	if indexTxn.unique {
		return partSeq[Obj](it), watch
	}
	return nonUniqueSeq[Obj](it, false, q.key), watch
}

func (t *genTable[Obj]) All(txn ReadTxn) iter.Seq2[Obj, Revision] {
Expand Down Expand Up @@ -366,7 +372,7 @@ func (t *genTable[Obj]) ListWatch(txn ReadTxn, q Query[Obj]) (iter.Seq2[Obj, Rev
// iteration will continue until key length mismatches, e.g. we hit a
// longer key sharing the same prefix.
iter, watch := indexTxn.Prefix(q.key)
return nonUniqueSeq[Obj](iter, q.key), watch
return nonUniqueSeq[Obj](iter, true, q.key), watch
}

func (t *genTable[Obj]) Insert(txn WriteTxn, obj Obj) (oldObj Obj, hadOld bool, err error) {
Expand Down
11 changes: 6 additions & 5 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,28 +338,29 @@ func (txn *txn) delete(meta TableMeta, guardRevision Revision, data any) (object

// encodeNonUniqueKey constructs the internal key to use with non-unique indexes.
// The key is laid out as <secondary> '\0' <primary> <secondary length>, where the
// length is stored as unsigned 16-bit big endian.
// The zero byte between the keys ensures proper ordering: without a separator the
// primary key would affect the iteration order of the secondary keys.
// This layout allows looking up from the non-unique index with the secondary key by
// doing a prefix search. The length is used to safeguard against indexers that don't
// terminate the key properly (e.g. if the secondary key is "foo", then we don't want
// "foobar" to match).
func encodeNonUniqueKey(primary, secondary index.Key) []byte {
	// Reserve room for the secondary key, the separator, the primary key and the
	// 2-byte length suffix up front so the appends below do not reallocate.
	key := make([]byte, 0, len(secondary)+1+len(primary)+2)
	key = append(key, secondary...)
	key = append(key, 0)
	key = append(key, primary...)
	// KeySet limits size of key to 16 bits.
	return binary.BigEndian.AppendUint16(key, uint16(len(secondary)))
}

// decodeNonUniqueKey splits a non-unique index key into its primary and
// secondary key parts.
//
// A non-unique key is [<secondary...>, '\0', <primary...>, <secondary length>],
// where the trailing secondary length is an unsigned 16-bit big-endian integer.
// If the key is too short to hold the secondary key, the separator byte and the
// length suffix, (nil, nil) is returned.
func decodeNonUniqueKey(key []byte) (primary []byte, secondary []byte) {
	if len(key) < 2 {
		return nil, nil
	}
	secondaryLength := int(binary.BigEndian.Uint16(key[len(key)-2:]))
	// The key must fit the secondary key, the one-byte separator and the
	// two-byte length suffix; otherwise the slicing below would be out of range.
	if len(key) < secondaryLength+1+2 {
		return nil, nil
	}
	return key[secondaryLength+1 : len(key)-2], key[:secondaryLength]
}

func (txn *txn) Abort() {
Expand Down
4 changes: 4 additions & 0 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type Table[Obj any] interface {

// LowerBound returns an iterator for objects that have a key
// greater or equal to the query.
// NOTE: LowerBound searching a non-unique index may yield duplicate results as one
// object may be pointed to by multiple keys.
LowerBound(ReadTxn, Query[Obj]) iter.Seq2[Obj, Revision]

// LowerBoundWatch returns an iterator for objects that have a key
Expand All @@ -61,6 +63,8 @@ type Table[Obj any] interface {
LowerBoundWatch(ReadTxn, Query[Obj]) (seq iter.Seq2[Obj, Revision], watch <-chan struct{})

// Prefix searches the table by key prefix.
// NOTE: Prefix searching a non-unique index may yield duplicate results as one
// object may be pointed to by multiple keys.
Prefix(ReadTxn, Query[Obj]) iter.Seq2[Obj, Revision]

// PrefixWatch searches the table by key prefix. Returns an iterator and a watch
Expand Down

0 comments on commit cf6e1bb

Please sign in to comment.