Skip to content

Commit

Permalink
Merge pull request #42 from ipfs/bug/race-close
Browse files Browse the repository at this point in the history
prevent closing concurrently with other operations.
  • Loading branch information
willscott authored Feb 25, 2020
2 parents 2e5c197 + 5d92b4d commit 8e15c83
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 6 deletions.
43 changes: 37 additions & 6 deletions datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package leveldb
import (
"os"
"path/filepath"
"sync"

ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
Expand Down Expand Up @@ -52,11 +53,12 @@ func NewDatastore(path string, opts *Options) (*Datastore, error) {
return nil, err
}

return &Datastore{
accessor: &accessor{ldb: db, syncWrites: true},
ds := Datastore{
accessor: &accessor{ldb: db, syncWrites: true, closeLk: new(sync.RWMutex)},
DB: db,
path: path,
}, nil
}
return &ds, nil
}

// An extraction of the common interface between LevelDB Transactions and the DB itself.
Expand All @@ -74,9 +76,12 @@ type levelDbOps interface {
type accessor struct {
ldb levelDbOps
syncWrites bool
closeLk *sync.RWMutex
}

func (a *accessor) Put(key ds.Key, value []byte) (err error) {
a.closeLk.RLock()
defer a.closeLk.RUnlock()
return a.ldb.Put(key.Bytes(), value, &opt.WriteOptions{Sync: a.syncWrites})
}

Expand All @@ -85,6 +90,8 @@ func (a *accessor) Sync(prefix ds.Key) error {
}

func (a *accessor) Get(key ds.Key) (value []byte, err error) {
a.closeLk.RLock()
defer a.closeLk.RUnlock()
val, err := a.ldb.Get(key.Bytes(), nil)
if err != nil {
if err == leveldb.ErrNotFound {
Expand All @@ -96,18 +103,24 @@ func (a *accessor) Get(key ds.Key) (value []byte, err error) {
}

func (a *accessor) Has(key ds.Key) (exists bool, err error) {
a.closeLk.RLock()
defer a.closeLk.RUnlock()
return a.ldb.Has(key.Bytes(), nil)
}

func (d *accessor) GetSize(key ds.Key) (size int, err error) {
return ds.GetBackedSize(d, key)
func (a *accessor) GetSize(key ds.Key) (size int, err error) {
return ds.GetBackedSize(a, key)
}

func (a *accessor) Delete(key ds.Key) (err error) {
a.closeLk.RLock()
defer a.closeLk.RUnlock()
return a.ldb.Delete(key.Bytes(), &opt.WriteOptions{Sync: a.syncWrites})
}

func (a *accessor) Query(q dsq.Query) (dsq.Results, error) {
a.closeLk.RLock()
defer a.closeLk.RUnlock()
var rnge *util.Range

// make a copy of the query for the fallback naive query implementation.
Expand Down Expand Up @@ -135,6 +148,8 @@ func (a *accessor) Query(q dsq.Query) (dsq.Results, error) {
}
r := dsq.ResultsFromIterator(q, dsq.Iterator{
Next: func() (dsq.Result, bool) {
a.closeLk.RLock()
defer a.closeLk.RUnlock()
if !next() {
return dsq.Result{}, false
}
Expand All @@ -149,6 +164,8 @@ func (a *accessor) Query(q dsq.Query) (dsq.Results, error) {
return dsq.Result{Entry: e}, true
},
Close: func() error {
a.closeLk.RLock()
defer a.closeLk.RUnlock()
i.Release()
return nil
},
Expand All @@ -159,6 +176,8 @@ func (a *accessor) Query(q dsq.Query) (dsq.Results, error) {
// DiskUsage returns the current disk size used by this levelDB.
// For in-mem datastores, it will return 0.
func (d *Datastore) DiskUsage() (uint64, error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.path == "" { // in-mem
return 0, nil
}
Expand All @@ -182,19 +201,23 @@ func (d *Datastore) DiskUsage() (uint64, error) {

// LevelDB needs to be closed.
func (d *Datastore) Close() (err error) {
d.closeLk.Lock()
defer d.closeLk.Unlock()
return d.DB.Close()
}

type leveldbBatch struct {
b *leveldb.Batch
db *leveldb.DB
closeLk *sync.RWMutex
syncWrites bool
}

func (d *Datastore) Batch() (ds.Batch, error) {
return &leveldbBatch{
b: new(leveldb.Batch),
db: d.DB,
closeLk: d.closeLk,
syncWrites: d.syncWrites,
}, nil
}
Expand All @@ -205,6 +228,8 @@ func (b *leveldbBatch) Put(key ds.Key, value []byte) error {
}

func (b *leveldbBatch) Commit() error {
b.closeLk.RLock()
defer b.closeLk.RUnlock()
return b.db.Write(b.b, &opt.WriteOptions{Sync: b.syncWrites})
}

Expand All @@ -220,18 +245,24 @@ type transaction struct {
}

func (t *transaction) Commit() error {
t.closeLk.RLock()
defer t.closeLk.RUnlock()
return t.tx.Commit()
}

func (t *transaction) Discard() {
t.closeLk.RLock()
defer t.closeLk.RUnlock()
t.tx.Discard()
}

func (d *Datastore) NewTransaction(readOnly bool) (ds.Txn, error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
tx, err := d.DB.OpenTransaction()
if err != nil {
return nil, err
}
accessor := &accessor{ldb: tx, syncWrites: false}
accessor := &accessor{ldb: tx, syncWrites: false, closeLk: d.closeLk}
return &transaction{accessor, tx}, nil
}
38 changes: 38 additions & 0 deletions ds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,44 @@ func TestQueryRespectsProcess(t *testing.T) {
addTestCases(t, d, testcases)
}

func TestCloseRace(t *testing.T) {
d, close := newDS(t)
for n := 0; n < 100; n++ {
d.Put(ds.NewKey(fmt.Sprintf("%d", n)), []byte(fmt.Sprintf("test%d", n)))
}

tx, _ := d.NewTransaction(false)
tx.Put(ds.NewKey("txnversion"), []byte("bump"))

closeCh := make(chan interface{})

go func() {
close()
closeCh <- nil
}()
for k := range testcases {
tx.Get(ds.NewKey(k))
}
tx.Commit()
<-closeCh
}

func TestCloseSafety(t *testing.T) {
d, close := newDS(t)
addTestCases(t, d, testcases)

tx, _ := d.NewTransaction(false)
err := tx.Put(ds.NewKey("test"), []byte("test"))
if err != nil {
t.Error("Failed to put in a txn.")
}
close()
err = tx.Commit()
if err == nil {
t.Error("committing after close should fail.")
}
}

func TestQueryRespectsProcessMem(t *testing.T) {
d := newDSMem(t)
addTestCases(t, d, testcases)
Expand Down

0 comments on commit 8e15c83

Please sign in to comment.