Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Datastore Interface #13

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: go

go:
- 1.12.x
- 1.13.x

env:
global:
Expand All @@ -22,3 +22,8 @@ script:

after_success:
- bash <(curl -s https://codecov.io/bash)

cache:
directories:
- $GOPATH/pkg/mod
- $HOME/.cache/go-build
217 changes: 181 additions & 36 deletions datastore.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package dsbbolt

import (
"log"
"os"

"bytes"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"go.etcd.io/bbolt"
Expand All @@ -19,13 +18,17 @@ var (
// Datastore implements an ipfs datastore
// backed by a bbolt db
type Datastore struct {
	db     *bbolt.DB
	bucket []byte
	// withSync records whether the underlying bbolt database performs
	// synchronous writes (i.e. opts.NoSync was false at open time);
	// when true, Sync is a no-op because every write is already flushed.
	withSync bool
}

// NewDatastore is used to instantiate our datastore
func NewDatastore(path string, opts *bbolt.Options, bucket []byte) (*Datastore, error) {
db, err := bbolt.Open(path, os.FileMode(0640), nil)
if opts == nil {
opts = bbolt.DefaultOptions
}
db, err := bbolt.Open(path, os.FileMode(0640), opts)
if err != nil {
return nil, err
}
Expand All @@ -38,7 +41,7 @@ func NewDatastore(path string, opts *bbolt.Options, bucket []byte) (*Datastore,
}); err != nil {
return nil, err
}
return &Datastore{db, bucket}, nil
return &Datastore{db, bucket, !opts.NoSync}, nil
}

// Put is used to store something in our underlying datastore
Expand All @@ -59,7 +62,13 @@ func (d *Datastore) Delete(key datastore.Key) error {
func (d *Datastore) Get(key datastore.Key) ([]byte, error) {
var data []byte
if err := d.db.View(func(tx *bbolt.Tx) error {
data = tx.Bucket(d.bucket).Get(key.Bytes())
// taken from https://github.com/ipfs/go-ds-bolt/blob/master/datastore.go#L54
value := tx.Bucket(d.bucket).Get(key.Bytes())
if value == nil {
return datastore.ErrNotFound
}
data = make([]byte, len(value))
copy(data, value)
return nil
}); err != nil {
return nil, err
Expand All @@ -69,11 +78,7 @@ func (d *Datastore) Get(key datastore.Key) ([]byte, error) {

// Has returns whether the key is present in our datastore.
// It delegates to datastore.GetBackedHas, which answers via Get
// (presumably mapping datastore.ErrNotFound to (false, nil) —
// see the go-datastore docs).
func (d *Datastore) Has(key datastore.Key) (bool, error) {
	return datastore.GetBackedHas(d, key)
}

// GetSize returns the size of the value referenced by key
Expand All @@ -86,47 +91,187 @@ func (d *Datastore) GetSize(key datastore.Key) (int, error) {
// https://github.com/ipfs/go-datastore/blob/aa9190c18f1576be98e974359fd08c64ca0b5a94/examples/fs.go#L96
// https://github.com/etcd-io/bbolt#prefix-scans
func (d *Datastore) Query(q query.Query) (query.Results, error) {
var entries []query.Entry
if err := d.db.View(func(tx *bbolt.Tx) error {
cursor := tx.Bucket(d.bucket).Cursor()
if q.Prefix == "" {
for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
var entry query.Entry
entry.Key = string(k)
var (
orders = q.Orders
done = make(chan bool)
resultChan = make(chan query.Result)
)
log.Printf("%+v\n", q)
if len(orders) > 0 {
switch q.Orders[0].(type) {
case query.OrderByKey, *query.OrderByKey:
// already ordered by key
orders = nil
}
}
go func() {
defer func() {
done <- true
}()
// do some cool search shit here boys
d.db.View(func(tx *bbolt.Tx) error {
var (
buck = tx.Bucket(d.bucket)
c = buck.Cursor()
prefix []byte
)
if q.Prefix != "" {
prefix = []byte(q.Prefix)
}
// handle my sortiness and collect all results up-front
if len(orders) > 0 {
var entries []query.Entry
// query and filter
for k, v := c.Seek(prefix); k != nil; k, v = c.Next() {
dk := datastore.NewKey(string(k)).String()
e := query.Entry{Key: dk}

if !q.KeysOnly {
// copy afer filtering/sorting
e.Value = v
e.Size = len(e.Value)
}
if filter(q.Filters, e) {
continue
}
entries = append(entries, e)
}
// sort
query.Sort(orders, entries)
// offset/limit
if len(entries) >= q.Offset {
entries = entries[q.Offset:]
}
if q.Limit > 0 && q.Limit < len(entries) {
entries = entries[:q.Limit]
}
/* this is causing issues for some reason
// offset/limit
entries = entries[qrb.Query.Offset:]
if qrb.Query.Limit > 0 {
if qrb.Query.Limit < len(entries) {
entries = entries[:qrb.Query.Limit]
}
}
*/
// send
for _, e := range entries {
// copy late so we don't have to copy values we dont use
e.Value = append(e.Value[0:0:0], e.Value...)
select {
case resultChan <- query.Result{Entry: e}:
// TODO(bonedaddy): we might need to re-enable if this blocks
// default:
}
}
return nil
}
// Otherwise, send results as we get them.
offset := 0
for k, v := c.Seek(prefix); k != nil; k, v = c.Next() {
dk := datastore.NewKey(string(k)).String()
e := query.Entry{Key: dk, Value: v}
if !q.KeysOnly {
entry.Value = v
entry.Size = int(len(entry.Value))
// We copy _after_ filtering.
e.Value = v
e.Size = len(e.Value)
}
// pre-filter
if filter(q.Filters, e) {
continue
}
// now count this item towards the results
offset++
// check the offset
if offset < q.Offset {
continue
}
e.Value = append(e.Value[0:0:0], e.Value...)
select {
case resultChan <- query.Result{Entry: e}:
offset++
// TODO(bonedaddy): we might need to re-enable if this blocks
// default:
}
if q.Limit > 0 && offset >= (q.Offset+q.Limit) {
// all done.
return nil
}
entries = append(entries, entry)
}
return nil
}
pref := []byte(q.Prefix)
for k, v := cursor.Seek(pref); k != nil && bytes.HasPrefix(k, pref); k, v = cursor.Next() {
var entry query.Entry
entry.Key = string(k)
if !q.KeysOnly {
entry.Value = v
})
}()
var entries []query.Entry
for {
select {
case <-done:
goto FINISHED
case result := <-resultChan:
if result.Error != nil {
log.Println("query result failure: ", result.Error)
}
entries = append(entries, entry)
entries = append(entries, result.Entry)
}
}
FINISHED:
return query.ResultsWithEntries(q, entries), nil
}

// Sync is used to manually trigger syncing db contents to disk.
// This call is only usable when synchronous writes aren't enabled
func (d *Datastore) Sync(prefix datastore.Key) error {
if d.withSync {
return nil
}); err != nil {
return nil, err
}
results := query.ResultsWithEntries(q, entries)
// close the result builder since we are done using it
return results, nil
return d.db.Sync()
}

// Batch returns a batched datastore backed by a single bbolt
// read-write transaction; queued Put/Delete operations are applied
// atomically when Commit is called on the returned batch.
func (d *Datastore) Batch() (datastore.Batch, error) {
	tx, err := d.db.Begin(true)
	if err != nil {
		return nil, err
	}
	return &bboltBatch{
		tx:  tx,
		bkt: tx.Bucket(d.bucket),
	}, nil
}

// Close releases the underlying bbolt database handle.
func (d *Datastore) Close() error {
	return d.db.Close()
}

// bboltBatch implements datastore.Batch on top of a single
// bbolt read-write transaction.
type bboltBatch struct {
	tx  *bbolt.Tx     // the open read-write transaction
	bkt *bbolt.Bucket // bucket within tx holding all keys
}

// Commit atomically applies every operation queued in the batch
// by committing the underlying transaction.
func (bb *bboltBatch) Commit() error {
	return bb.tx.Commit()
}

// Delete queues removal of key within the batch transaction.
func (bb *bboltBatch) Delete(key datastore.Key) error {
	return bb.bkt.Delete(key.Bytes())
}

// Put queues storage of val under key within the batch transaction.
func (bb *bboltBatch) Put(key datastore.Key, val []byte) error {
	return bb.bkt.Put(key.Bytes(), val)
}

// filter reports whether entry should be excluded from query results,
// i.e. whether any of the supplied filters rejects it.
func filter(filters []query.Filter, entry query.Entry) bool {
	for _, f := range filters {
		if !f.Filter(entry) {
			return true
		}
	}
	return false
}
67 changes: 67 additions & 0 deletions datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package dsbbolt

import (
"fmt"
"os"
"testing"

"reflect"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
dstest "github.com/ipfs/go-datastore/test"
"go.etcd.io/bbolt"
)

func Test_NewDatastore(t *testing.T) {
Expand Down Expand Up @@ -35,6 +38,60 @@ func Test_NewDatastore(t *testing.T) {
}
}

// Test_Batch exercises the batched Put/Delete/Commit flow and
// verifies a deleted key is absent after the batch commits.
func Test_Batch(t *testing.T) {
	defer os.RemoveAll("./tmp")
	ds, err := NewDatastore("./tmp", nil, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer ds.Close()
	batcher, err := ds.Batch()
	if err != nil {
		t.Fatal(err)
	}
	if err := batcher.Put(datastore.NewKey("helloworld"), []byte("hello")); err != nil {
		t.Fatal(err)
	}
	if err := batcher.Delete(datastore.NewKey("helloworld")); err != nil {
		t.Fatal(err)
	}
	// a silently failed commit would make the Get check below meaningless
	if err := batcher.Commit(); err != nil {
		t.Fatal(err)
	}
	if _, err := ds.Get(datastore.NewKey("helloworld")); err == nil {
		t.Fatal("error expected")
	}
}

// Test_Sync verifies that the datastore's withSync flag mirrors the
// bbolt NoSync option and that Sync never errors in either mode.
func Test_Sync(t *testing.T) {
	type args struct {
		sync bool
	}
	tests := []struct {
		name string
		args args
	}{
		{"With-Sync", args{true}},
		{"Without-Sync", args{false}},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			defer os.RemoveAll("./tmp")
			// copy the defaults instead of mutating the shared
			// *bbolt.Options that DefaultOptions points at, which would
			// leak NoSync into every later open using default options
			opts := *bbolt.DefaultOptions
			// we want the "inverse" of sync because this is "no sync"
			// that is, if we specify sync, we want to say "no thank you nosync"
			opts.NoSync = !tt.args.sync
			ds, err := NewDatastore("./tmp", &opts, nil)
			if err != nil {
				t.Fatal(err)
			}
			// close before RemoveAll so the bolt file lock is released
			defer ds.Close()
			if ds.withSync != tt.args.sync {
				t.Fatal("bad sync status")
			}
			if err := ds.Sync(datastore.NewKey("hmm")); err != nil {
				t.Fatal(err)
			}
		})
	}
}

func Test_Datastore(t *testing.T) {
ds, err := NewDatastore("./tmp", nil, nil)
if err != nil {
Expand Down Expand Up @@ -121,3 +178,13 @@ func Test_Datastore(t *testing.T) {
}

}

// TestSuite runs the standard go-datastore conformance suite
// against a fresh datastore backed by ./tmp.
func TestSuite(t *testing.T) {
	defer os.RemoveAll("./tmp")
	ds, err := NewDatastore("./tmp", nil, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer ds.Close()
	dstest.SubtestAll(t, ds)
}
Loading