Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
Datastore based pinner (#4)
Browse files Browse the repository at this point in the history
feat: store pins in datastore instead of a DAG

Adds a new `/pins` namespace to the given datastore and uses that to store pins as cbor binary, keyed by unique pin ID.

The new datastore pinner stores pins in the datastore as individual key-value items. This is faster than the dag pinner, which stored all pins in a single dag that had to be rewritten every time a pin was added or removed.

The new pinner provides a secondary indexing mechanism that can be used to index any data that a pin has. Secondary indexing logic is provided by the `dsindex` package. The new pinner currently includes indexing by CID.

Both the new datastore pinner (`dspinner` package) and the old dag pinner (`ipldpinner` package) implementations are included to support migration between the two.  Migration logic is provided by the `pinconv` package.

Other features in new pinner:
- Benchmarks are provided to compare the performance of the old and new pinners
- New pinner does not keep an in-memory set of pinned CIDs; instead, it relies on the datastore
- Separate recursive and direct CID indexes allow searching for pins without having to load pin data to check the mode
- New pinner can rebuild indexes on load, if saved pins appear out of sync with the indexes
  • Loading branch information
gammazero committed Nov 30, 2020
1 parent 9e800d1 commit 4c92071
Show file tree
Hide file tree
Showing 15 changed files with 3,635 additions and 540 deletions.
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
*~
*.log

# Test binary, build with `go test -c`
*.test

# Output of the go coverage tool
*.out
8 changes: 8 additions & 0 deletions dsindex/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package dsindex

import "errors"

// Sentinel errors returned by the Indexer implementation in this package when
// a required string argument is empty.
var (
// ErrEmptyKey is returned when the index key argument is the empty string.
ErrEmptyKey = errors.New("key is empty")
// ErrEmptyValue is returned when the value argument is the empty string.
ErrEmptyValue = errors.New("value is empty")
)
285 changes: 285 additions & 0 deletions dsindex/indexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
// Package dsindex provides secondary indexing functionality for a datastore.
package dsindex

import (
"context"
"fmt"
"path"

ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
"github.com/multiformats/go-multibase"
)

// Indexer maintains a secondary index. An index is a collection of key-value
// mappings in which each key is a secondary index entry that maps to one or
// more values, and each value is the unique key of an item being indexed.
//
// The implementation returned by New rejects an empty key with ErrEmptyKey
// and an empty value with ErrEmptyValue wherever that argument is required.
type Indexer interface {
// Add adds the specified value to the key.
Add(ctx context.Context, key, value string) error

// Delete deletes the specified value from the key. If the value is not in
// the datastore, this method returns no error.
Delete(ctx context.Context, key, value string) error

// DeleteKey deletes all values in the given key. If a key is not in the
// datastore, this method returns no error. Returns a count of values that
// were deleted.
DeleteKey(ctx context.Context, key string) (count int, err error)

// DeleteAll deletes all keys managed by this Indexer. Returns a count of
// the values that were deleted.
DeleteAll(ctx context.Context) (count int, err error)

// ForEach calls the function for each value in the specified key, until
// there are no more values, or until the function returns false. If key
// is the empty string, then all keys are iterated. The function receives
// the key and the value of each entry.
ForEach(ctx context.Context, key string, fn func(key, value string) bool) error

// HasValue determines if the key contains the specified value.
HasValue(ctx context.Context, key, value string) (bool, error)

// HasAny determines if any value is in the specified key. If key is the
// empty string, then all values are searched.
HasAny(ctx context.Context, key string) (bool, error)

// Search returns all values for the given key. The implementation returned
// by New yields a nil slice when the key has no values.
Search(ctx context.Context, key string) (values []string, err error)
}

// indexer is a simple implementation of Indexer. This implementation relies
// on the underlying data store to support efficient querying by prefix.
//
// TODO: Consider adding caching
type indexer struct {
// dstore holds the index entries. New sets it to the caller's datastore
// wrapped in the namespace given as the index name.
dstore ds.Datastore
}

// New returns an Indexer whose entries all live under the given index name
// inside dstore.
//
// Indexer operations are only persisted once dstore.Sync has been called, so
// callers must invoke it themselves after modifying the index.
func New(dstore ds.Datastore, name ds.Key) Indexer {
	idx := new(indexer)
	// Namespacing keeps this index's keys separate under the name prefix.
	idx.dstore = namespace.Wrap(dstore, name)
	return idx
}

// Add records value under key. It returns ErrEmptyKey or ErrEmptyValue when
// the corresponding argument is the empty string.
func (x *indexer) Add(ctx context.Context, key, value string) error {
	switch {
	case key == "":
		return ErrEmptyKey
	case value == "":
		return ErrEmptyValue
	}

	// The entry is stored as <encoded key>/<encoded value> with an empty
	// payload; the datastore key itself carries all the information.
	entryKey := ds.NewKey(encode(key)).ChildString(encode(value))
	return x.dstore.Put(entryKey, []byte{})
}

// Delete removes value from key. It returns ErrEmptyKey or ErrEmptyValue when
// the corresponding argument is the empty string; otherwise it returns
// whatever the datastore's Delete returns.
func (x *indexer) Delete(ctx context.Context, key, value string) error {
	switch {
	case key == "":
		return ErrEmptyKey
	case value == "":
		return ErrEmptyValue
	}

	entryKey := ds.NewKey(encode(key)).ChildString(encode(value))
	return x.dstore.Delete(entryKey)
}

// DeleteKey removes every value stored under key and returns how many entries
// were deleted. An empty key is rejected with ErrEmptyKey.
func (x *indexer) DeleteKey(ctx context.Context, key string) (int, error) {
	if len(key) == 0 {
		return 0, ErrEmptyKey
	}
	prefix := encode(key)
	return x.deletePrefix(ctx, prefix)
}

// DeleteAll removes the entries of this index and returns how many were
// deleted. It queries with the empty prefix so the deletion is not restricted
// to a single key.
func (x *indexer) DeleteAll(ctx context.Context) (int, error) {
	const noPrefix = ""
	return x.deletePrefix(ctx, noPrefix)
}

// ForEach calls fn for each value stored under the specified key, until there
// are no more values or fn returns false. If key is the empty string, the
// query is not restricted to one key and all entries are iterated. fn
// receives the decoded key and the decoded value of each entry.
//
// A non-nil error is returned if the query cannot be started, a result
// carries an error, ctx is done, or a stored key or value cannot be decoded.
// Stopping because fn returned false is not an error.
func (x *indexer) ForEach(ctx context.Context, key string, fn func(key, value string) bool) error {
	if key != "" {
		key = encode(key)
	}

	q := query.Query{
		Prefix:   key,
		KeysOnly: true,
	}
	results, err := x.dstore.Query(q)
	if err != nil {
		return err
	}
	// Close the query results on every exit path.
	defer results.Close()

	for {
		r, ok := results.NextSync()
		if !ok {
			return nil
		}
		if r.Error != nil {
			return r.Error
		}
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		// Entries are stored as <encoded key>/<encoded value>: the last
		// path element is the value and its parent is the index key.
		//
		// Decode failures are returned directly. Assigning them to a
		// variable declared with := inside the loop and then breaking
		// would leave the function-level error nil and hide the failure.
		entryKey := r.Entry.Key
		decIdx, decErr := decode(path.Base(path.Dir(entryKey)))
		if decErr != nil {
			return fmt.Errorf("cannot decode index: %v", decErr)
		}
		decKey, decErr := decode(path.Base(entryKey))
		if decErr != nil {
			return fmt.Errorf("cannot decode key: %v", decErr)
		}
		if !fn(decIdx, decKey) {
			return nil
		}
	}
}

// HasValue reports whether value is stored under key. It returns ErrEmptyKey
// or ErrEmptyValue when the corresponding argument is the empty string.
func (x *indexer) HasValue(ctx context.Context, key, value string) (bool, error) {
	switch {
	case key == "":
		return false, ErrEmptyKey
	case value == "":
		return false, ErrEmptyValue
	}

	entryKey := ds.NewKey(encode(key)).ChildString(encode(value))
	return x.dstore.Has(entryKey)
}

// HasAny reports whether at least one value exists for key. The key is passed
// unchanged to ForEach, so an empty key checks the whole index.
func (x *indexer) HasAny(ctx context.Context, key string) (bool, error) {
	found := false
	// Returning false stops the iteration as soon as one entry is seen.
	stopAtFirst := func(string, string) bool {
		found = true
		return false
	}
	err := x.ForEach(ctx, key, stopAtFirst)
	return found, err
}

// Search returns the decoded values stored under key. An empty key is
// rejected with ErrEmptyKey. When the key has no values, a nil slice and a
// nil error are returned.
func (x *indexer) Search(ctx context.Context, key string) ([]string, error) {
	if len(key) == 0 {
		return nil, ErrEmptyKey
	}

	matches, err := x.queryPrefix(ctx, encode(key))
	switch {
	case err != nil:
		return nil, err
	case len(matches) == 0:
		return nil, nil
	}

	found := make([]string, 0, len(matches))
	for _, ent := range matches {
		// The value is the last path element of the stored entry key.
		val, decErr := decode(path.Base(ent.Key))
		if decErr != nil {
			return nil, fmt.Errorf("cannot decode value: %v", decErr)
		}
		found = append(found, val)
	}
	return found, nil
}

// SyncIndex synchronizes the entries in the target Indexer to match those of
// the ref Indexer: every (key, value) pair found in target but not in ref is
// deleted from target, and every pair found in ref but not in target is added
// to target. This function does not change the target indexer's key root
// (the name passed into New).
//
// Entries are compared as (key, value) pairs, so a value that is indexed
// under several keys is handled correctly.
//
// It reports whether target was modified. If ref contains no entries, target
// is left untouched and false is returned. The first error from iterating,
// deleting or adding aborts the sync and is returned with false.
func SyncIndex(ctx context.Context, ref, target Indexer) (bool, error) {
	type entry struct{ key, value string }

	// Build the reference set. The bool records whether target already
	// holds the pair; it is filled in while scanning target below.
	refs := map[entry]bool{}
	err := ref.ForEach(ctx, "", func(key, value string) bool {
		refs[entry{key, value}] = false
		return true
	})
	if err != nil {
		return false, err
	}
	if len(refs) == 0 {
		return false, nil
	}

	// Compare the current target entries against the reference set.
	var stale []entry
	err = target.ForEach(ctx, "", func(key, value string) bool {
		e := entry{key, value}
		if _, ok := refs[e]; ok {
			// Same in both; nothing to add or delete.
			refs[e] = true
		} else {
			stale = append(stale, e)
		}
		return true
	})
	if err != nil {
		return false, err
	}

	// Stale entries exist only in target and must be removed.
	for _, e := range stale {
		if err = target.Delete(ctx, e.key, e.value); err != nil {
			return false, err
		}
	}
	changed := len(stale) != 0

	// Reference entries that target does not hold yet must be added.
	for e, present := range refs {
		if present {
			continue
		}
		if err = target.Add(ctx, e.key, e.value); err != nil {
			return false, err
		}
		changed = true
	}

	return changed, nil
}

// deletePrefix deletes every entry returned by a query for prefix and returns
// the number of entries deleted. If the query or any single delete fails, the
// error is returned with a count of 0, even if some entries were already
// removed.
func (x *indexer) deletePrefix(ctx context.Context, prefix string) (int, error) {
	matched, err := x.queryPrefix(ctx, prefix)
	if err != nil {
		return 0, err
	}

	for _, ent := range matched {
		if delErr := x.dstore.Delete(ds.NewKey(ent.Key)); delErr != nil {
			return 0, delErr
		}
	}
	return len(matched), nil
}

// queryPrefix runs a keys-only query for prefix against the index datastore
// and returns the entries collected by the results' Rest method.
func (x *indexer) queryPrefix(ctx context.Context, prefix string) ([]query.Entry, error) {
	results, err := x.dstore.Query(query.Query{Prefix: prefix, KeysOnly: true})
	if err != nil {
		return nil, err
	}
	return results.Rest()
}

// encode returns data as a multibase base64url string, the form in which keys
// and values are stored as datastore key path elements. It panics if
// multibase reports an error, which would mean the encoding constant itself
// is unsupported (a programming error).
func encode(data string) string {
	encoded, err := multibase.Encode(multibase.Base64url, []byte(data))
	if err == nil {
		return encoded
	}
	// programming error; using unsupported encoding
	panic(err.Error())
}

// decode reverses encode: it multibase-decodes data and returns the decoded
// bytes as a string. The encoding reported by multibase is ignored. On
// failure it returns the empty string and the decoding error.
func decode(data string) (string, error) {
	_, raw, decErr := multibase.Decode(data)
	if decErr == nil {
		return string(raw), nil
	}
	return "", decErr
}
Loading

0 comments on commit 4c92071

Please sign in to comment.