diff --git a/physical/dynamodb/dynamodb.go b/physical/dynamodb/dynamodb.go index 62cf0cfb0080..f2e011e30fe5 100644 --- a/physical/dynamodb/dynamodb.go +++ b/physical/dynamodb/dynamodb.go @@ -340,15 +340,33 @@ func (d *DynamoDBBackend) Delete(ctx context.Context, key string) error { }, }} - // clean up now empty 'folders' + // Clean up empty "folders" by looping through all levels of the path to the item being deleted looking for + // children. Loop from deepest path to shallowest, and only consider items children if they are not going to be + // deleted by our batch delete request. If a path has no valid children, then it should be considered an empty + // "folder" and be deleted along with the original item in our batch job. Because we loop from deepest path to + // shallowest, once we find a path level that contains valid children we can stop the cleanup operation. prefixes := physical.Prefixes(key) sort.Sort(sort.Reverse(sort.StringSlice(prefixes))) - for _, prefix := range prefixes { - hasChildren, err := d.hasChildren(prefix) + for index, prefix := range prefixes { + // Because delete batches its requests, we need to pass keys we know are going to be deleted into + // hasChildren so it can exclude those when it determines if there WILL be any children left after + // the delete operations have completed. + var excluded []string + if index == 0 { + // This is the value we know for sure is being deleted + excluded = append(excluded, recordKeyForVaultKey(key)) + } else { + // The previous path doesn't count as a child, since if we're still looping, we've found no children + excluded = append(excluded, recordKeyForVaultKey(prefixes[index-1])) + } + + hasChildren, err := d.hasChildren(prefix, excluded) if err != nil { return err } + if !hasChildren { + // If there are no children other than ones we know are being deleted then cleanup empty "folder" pointers requests = append(requests, &dynamodb.WriteRequest{ DeleteRequest: &dynamodb.DeleteRequest{ Key: map[string]*dynamodb.AttributeValue{ @@ -357,6 +375,12 @@ func (d *DynamoDBBackend) Delete(ctx context.Context, key string) error { }, }, }) + } else { + // This loop starts at the deepest path and works backwards looking for children + // once a deeper level of the path has been found to have children there is no + // more cleanup that needs to happen, otherwise we might remove folder pointers + // to that deeper path making it "undiscoverable" with the list operation + break } } @@ -406,9 +430,12 @@ func (d *DynamoDBBackend) List(ctx context.Context, prefix string) ([]string, er } // hasChildren returns true if there exist items below a certain path prefix. -// To do so, the method fetches such items from DynamoDB. If there are more -// than one item (which is the "directory" item), there are children. -func (d *DynamoDBBackend) hasChildren(prefix string) (bool, error) { +// To do so, the method fetches such items from DynamoDB. This method is primarily +// used by Delete. Because DynamoDB requests are batched this method is being called +// before any deletes take place. To account for that hasChildren accepts a slice of +// strings representing values we expect to find that should NOT be counted as children +// because they are going to be deleted. +func (d *DynamoDBBackend) hasChildren(prefix string, exclude []string) (bool, error) { prefix = strings.TrimSuffix(prefix, "/") prefix = escapeEmptyPath(prefix) @@ -424,9 +451,10 @@ func (d *DynamoDBBackend) hasChildren(prefix string) (bool, error) { }, }, // Avoid fetching too many items from DynamoDB for performance reasons. - // We need at least two because one is the directory item, all others - // are children. - Limit: aws.Int64(2), + // We want to know if there are any children we don't expect to see. + // Answering that question requires fetching a minimum of one more item + // than the number we expect. In most cases this value will be 2 + Limit: aws.Int64(int64(len(exclude) + 1)), } d.permitPool.Acquire() @@ -436,7 +464,23 @@ func (d *DynamoDBBackend) hasChildren(prefix string) (bool, error) { if err != nil { return false, err } - return len(out.Items) > 1, nil + var childrenExist bool + for _, item := range out.Items { + for _, excluded := range exclude { + // Check if we've found an item we didn't expect to. Look for "folder" pointer keys (trailing slash) + // and regular value keys (no trailing slash) + if *item["Key"].S != excluded && *item["Key"].S != fmt.Sprintf("%s/", excluded) { + childrenExist = true + break + } + } + if childrenExist { + // We only need to find ONE child we didn't expect to. + break + } + } + + return childrenExist, nil } // LockWith is used for mutual exclusion based on the given key. diff --git a/physical/testing.go b/physical/testing.go index c64b55ea4435..375d9053efe5 100644 --- a/physical/testing.go +++ b/physical/testing.go @@ -181,6 +181,54 @@ func ExerciseBackend(t testing.TB, b Backend) { if len(keys) != 0 { t.Errorf("should be empty at end: %v", keys) } + + // When the root path is empty, adding and removing deep nested values should not break listing + e = &Entry{Key: "foo/nested1/nested2/value1", Value: []byte("baz")} + err = b.Put(context.Background(), e) + if err != nil { + t.Fatalf("deep nest: %v", err) + } + + e = &Entry{Key: "foo/nested1/nested2/value2", Value: []byte("baz")} + err = b.Put(context.Background(), e) + if err != nil { + t.Fatalf("deep nest: %v", err) + } + + err = b.Delete(context.Background(), "foo/nested1/nested2/value2") + if err != nil { + t.Fatalf("failed to remove deep nest: %v", err) + } + + keys, err = b.List(context.Background(), "") + if err != nil { + t.Fatalf("listing of root failed after deletion: %v", err) + } + if len(keys) == 0 { + t.Errorf("root is returning empty after deleting a single nested value, expected nested1/: %v", keys) + keys, err = b.List(context.Background(), "foo/nested1") + if err != nil { + t.Fatalf("listing of expected nested path 'foo/nested1' failed: %v", err) + } + // prove that the root should not be empty and that foo/nested1 exists + if len(keys) != 0 { + t.Logf(" keys can still be listed from nested1/ so it's not empty, expected nested2/: %v", keys) + } + } + + // cleanup left over listing bug test value + err = b.Delete(context.Background(), "foo/nested1/nested2/value1") + if err != nil { + t.Fatalf("failed to remove deep nest: %v", err) + } + + keys, err = b.List(context.Background(), "") + if err != nil { + t.Fatalf("listing of root failed after delete of deep nest: %v", err) + } + if len(keys) != 0 { + t.Errorf("should be empty at end: %v", keys) + } } func ExerciseBackend_ListPrefix(t testing.TB, b Backend) {