Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Asynchronous Writing Datastores #140

Merged
merged 3 commits into from
Dec 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions autobatch/autobatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,35 @@ func (d *Datastore) Put(k ds.Key, val []byte) error {
return nil
}

// Sync flushes all buffered operations on keys at or under the prefix
// from the current batch to the underlying datastore.
//
// Matching operations are queued on a fresh batch obtained from the child
// datastore. They are removed from the buffer only after that batch has been
// committed successfully. If queuing an operation or committing the batch
// fails, the error is returned and the buffer is left unchanged, so the
// pending operations are not lost.
func (d *Datastore) Sync(prefix ds.Key) error {
	b, err := d.child.Batch()
	if err != nil {
		return err
	}

	// Remember which keys went into the batch. Dropping them from the buffer
	// before Commit succeeds would silently discard those writes on failure.
	var synced []ds.Key
	for k, o := range d.buffer {
		if !(k.Equal(prefix) || k.IsDescendantOf(prefix)) {
			continue
		}

		if o.delete {
			err = b.Delete(k)
		} else {
			err = b.Put(k, o.value)
		}
		if err != nil {
			return err
		}

		synced = append(synced, k)
	}

	if err := b.Commit(); err != nil {
		return err
	}

	// The child now holds these operations; they no longer need buffering.
	for _, k := range synced {
		delete(d.buffer, k)
	}
	return nil
}

// Flush flushes the current batch to the underlying datastore.
func (d *Datastore) Flush() error {
b, err := d.child.Batch()
Expand Down
146 changes: 146 additions & 0 deletions autobatch/autobatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,149 @@ func TestFlushing(t *testing.T) {
t.Fatal("wrong value")
}
}

// TestSync exercises Sync on an auto-batching datastore layered over a map
// datastore: once with Put as the operation being synced and once with
// Delete.
func TestSync(t *testing.T) {
	backing := ds.NewMapDatastore()
	batching := NewAutoBatching(backing, 100)

	// Every key stores its own string form as its value.
	putKey := func(key ds.Key) {
		err := batching.Put(key, []byte(key.String()))
		if err != nil {
			t.Fatal(err)
		}
	}
	deleteKey := func(key ds.Key) {
		err := batching.Delete(key)
		if err != nil {
			t.Fatal(err)
		}
	}

	// expectPresent requires the key to be readable with the expected value.
	expectPresent := func(store ds.Datastore, key ds.Key) {
		got, err := store.Get(key)
		if err != nil {
			t.Fatal(err)
		}

		want := []byte(key.String())
		if !bytes.Equal(got, want) {
			t.Fatal("wrong value")
		}
	}
	// expectAbsent requires the lookup to fail with ds.ErrNotFound.
	expectAbsent := func(store ds.Datastore, key ds.Key) {
		_, err := store.Get(key)
		if err != ds.ErrNotFound {
			t.Fatal("should not have found value")
		}
	}

	// Syncing Puts.
	internalSyncTest(t, batching, backing, putKey, deleteKey, expectPresent, expectAbsent)

	// Syncing Deletes: the roles of the operations and checks are swapped.
	internalSyncTest(t, batching, backing, deleteKey, putKey, expectAbsent, expectPresent)
}

// internalSyncTest drives one Sync scenario against d and inspects child to
// see which operations have been flushed. It is shared by the Put and the
// Delete variants of the test: op is the operation under test, undoOp is its
// inverse, and checkOp / checkUndoOp assert that a store reflects op or
// undoOp respectively for a given key.
//
// Comments in the body read as if op = Put and undoOp = Delete.
func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Key),
	checkOp, checkUndoOp func(ds.Datastore, ds.Key)) {
	// Build a three-level key tree ("i", "i/j", "i/j/l" with i in 0..3 and
	// j, l in 0..1) in depth-first order. keymap records each key's index
	// in keys.
	var keys []ds.Key
	keymap := make(map[ds.Key]int)
	register := func(name string) {
		key := ds.NewKey(name)
		keymap[key] = len(keys)
		keys = append(keys, key)
	}
	for top := 0; top < 4; top++ {
		register(fmt.Sprintf("%d", top))
		for mid := 0; mid < 2; mid++ {
			register(fmt.Sprintf("%d/%d", top, mid))
			for leaf := 0; leaf < 2; leaf++ {
				register(fmt.Sprintf("%d/%d/%d", top, mid, leaf))
			}
		}
	}

	// syncPrefix syncs d under prefix and aborts the test on error.
	syncPrefix := func(prefix ds.Key) {
		err := d.Sync(prefix)
		if err != nil {
			t.Fatal(err)
		}
	}

	for _, key := range keys {
		op(key)
	}

	// Every Put is visible through d right away.
	for _, key := range keys {
		checkOp(d, key)
	}

	// The Put has not been flushed to the child.
	rootZero := ds.NewKey("0")
	checkUndoOp(child, rootZero)

	// A Delete is visible through d as well.
	deletedKey := ds.NewKey("2/1/1")
	undoOp(deletedKey)
	checkUndoOp(d, deletedKey)

	// The child is still untouched.
	checkUndoOp(child, rootZero)

	// Sync the tree "0/*/*".
	syncPrefix(rootZero)

	// Keys "0/*/*" read back through d.
	checkKeyRange(t, keymap, keys, d, [][]string{{"0", "0/1/1"}}, checkOp)

	// No other keys reached the child.
	checkKeyRange(t, keymap, keys, child, [][]string{{"1", "3/1/1"}}, checkUndoOp)

	// Sync the tree "1/1/*".
	syncPrefix(ds.NewKey("1/1"))

	// Keys "0/*/*" and "1/1/*" read back through d.
	checkKeyRange(t, keymap, keys, d, [][]string{{"0", "0/1/1"}, {"1/1", "1/1/1"}}, checkOp)

	// No other keys reached the child.
	checkKeyRange(t, keymap, keys, child, [][]string{{"1", "1/0/1"}, {"2", "3/1/1"}}, checkUndoOp)

	// Sync the single key "3/1/1".
	syncPrefix(ds.NewKey("3/1/1"))

	// Keys "0/*/*", "1/1/*" and "3/1/1" read back through d.
	checkKeyRange(t, keymap, keys, d, [][]string{{"0", "0/1/1"}, {"1/1", "1/1/1"}, {"3/1/1", "3/1/1"}}, checkOp)

	// No other keys reached the child.
	checkKeyRange(t, keymap, keys, child, [][]string{{"1", "1/0/1"}, {"2", "3/1/0"}}, checkUndoOp)

	// Sync with the zero-value key as prefix.
	syncPrefix(ds.Key{})

	// The child must reflect the Delete for the deleted key.
	checkUndoOp(child, deletedKey)

	// All keys except the deleted one read back through d.
	checkKeyRange(t, keymap, keys, d, [][]string{{"0", "2/1/0"}, {"3", "3/1/1"}}, checkOp)

	// Put the deleted key back, sync exactly that key, and check it.
	op(deletedKey)
	syncPrefix(deletedKey)
	checkOp(d, deletedKey)
}

// checkKeyRange runs checkFn against d for every key that falls inside one of
// validKeyRanges.
//
// Each entry of validKeyRanges is a {first, last} pair of key names. keymap
// gives the index of each key in keys, and a pair selects keys[first..last]
// with both boundaries included, so {"3/1/1", "3/1/1"} checks exactly that
// one key. The test fails immediately if a boundary name is not in keymap.
func checkKeyRange(t *testing.T, keymap map[ds.Key]int, keys []ds.Key,
	d ds.Datastore, validKeyRanges [][]string, checkFn func(ds.Datastore, ds.Key)) {
	t.Helper()
	for _, bounds := range validKeyRanges {
		start, okStart := keymap[ds.NewKey(bounds[0])]
		end, okEnd := keymap[ds.NewKey(bounds[1])]
		if !okStart || !okEnd {
			// A missing key would otherwise silently map to index 0.
			t.Fatalf("unknown key range boundary: %v", bounds)
		}

		// The upper boundary is inclusive; a half-open slice would skip the
		// last key and turn single-key ranges into no-ops.
		for _, k := range keys[start : end+1] {
			checkFn(d, k)
		}
	}
}
10 changes: 10 additions & 0 deletions basic_ds.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ func (d *MapDatastore) Put(key Key, value []byte) (err error) {
return nil
}

// Sync implements Datastore.Sync.
//
// It is a no-op: prefix is ignored and nil is always returned.
// NOTE(review): Get reads from the d.values map, so there is presumably
// nothing to flush for this datastore; confirm.
func (d *MapDatastore) Sync(prefix Key) error {
return nil
}

// Get implements Datastore.Get
func (d *MapDatastore) Get(key Key) (value []byte, err error) {
val, found := d.values[key]
Expand Down Expand Up @@ -95,6 +100,11 @@ func (d *NullDatastore) Put(key Key, value []byte) (err error) {
return nil
}

// Sync implements Datastore.Sync.
//
// It is a no-op: prefix is ignored and nil is always returned.
func (d *NullDatastore) Sync(prefix Key) error {
return nil
}

// Get implements Datastore.Get
func (d *NullDatastore) Get(key Key) (value []byte, err error) {
return nil, ErrNotFound
Expand Down
7 changes: 7 additions & 0 deletions datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ should be checked by callers.
type Datastore interface {
// Read and Write are embedded interfaces declared elsewhere in this package.
Read
Write
// Sync guarantees that any Put or Delete calls under prefix that returned
// before Sync(prefix) was called will be observed after Sync(prefix)
// returns, even if the program crashes. If Put/Delete operations already
// satisfy these requirements then Sync may be a no-op.
//
// Sync returns an error if the operations under prefix could not be
// synced.
Sync(prefix Key) error
// io.Closer contributes the Close method.
io.Closer
}

Expand Down
6 changes: 6 additions & 0 deletions delayed/delayed.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ func (dds *Delayed) Put(key ds.Key, value []byte) (err error) {
return dds.ds.Put(key, value)
}

// Sync implements Datastore.Sync.
//
// It first waits on the configured delay and then forwards the call,
// with the same prefix, to the wrapped datastore, returning its result.
func (dds *Delayed) Sync(prefix ds.Key) error {
dds.delay.Wait()
return dds.ds.Sync(prefix)
}

// Get implements the ds.Datastore interface.
func (dds *Delayed) Get(key ds.Key) (value []byte, err error) {
dds.delay.Wait()
Expand Down
13 changes: 10 additions & 3 deletions examples/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ func (d *Datastore) Put(key ds.Key, value []byte) (err error) {
return ioutil.WriteFile(fn, value, 0666)
}

// Sync would ensure that any previous Puts under the prefix are written to
// disk. This implementation treats them as already written, so it is a
// no-op: prefix is ignored and nil is always returned.
//
// NOTE(review): Put writes values with ioutil.WriteFile, which does not
// fsync the file; confirm that this is sufficient for the crash-safety
// wording of the Datastore.Sync contract.
func (d *Datastore) Sync(prefix ds.Key) error {
return nil
}

// Get returns the value for given key
func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
fn := d.KeyFilename(key)
Expand Down Expand Up @@ -103,8 +109,9 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) {

walkFn := func(path string, info os.FileInfo, err error) error {
// remove ds path prefix
if strings.HasPrefix(path, d.path) {
path = path[len(d.path):]
relPath, err := filepath.Rel(d.path, path)
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
if err == nil {
path = filepath.ToSlash(relPath)
}

if !info.IsDir() {
Expand Down Expand Up @@ -167,7 +174,7 @@ func (d *Datastore) DiskUsage() (uint64, error) {
log.Println(err)
return err
}
if f != nil {
if f != nil && f.Mode().IsRegular() {
du += uint64(f.Size())
}
return nil
Expand Down
7 changes: 5 additions & 2 deletions examples/fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,16 @@ func (ks *DSSuite) TestDiskUsage(c *C) {
"foo/bar/baz/barb",
})

totalBytes := 0
for _, k := range keys {
err := ks.ds.Put(k, []byte(k.String()))
value := []byte(k.String())
totalBytes += len(value)
err := ks.ds.Put(k, value)
c.Check(err, Equals, nil)
}

if ps, ok := ks.ds.(ds.PersistentDatastore); ok {
if s, err := ps.DiskUsage(); s <= 100 || err != nil {
if s, err := ps.DiskUsage(); s != uint64(totalBytes) || err != nil {
c.Error("unexpected size is: ", s)
}
} else {
Expand Down
10 changes: 10 additions & 0 deletions failstore/failstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ func (d *Failstore) Put(k ds.Key, val []byte) error {
return d.child.Put(k, val)
}

// Sync implements Datastore.Sync.
//
// The error function is consulted first with the operation name "sync". A
// non-nil result is returned as-is and the child datastore is not called.
// Otherwise the call is forwarded to the child with the same prefix.
func (d *Failstore) Sync(prefix ds.Key) error {
	if failure := d.errfunc("sync"); failure != nil {
		return failure
	}
	return d.child.Sync(prefix)
}

// Get retrieves a value from the datastore.
func (d *Failstore) Get(k ds.Key) ([]byte, error) {
err := d.errfunc("get")
Expand Down
5 changes: 5 additions & 0 deletions keytransform/keytransform.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ func (d *Datastore) Put(key ds.Key, value []byte) (err error) {
return d.child.Put(d.ConvertKey(key), value)
}

// Sync implements Datastore.Sync.
//
// The prefix is run through ConvertKey and the child datastore is synced
// under the converted prefix.
func (d *Datastore) Sync(prefix ds.Key) error {
	childPrefix := d.ConvertKey(prefix)
	return d.child.Sync(childPrefix)
}

// Get returns the value for given key, transforming the key first.
func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
return d.child.Get(d.ConvertKey(key))
Expand Down
18 changes: 18 additions & 0 deletions mount/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,24 @@ func (d *Datastore) Put(key ds.Key, value []byte) error {
return cds.Put(k, value)
}

// Sync implements Datastore.Sync
func (d *Datastore) Sync(prefix ds.Key) error {
// Sync all mount points below the prefix
// Sync the mount point right at (or above) the prefix
dstores, mountPts, rest := d.lookupAll(prefix)
for i, suffix := range rest {
if err := dstores[i].Sync(suffix); err != nil {
return err
}

if mountPts[i].Equal(prefix) || suffix.String() != "/" {
return nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. This looks like a bug in lookupAll. lookupAll should only return mounts that can contain descendants of key.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can merge this as-is and fix it later or move this logic into lookupAll now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you clarify what you think the bug is here? It looks like the documented behavior for lookupAll.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lookupAll is supposed to return all datastores that might contain keys that are descendants of the given key. That means we should be calling Sync on all datastores returned by lookupAll and we shouldn't have this extra check.

However, if we have ["/foo", "/foo/bar"], "/foo" should "cover" "/foo/bar" given that lookup will always return "/foo". However, when given the prefix "/foo", lookupAll will return both.

Note: this is only technically a bug. Nobody would ever mount "/foo" over "/foo/bar".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nobody would ever mount "/foo" over "/foo/bar"

I thought it was expected behavior that we might mount "/" and also "/foo", or is the root a special case?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My first example was bad: order matters. If we mount "/" over "/foo", "/" will win (I think? check me here). In go-ipfs, we mount "/blocks" over "/".

Copy link
Member

@Stebalien Stebalien Dec 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I say "nobody would ever mount "/foo" over "/foo/bar" because "/foo" is a prefix of "/foo/bar" (and we check mounts in-order).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wrong. We reverse-sort datastores so "/foo/bar" always comes before "/foo".

However, the bug was valid. Fix in #141.

}

return nil
}

func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
cds, _, k := d.lookup(key)
if cds == nil {
Expand Down
Loading