Skip to content

Commit

Permalink
Merge pull request #140 from ipfs/feat/async-ds
Browse files Browse the repository at this point in the history
Support Asynchronous Writing Datastores
  • Loading branch information
Stebalien authored Dec 3, 2019
2 parents d4417ca + 8ddf6ad commit 8b79466
Show file tree
Hide file tree
Showing 15 changed files with 356 additions and 5 deletions.
29 changes: 29 additions & 0 deletions autobatch/autobatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,35 @@ func (d *Datastore) Put(k ds.Key, val []byte) error {
return nil
}

// Sync flushes all operations on keys at or under the prefix
// from the current batch to the underlying datastore
func (d *Datastore) Sync(prefix ds.Key) error {
b, err := d.child.Batch()
if err != nil {
return err
}

for k, o := range d.buffer {
if !(k.Equal(prefix) || k.IsDescendantOf(prefix)) {
continue
}

var err error
if o.delete {
err = b.Delete(k)
} else {
err = b.Put(k, o.value)
}
if err != nil {
return err
}

delete(d.buffer, k)
}

return b.Commit()
}

// Flush flushes the current batch to the underlying datastore.
func (d *Datastore) Flush() error {
b, err := d.child.Batch()
Expand Down
146 changes: 146 additions & 0 deletions autobatch/autobatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,149 @@ func TestFlushing(t *testing.T) {
t.Fatal("wrong value")
}
}

func TestSync(t *testing.T) {
child := ds.NewMapDatastore()
d := NewAutoBatching(child, 100)

put := func(key ds.Key) {
if err := d.Put(key, []byte(key.String())); err != nil {
t.Fatal(err)
}
}
del := func(key ds.Key) {
if err := d.Delete(key); err != nil {
t.Fatal(err)
}
}

get := func(d ds.Datastore, key ds.Key) {
val, err := d.Get(key)
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(val, []byte(key.String())) {
t.Fatal("wrong value")
}
}
invalidGet := func(d ds.Datastore, key ds.Key) {
if _, err := d.Get(key); err != ds.ErrNotFound {
t.Fatal("should not have found value")
}
}

// Test if Syncing Puts works
internalSyncTest(t, d, child, put, del, get, invalidGet)

// Test if Syncing Deletes works
internalSyncTest(t, d, child, del, put, invalidGet, get)
}

// This function can be used to test Sync Puts and Deletes
// For clarity comments are written as if op = Put and undoOp = Delete
func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Key),
checkOp, checkUndoOp func(ds.Datastore, ds.Key)) {
var keys []ds.Key
keymap := make(map[ds.Key]int)
for i := 0; i < 4; i++ {
k := ds.NewKey(fmt.Sprintf("%d", i))
keymap[k] = len(keys)
keys = append(keys, k)
for j := 0; j < 2; j++ {
k := ds.NewKey(fmt.Sprintf("%d/%d", i, j))
keymap[k] = len(keys)
keys = append(keys, k)
for k := 0; k < 2; k++ {
k := ds.NewKey(fmt.Sprintf("%d/%d/%d", i, j, k))
keymap[k] = len(keys)
keys = append(keys, k)
}
}
}

for _, k := range keys {
op(k)
}

// Get works normally.
for _, k := range keys {
checkOp(d, k)
}

// Put not flushed
checkUndoOp(child, ds.NewKey("0"))

// Delete works.
deletedKey := ds.NewKey("2/1/1")
undoOp(deletedKey)
checkUndoOp(d, deletedKey)

// Put still not flushed
checkUndoOp(child, ds.NewKey("0"))

// Sync the tree "0/*/*"
if err := d.Sync(ds.NewKey("0")); err != nil {
t.Fatal(err)
}

// Try to get keys "0/*/*" from the child datastore
checkKeyRange(t, keymap, keys, d, [][]string{{"0", "0/1/1"}}, checkOp)

// Verify no other keys were synchronized
checkKeyRange(t, keymap, keys, child, [][]string{{"1", "3/1/1"}}, checkUndoOp)

// Sync the tree "1/1/*"
if err := d.Sync(ds.NewKey("1/1")); err != nil {
t.Fatal(err)
}

// Try to get keys "0/*/*" and "1/1/*" from the child datastore
checkKeyRange(t, keymap, keys, d, [][]string{{"0", "0/1/1"}, {"1/1", "1/1/1"}}, checkOp)

// Verify no other keys were synchronized
checkKeyRange(t, keymap, keys, child, [][]string{{"1", "1/0/1"}, {"2", "3/1/1"}}, checkUndoOp)

// Sync the tree "3/1/1"
if err := d.Sync(ds.NewKey("3/1/1")); err != nil {
t.Fatal(err)
}

// Try to get keys "0/*/*", "1/1/*", "3/1/1" from the child datastore
checkKeyRange(t, keymap, keys, d, [][]string{{"0", "0/1/1"}, {"1/1", "1/1/1"}, {"3/1/1", "3/1/1"}}, checkOp)

// Verify no other keys were synchronized
checkKeyRange(t, keymap, keys, child, [][]string{{"1", "1/0/1"}, {"2", "3/1/0"}}, checkUndoOp)

if err := d.Sync(ds.Key{}); err != nil {
t.Fatal(err)
}

// Never flushed the deleted key.
checkUndoOp(child, deletedKey)

// Try to get all keys except the deleted key from the child datastore
checkKeyRange(t, keymap, keys, d, [][]string{{"0", "2/1/0"}, {"3", "3/1/1"}}, checkOp)

// Add the deleted key into the datastore
op(deletedKey)

// Sync it
if err := d.Sync(deletedKey); err != nil {
t.Fatal(err)
}

// Check it
checkOp(d, deletedKey)
}

func checkKeyRange(t *testing.T, keymap map[ds.Key]int, keys []ds.Key,
d ds.Datastore, validKeyRanges [][]string, checkFn func(ds.Datastore, ds.Key)) {
t.Helper()
for _, validKeyBoundaries := range validKeyRanges {
start, end := keymap[ds.NewKey(validKeyBoundaries[0])], keymap[ds.NewKey(validKeyBoundaries[1])]
for _, k := range keys[start:end] {
checkFn(d, k)
}
}
}
10 changes: 10 additions & 0 deletions basic_ds.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ func (d *MapDatastore) Put(key Key, value []byte) (err error) {
return nil
}

// Sync implements Datastore.Sync
func (d *MapDatastore) Sync(prefix Key) error {
return nil
}

// Get implements Datastore.Get
func (d *MapDatastore) Get(key Key) (value []byte, err error) {
val, found := d.values[key]
Expand Down Expand Up @@ -95,6 +100,11 @@ func (d *NullDatastore) Put(key Key, value []byte) (err error) {
return nil
}

// Sync implements Datastore.Sync
func (d *NullDatastore) Sync(prefix Key) error {
return nil
}

// Get implements Datastore.Get
func (d *NullDatastore) Get(key Key) (value []byte, err error) {
return nil, ErrNotFound
Expand Down
7 changes: 7 additions & 0 deletions datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ should be checked by callers.
type Datastore interface {
Read
Write
// Sync guarantees that any Put or Delete calls under prefix that returned
// before Sync(prefix) was called will be observed after Sync(prefix)
// returns, even if the program crashes. If Put/Delete operations already
// satisfy these requirements then Sync may be a no-op.
//
// If the prefix fails to Sync this method returns an error.
Sync(prefix Key) error
io.Closer
}

Expand Down
6 changes: 6 additions & 0 deletions delayed/delayed.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ func (dds *Delayed) Put(key ds.Key, value []byte) (err error) {
return dds.ds.Put(key, value)
}

// Sync implements Datastore.Sync
func (dds *Delayed) Sync(prefix ds.Key) error {
dds.delay.Wait()
return dds.ds.Sync(prefix)
}

// Get implements the ds.Datastore interface.
func (dds *Delayed) Get(key ds.Key) (value []byte, err error) {
dds.delay.Wait()
Expand Down
13 changes: 10 additions & 3 deletions examples/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ func (d *Datastore) Put(key ds.Key, value []byte) (err error) {
return ioutil.WriteFile(fn, value, 0666)
}

// Sync would ensure that any previous Puts under the prefix are written to disk.
// However, they already are.
func (d *Datastore) Sync(prefix ds.Key) error {
return nil
}

// Get returns the value for given key
func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
fn := d.KeyFilename(key)
Expand Down Expand Up @@ -103,8 +109,9 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) {

walkFn := func(path string, info os.FileInfo, err error) error {
// remove ds path prefix
if strings.HasPrefix(path, d.path) {
path = path[len(d.path):]
relPath, err := filepath.Rel(d.path, path)
if err == nil {
path = filepath.ToSlash(relPath)
}

if !info.IsDir() {
Expand Down Expand Up @@ -167,7 +174,7 @@ func (d *Datastore) DiskUsage() (uint64, error) {
log.Println(err)
return err
}
if f != nil {
if f != nil && f.Mode().IsRegular() {
du += uint64(f.Size())
}
return nil
Expand Down
7 changes: 5 additions & 2 deletions examples/fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,16 @@ func (ks *DSSuite) TestDiskUsage(c *C) {
"foo/bar/baz/barb",
})

totalBytes := 0
for _, k := range keys {
err := ks.ds.Put(k, []byte(k.String()))
value := []byte(k.String())
totalBytes += len(value)
err := ks.ds.Put(k, value)
c.Check(err, Equals, nil)
}

if ps, ok := ks.ds.(ds.PersistentDatastore); ok {
if s, err := ps.DiskUsage(); s <= 100 || err != nil {
if s, err := ps.DiskUsage(); s != uint64(totalBytes) || err != nil {
c.Error("unexpected size is: ", s)
}
} else {
Expand Down
10 changes: 10 additions & 0 deletions failstore/failstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ func (d *Failstore) Put(k ds.Key, val []byte) error {
return d.child.Put(k, val)
}

// Sync implements Datastore.Sync
func (d *Failstore) Sync(prefix ds.Key) error {
err := d.errfunc("sync")
if err != nil {
return err
}

return d.child.Sync(prefix)
}

// Get retrieves a value from the datastore.
func (d *Failstore) Get(k ds.Key) ([]byte, error) {
err := d.errfunc("get")
Expand Down
5 changes: 5 additions & 0 deletions keytransform/keytransform.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ func (d *Datastore) Put(key ds.Key, value []byte) (err error) {
return d.child.Put(d.ConvertKey(key), value)
}

// Sync implements Datastore.Sync
func (d *Datastore) Sync(prefix ds.Key) error {
return d.child.Sync(d.ConvertKey(prefix))
}

// Get returns the value for given key, transforming the key first.
func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
return d.child.Get(d.ConvertKey(key))
Expand Down
18 changes: 18 additions & 0 deletions mount/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,24 @@ func (d *Datastore) Put(key ds.Key, value []byte) error {
return cds.Put(k, value)
}

// Sync implements Datastore.Sync
func (d *Datastore) Sync(prefix ds.Key) error {
// Sync all mount points below the prefix
// Sync the mount point right at (or above) the prefix
dstores, mountPts, rest := d.lookupAll(prefix)
for i, suffix := range rest {
if err := dstores[i].Sync(suffix); err != nil {
return err
}

if mountPts[i].Equal(prefix) || suffix.String() != "/" {
return nil
}
}

return nil
}

func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
cds, _, k := d.lookup(key)
if cds == nil {
Expand Down
Loading

0 comments on commit 8b79466

Please sign in to comment.