diff --git a/.travis.yml b/.travis.yml index 2a62552..d0836de 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,7 @@ language: go go: - - 1.12.x + - 1.13.x env: global: @@ -22,3 +22,8 @@ script: after_success: - bash <(curl -s https://codecov.io/bash) + + cache: + directories: + - $GOPATH/pkg/mod + - $HOME/.cache/go-build \ No newline at end of file diff --git a/datastore.go b/datastore.go index c9caa54..3b040a3 100644 --- a/datastore.go +++ b/datastore.go @@ -1,10 +1,9 @@ package dsbbolt import ( + "log" "os" - "bytes" - "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/query" "go.etcd.io/bbolt" @@ -19,13 +18,17 @@ var ( // Datastore implements an ipfs datastore // backed by a bbolt db type Datastore struct { - db *bbolt.DB - bucket []byte + db *bbolt.DB + bucket []byte + withSync bool } // NewDatastore is used to instantiate our datastore func NewDatastore(path string, opts *bbolt.Options, bucket []byte) (*Datastore, error) { - db, err := bbolt.Open(path, os.FileMode(0640), nil) + if opts == nil { + opts = bbolt.DefaultOptions + } + db, err := bbolt.Open(path, os.FileMode(0640), opts) if err != nil { return nil, err } @@ -38,7 +41,7 @@ func NewDatastore(path string, opts *bbolt.Options, bucket []byte) (*Datastore, }); err != nil { return nil, err } - return &Datastore{db, bucket}, nil + return &Datastore{db, bucket, !opts.NoSync}, nil } // Put is used to store something in our underlying datastore @@ -59,7 +62,13 @@ func (d *Datastore) Delete(key datastore.Key) error { func (d *Datastore) Get(key datastore.Key) ([]byte, error) { var data []byte if err := d.db.View(func(tx *bbolt.Tx) error { - data = tx.Bucket(d.bucket).Get(key.Bytes()) + // taken from https://github.com/ipfs/go-ds-bolt/blob/master/datastore.go#L54 + value := tx.Bucket(d.bucket).Get(key.Bytes()) + if value == nil { + return datastore.ErrNotFound + } + data = make([]byte, len(value)) + copy(data, value) return nil }); err != nil { return nil, err @@ -69,11 +78,7 @@ func (d *Datastore) Get(key datastore.Key) ([]byte, error) { // Has returns whether the key is present in our datastore func (d *Datastore) Has(key datastore.Key) (bool, error) { - data, err := d.Get(key) - if err != nil { - return false, err - } - return data != nil, nil + return datastore.GetBackedHas(d, key) } // GetSize returns the size of the value referenced by key @@ -86,47 +91,187 @@ func (d *Datastore) GetSize(key datastore.Key) (int, error) { // https://github.com/ipfs/go-datastore/blob/aa9190c18f1576be98e974359fd08c64ca0b5a94/examples/fs.go#L96 // https://github.com/etcd-io/bbolt#prefix-scans func (d *Datastore) Query(q query.Query) (query.Results, error) { - var entries []query.Entry - if err := d.db.View(func(tx *bbolt.Tx) error { - cursor := tx.Bucket(d.bucket).Cursor() - if q.Prefix == "" { - for k, v := cursor.First(); k != nil; k, v = cursor.Next() { - var entry query.Entry - entry.Key = string(k) + var ( + orders = q.Orders + done = make(chan bool) + resultChan = make(chan query.Result) + ) + log.Printf("%+v\n", q) + if len(orders) > 0 { + switch q.Orders[0].(type) { + case query.OrderByKey, *query.OrderByKey: + // already ordered by key + orders = nil + } + } + go func() { + defer func() { + done <- true + }() + // do some cool search shit here boys + d.db.View(func(tx *bbolt.Tx) error { + var ( + buck = tx.Bucket(d.bucket) + c = buck.Cursor() + prefix []byte + ) + if q.Prefix != "" { + prefix = []byte(q.Prefix) + } + // handle my sortiness and collect all results up-front + if len(orders) > 0 { + var entries []query.Entry + // query and filter + for k, v := c.Seek(prefix); k != nil; k, v = c.Next() { + dk := datastore.NewKey(string(k)).String() + e := query.Entry{Key: dk} + + if !q.KeysOnly { + // copy afer filtering/sorting + e.Value = v + e.Size = len(e.Value) + } + if filter(q.Filters, e) { + continue + } + entries = append(entries, e) + } + // sort + query.Sort(orders, entries) + // offset/limit + if len(entries) >= q.Offset { + entries = entries[q.Offset:] + } + if q.Limit > 0 && q.Limit < len(entries) { + entries = entries[:q.Limit] + } + /* this is causing issues for some reason + // offset/limit + entries = entries[qrb.Query.Offset:] + if qrb.Query.Limit > 0 { + if qrb.Query.Limit < len(entries) { + entries = entries[:qrb.Query.Limit] + } + } + */ + // send + for _, e := range entries { + // copy late so we don't have to copy values we dont use + e.Value = append(e.Value[0:0:0], e.Value...) + select { + case resultChan <- query.Result{Entry: e}: + // TODO(bonedaddy): we might need to re-enable if this blocks + // default: + } + } + return nil + } + // Otherwise, send results as we get them. + offset := 0 + for k, v := c.Seek(prefix); k != nil; k, v = c.Next() { + dk := datastore.NewKey(string(k)).String() + e := query.Entry{Key: dk, Value: v} if !q.KeysOnly { - entry.Value = v - entry.Size = int(len(entry.Value)) + // We copy _after_ filtering. + e.Value = v + e.Size = len(e.Value) + } + // pre-filter + if filter(q.Filters, e) { + continue + } + // now count this item towards the results + offset++ + // check the offset + if offset < q.Offset { + continue + } + e.Value = append(e.Value[0:0:0], e.Value...) + select { + case resultChan <- query.Result{Entry: e}: + offset++ + // TODO(bonedaddy): we might need to re-enable if this blocks + // default: + } + if q.Limit > 0 && offset >= (q.Offset+q.Limit) { + // all done. + return nil } - entries = append(entries, entry) } return nil - } - pref := []byte(q.Prefix) - for k, v := cursor.Seek(pref); k != nil && bytes.HasPrefix(k, pref); k, v = cursor.Next() { - var entry query.Entry - entry.Key = string(k) - if !q.KeysOnly { - entry.Value = v + }) + }() + var entries []query.Entry + for { + select { + case <-done: + goto FINISHED + case result := <-resultChan: + if result.Error != nil { + log.Println("query result failure: ", result.Error) } - entries = append(entries, entry) + entries = append(entries, result.Entry) } + } +FINISHED: + return query.ResultsWithEntries(q, entries), nil +} + +// Sync is used to manually trigger syncing db contents to disk. +// This call is only usable when synchronous writes aren't enabled +func (d *Datastore) Sync(prefix datastore.Key) error { + if d.withSync { return nil - }); err != nil { - return nil, err } - results := query.ResultsWithEntries(q, entries) - // close the result builder since we are done using it - return results, nil + return d.db.Sync() } // Batch returns a basic batched bolt datastore wrapper // it is a temporary method until we implement a proper // transactional batched datastore func (d *Datastore) Batch() (datastore.Batch, error) { - return datastore.NewBasicBatch(d), nil + tx, err := d.db.Begin(true) + if err != nil { + return nil, err + } + return &bboltBatch{ + tx: tx, + bkt: tx.Bucket(d.bucket), + }, nil } // Close is used to close the underlying datastore func (d *Datastore) Close() error { return d.db.Close() } + +// implements batching capabilities +type bboltBatch struct { + tx *bbolt.Tx + bkt *bbolt.Bucket +} + +// Commit the underlying batched transactions +func (bb *bboltBatch) Commit() error { + return bb.tx.Commit() +} + +// Add delete operation to the batch +func (bb *bboltBatch) Delete(key datastore.Key) error { + return bb.bkt.Delete(key.Bytes()) +} + +// Add a put operation to the batch +func (bb *bboltBatch) Put(key datastore.Key, val []byte) error { + return bb.bkt.Put(key.Bytes(), val) +} + +// filter checks if we should filter out the query. +func filter(filters []query.Filter, entry query.Entry) bool { + for _, filter := range filters { + if !filter.Filter(entry) { + return true + } + } + return false +} diff --git a/datastore_test.go b/datastore_test.go index 506d8a2..b33ac35 100644 --- a/datastore_test.go +++ b/datastore_test.go @@ -2,12 +2,15 @@ package dsbbolt import ( "fmt" + "os" "testing" "reflect" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/query" + dstest "github.com/ipfs/go-datastore/test" + "go.etcd.io/bbolt" ) func Test_NewDatastore(t *testing.T) { @@ -35,6 +38,60 @@ func Test_NewDatastore(t *testing.T) { } } +func Test_Batch(t *testing.T) { + defer os.RemoveAll("./tmp") + ds, err := NewDatastore("./tmp", nil, nil) + if err != nil { + t.Fatal(err) + } + batcher, err := ds.Batch() + if err != nil { + t.Fatal(err) + } + if err := batcher.Put(datastore.NewKey("helloworld"), []byte("hello")); err != nil { + t.Fatal(err) + } + if err := batcher.Delete(datastore.NewKey("helloworld")); err != nil { + t.Fatal(err) + } + batcher.Commit() + if _, err := ds.Get(datastore.NewKey("helloworld")); err == nil { + t.Fatal("error expected") + } +} + +func Test_Sync(t *testing.T) { + type args struct { + sync bool + } + tests := []struct { + name string + args args + }{ + {"With-Sync", args{true}}, + {"Without-Sync", args{false}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defer os.RemoveAll("./tmp") + opts := bbolt.DefaultOptions + // we want the "inverse" of sync because this is "no sync" + // that is, if we specify sync, we want to say "no thank you nosync" + opts.NoSync = !tt.args.sync + ds, err := NewDatastore("./tmp", opts, nil) + if err != nil { + t.Fatal(err) + } + if ds.withSync != tt.args.sync { + t.Fatal("bad sync status") + } + if err := ds.Sync(datastore.NewKey("hmm")); err != nil { + t.Fatal(err) + } + }) + } +} + func Test_Datastore(t *testing.T) { ds, err := NewDatastore("./tmp", nil, nil) if err != nil { @@ -121,3 +178,13 @@ func Test_Datastore(t *testing.T) { } } + +func TestSuite(t *testing.T) { + defer os.RemoveAll("./tmp") + db, err := NewDatastore("./tmp", nil, nil) + if err != nil { + t.Fatal(err) + } + defer db.Close() + dstest.SubtestAll(t, db) +} diff --git a/go.mod b/go.mod index f2c2440..36898f9 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/RTradeLtd/go-ds-bbolt go 1.12 require ( - github.com/ipfs/go-datastore v0.2.0 + github.com/ipfs/go-datastore v0.3.1 go.etcd.io/bbolt v1.3.3 golang.org/x/sys v0.0.0-20191128015809-6d18c012aee9 // indirect ) diff --git a/go.sum b/go.sum index 7a5153a..819f5f8 100644 --- a/go.sum +++ b/go.sum @@ -2,14 +2,8 @@ github.com/go-check/check v0.0.0-20180628173108-788fd7840127 h1:0gkP6mzaMqkmpcJY github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/ipfs/go-datastore v0.0.5 h1:q3OfiOZV5rlsK1H5V8benjeUApRfMGs4Mrhmr6NriQo= -github.com/ipfs/go-datastore v0.0.5/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE= -github.com/ipfs/go-datastore v0.1.0 h1:TOxI04l8CmO4zGtesENhzm4PwkFwJXY3rKiYaaMf9fI= -github.com/ipfs/go-datastore v0.1.0/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE= -github.com/ipfs/go-datastore v0.1.1 h1:F4k0TkTAZGLFzBOrVKDAvch6JZtuN4NHkfdcEZL50aI= -github.com/ipfs/go-datastore v0.1.1/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw= -github.com/ipfs/go-datastore v0.2.0 h1:5Wjw6YXzZmtqU1MSrlws64+oLmSqea7gEajTcJickh8= -github.com/ipfs/go-datastore v0.2.0/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw= +github.com/ipfs/go-datastore v0.3.1 h1:SS1t869a6cctoSYmZXUk8eL6AzVXgASmKIWFNQkQ1jU= +github.com/ipfs/go-datastore v0.3.1/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw= github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8 h1:bspPhN+oKYFk5fcGNuQzp6IGzYQSenLEgH3s6jkXrWw= github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY= @@ -23,4 +17,5 @@ go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= golang.org/x/sys v0.0.0-20191128015809-6d18c012aee9 h1:ZBzSG/7F4eNKz2L3GE9o300RX0Az1Bw5HF7PDraD+qU= golang.org/x/sys v0.0.0-20191128015809-6d18c012aee9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=