Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DynamoDB Nested Values Bug #4570

Merged
merged 2 commits into from
May 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 54 additions & 10 deletions physical/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,15 +340,33 @@ func (d *DynamoDBBackend) Delete(ctx context.Context, key string) error {
},
}}

// clean up now empty 'folders'
// Clean up empty "folders" by looping through all levels of the path to the item being deleted looking for
// children. Loop from deepest path to shallowest, and only consider items children if they are not going to be
// deleted by our batch delete request. If a path has no valid children, then it should be considered an empty
// "folder" and be deleted along with the original item in our batch job. Because we loop from deepest path to
// shallowest, once we find a path level that contains valid children we can stop the cleanup operation.
prefixes := physical.Prefixes(key)
sort.Sort(sort.Reverse(sort.StringSlice(prefixes)))
for _, prefix := range prefixes {
hasChildren, err := d.hasChildren(prefix)
for index, prefix := range prefixes {
// Because delete batches its requests, we need to pass keys we know are going to be deleted into
// hasChildren so it can exclude those when it determines if there WILL be any children left after
// the delete operations have completed.
var excluded []string
if index == 0 {
// This is the value we know for sure is being deleted
excluded = append(excluded, recordKeyForVaultKey(key))
} else {
// The previous path doesn't count as a child, since if we're still looping, we've found no children
excluded = append(excluded, recordKeyForVaultKey(prefixes[index-1]))
}

hasChildren, err := d.hasChildren(prefix, excluded)
if err != nil {
return err
}

if !hasChildren {
// If there are no children other than ones we know are being deleted then cleanup empty "folder" pointers
requests = append(requests, &dynamodb.WriteRequest{
DeleteRequest: &dynamodb.DeleteRequest{
Key: map[string]*dynamodb.AttributeValue{
Expand All @@ -357,6 +375,12 @@ func (d *DynamoDBBackend) Delete(ctx context.Context, key string) error {
},
},
})
} else {
// This loop starts at the deepest path and works backwards looking for children
// once a deeper level of the path has been found to have children there is no
// more cleanup that needs to happen, otherwise we might remove folder pointers
// to that deeper path making it "undiscoverable" with the list operation
break
}
}

Expand Down Expand Up @@ -406,9 +430,12 @@ func (d *DynamoDBBackend) List(ctx context.Context, prefix string) ([]string, er
}

// hasChildren returns true if there exist items below a certain path prefix.
// To do so, the method fetches such items from DynamoDB. If there are more
// than one item (which is the "directory" item), there are children.
func (d *DynamoDBBackend) hasChildren(prefix string) (bool, error) {
// To do so, the method fetches such items from DynamoDB. This method is primarily
// used by Delete. Because DynamoDB requests are batched this method is being called
// before any deletes take place. To account for that hasChildren accepts a slice of
// strings representing values we expect to find that should NOT be counted as children
// because they are going to be deleted.
func (d *DynamoDBBackend) hasChildren(prefix string, exclude []string) (bool, error) {
prefix = strings.TrimSuffix(prefix, "/")
prefix = escapeEmptyPath(prefix)

Expand All @@ -424,9 +451,10 @@ func (d *DynamoDBBackend) hasChildren(prefix string) (bool, error) {
},
},
// Avoid fetching too many items from DynamoDB for performance reasons.
// We need at least two because one is the directory item, all others
// are children.
Limit: aws.Int64(2),
// We want to know if there are any children we don't expect to see.
// Answering that question requires fetching a minimum of one more item
// than the number we expect. In most cases this value will be 2
Limit: aws.Int64(int64(len(exclude) + 1)),
}

d.permitPool.Acquire()
Expand All @@ -436,7 +464,23 @@ func (d *DynamoDBBackend) hasChildren(prefix string) (bool, error) {
if err != nil {
return false, err
}
return len(out.Items) > 1, nil
var childrenExist bool
for _, item := range out.Items {
for _, excluded := range exclude {
// Check if we've found an item we didn't expect to. Look for "folder" pointer keys (trailing slash)
// and regular value keys (no trailing slash)
if *item["Key"].S != excluded && *item["Key"].S != fmt.Sprintf("%s/", excluded) {
childrenExist = true
break
}
}
if childrenExist {
// We only need to find ONE child we didn't expect to.
break
}
}

return childrenExist, nil
}

// LockWith is used for mutual exclusion based on the given key.
Expand Down
48 changes: 48 additions & 0 deletions physical/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,54 @@ func ExerciseBackend(t testing.TB, b Backend) {
if len(keys) != 0 {
t.Errorf("should be empty at end: %v", keys)
}

// When the root path is empty, adding and removing deep nested values should not break listing
e = &Entry{Key: "foo/nested1/nested2/value1", Value: []byte("baz")}
err = b.Put(context.Background(), e)
if err != nil {
t.Fatalf("deep nest: %v", err)
}

e = &Entry{Key: "foo/nested1/nested2/value2", Value: []byte("baz")}
err = b.Put(context.Background(), e)
if err != nil {
t.Fatalf("deep nest: %v", err)
}

err = b.Delete(context.Background(), "foo/nested1/nested2/value2")
if err != nil {
t.Fatalf("failed to remove deep nest: %v", err)
}

keys, err = b.List(context.Background(), "")
if err != nil {
t.Fatalf("listing of root failed after deletion: %v", err)
}
if len(keys) == 0 {
t.Errorf("root is returning empty after deleting a single nested value, expected nested1/: %v", keys)
keys, err = b.List(context.Background(), "foo/nested1")
if err != nil {
t.Fatalf("listing of expected nested path 'foo/nested1' failed: %v", err)
}
// prove that the root should not be empty and that foo/nested1 exists
if len(keys) != 0 {
t.Logf(" keys can still be listed from nested1/ so it's not empty, expected nested2/: %v", keys)
}
}

// cleanup left over listing bug test value
err = b.Delete(context.Background(), "foo/nested1/nested2/value1")
if err != nil {
t.Fatalf("failed to remove deep nest: %v", err)
}

keys, err = b.List(context.Background(), "")
if err != nil {
t.Fatalf("listing of root failed after delete of deep nest: %v", err)
}
if len(keys) != 0 {
t.Errorf("should be empty at end: %v", keys)
}
}

func ExerciseBackend_ListPrefix(t testing.TB, b Backend) {
Expand Down