Skip to content

Commit

Permalink
Update DynamoDB physical backend Delete and hasChildren logic to prev…
Browse files Browse the repository at this point in the history
…ent overzealous cleanup of folders and values
  • Loading branch information
Matt Surabian committed May 16, 2018
1 parent 7ff3c1e commit 226f4f4
Showing 1 changed file with 54 additions and 10 deletions.
64 changes: 54 additions & 10 deletions physical/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,15 +340,33 @@ func (d *DynamoDBBackend) Delete(ctx context.Context, key string) error {
},
}}

// clean up now empty 'folders'
// Clean up empty "folders" by looping through all levels of the path to the item being deleted looking for
// children. Loop from deepest path to shallowest, and only consider items children if they are not going to be
// deleted by our batch delete request. If a path has no valid children, then it should be considered an empty
// "folder" and be deleted along with the original item in our batch job. Because we loop from deepest path to
// shallowest, once we find a path level that contains valid children we can stop the cleanup operation.
prefixes := physical.Prefixes(key)
sort.Sort(sort.Reverse(sort.StringSlice(prefixes)))
for _, prefix := range prefixes {
hasChildren, err := d.hasChildren(prefix)
for index, prefix := range prefixes {
// Because delete batches its requests, we need to pass keys we know are going to be deleted into
// hasChildren so it can exclude those when it determines if there WILL be any children left after
// the delete operations have completed.
var excluded []string
if index == 0 {
// This is the value we know for sure is being deleted
excluded = append(excluded, recordKeyForVaultKey(key))
} else {
// The previous path doesn't count as a child, since if we're still looping, we've found no children
excluded = append(excluded, recordKeyForVaultKey(prefixes[index-1]))
}

hasChildren, err := d.hasChildren(prefix, excluded)
if err != nil {
return err
}

if !hasChildren {
// If there are no children other than ones we know are being deleted then cleanup empty "folder" pointers
requests = append(requests, &dynamodb.WriteRequest{
DeleteRequest: &dynamodb.DeleteRequest{
Key: map[string]*dynamodb.AttributeValue{
Expand All @@ -357,6 +375,12 @@ func (d *DynamoDBBackend) Delete(ctx context.Context, key string) error {
},
},
})
} else {
// This loop starts at the deepest path and works backwards looking for children
// once a deeper level of the path has been found to have children there is no
// more cleanup that needs to happen, otherwise we might remove folder pointers
// to that deeper path making it "undiscoverable" with the list operation
break
}
}

Expand Down Expand Up @@ -406,9 +430,12 @@ func (d *DynamoDBBackend) List(ctx context.Context, prefix string) ([]string, er
}

// hasChildren returns true if there exist items below a certain path prefix.
// To do so, the method fetches such items from DynamoDB. If there are more
// than one item (which is the "directory" item), there are children.
func (d *DynamoDBBackend) hasChildren(prefix string) (bool, error) {
// To do so, the method fetches such items from DynamoDB. This method is primarily
// used by Delete. Because DynamoDB requests are batched this method is being called
// before any deletes take place. To account for that hasChildren accepts a slice of
// strings representing values we expect to find that should NOT be counted as children
// because they are going to be deleted.
func (d *DynamoDBBackend) hasChildren(prefix string, exclude []string) (bool, error) {
prefix = strings.TrimSuffix(prefix, "/")
prefix = escapeEmptyPath(prefix)

Expand All @@ -424,9 +451,10 @@ func (d *DynamoDBBackend) hasChildren(prefix string) (bool, error) {
},
},
// Avoid fetching too many items from DynamoDB for performance reasons.
// We need at least two because one is the directory item, all others
// are children.
Limit: aws.Int64(2),
// We want to know if there are any children we don't expect to see.
// Answering that question requires fetching a minimum of one more item
// than the number we expect. In most cases this value will be 2
Limit: aws.Int64(int64(len(exclude) + 1)),
}

d.permitPool.Acquire()
Expand All @@ -436,7 +464,23 @@ func (d *DynamoDBBackend) hasChildren(prefix string) (bool, error) {
if err != nil {
return false, err
}
return len(out.Items) > 1, nil
var childrenExist bool
for _, item := range out.Items {
for _, excluded := range exclude {
// Check if we've found an item we didn't expect to. Look for "folder" pointer keys (trailing slash)
// and regular value keys (no trailing slash)
if *item["Key"].S != excluded && *item["Key"].S != fmt.Sprintf("%s/", excluded) {
childrenExist = true
break
}
}
if childrenExist {
// We only need to find ONE child we didn't expect to.
break
}
}

return childrenExist, nil
}

// LockWith is used for mutual exclusion based on the given key.
Expand Down

0 comments on commit 226f4f4

Please sign in to comment.