Skip to content

Commit

Permalink
Boltdb shipper deletion fixes (#3746)
Browse files Browse the repository at this point in the history
* use return instead of break in select case

* skip compaction on delete requests table
  • Loading branch information
sandeepsukhani authored May 19, 2021
1 parent f6dcd3f commit 3b26c63
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 16 deletions.
4 changes: 4 additions & 0 deletions pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ func (c *Compactor) RunCompaction(ctx context.Context) error {
}

for _, tableName := range tables {
if tableName == deletion.DeleteRequestsTableName {
// we do not want to compact or apply retention on delete requests table
continue
}
if err := c.CompactTable(ctx, tableName); err != nil {
status = statusFailure
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (d *DeleteRequestsManager) loop() {
level.Error(util_log.Logger).Log("msg", "failed to update metrics", "err", err)
}
case <-d.done:
break
return
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
deleteRequestDetails indexType = "2"

tempFileSuffix = ".temp"
deleteRequestsTableName = "delete_requests"
DeleteRequestsTableName = "delete_requests"
)

var (
Expand Down Expand Up @@ -98,11 +98,11 @@ func (ds *deleteRequestsStore) addDeleteRequest(ctx context.Context, userID stri
// Add an entry with userID, requestID as range key and status as value to make it easy to manage and lookup status
// We don't want to set anything in hash key here since we would want to find delete requests by just status
writeBatch := ds.indexClient.NewWriteBatch()
writeBatch.Add(deleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(StatusReceived))
writeBatch.Add(DeleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(StatusReceived))

// Add another entry with additional details like creation time, time range of delete request and selectors in value
rangeValue := fmt.Sprintf("%x:%x:%x", int64(createdAt), int64(startTime), int64(endTime))
writeBatch.Add(deleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID),
writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID),
[]byte(rangeValue), []byte(strings.Join(selectors, separator)))

err := ds.indexClient.BatchWrite(ctx, writeBatch)
Expand All @@ -116,7 +116,7 @@ func (ds *deleteRequestsStore) addDeleteRequest(ctx context.Context, userID stri
// GetDeleteRequestsByStatus returns all delete requests for given status.
func (ds *deleteRequestsStore) GetDeleteRequestsByStatus(ctx context.Context, status DeleteRequestStatus) ([]DeleteRequest, error) {
return ds.queryDeleteRequests(ctx, chunk.IndexQuery{
TableName: deleteRequestsTableName,
TableName: DeleteRequestsTableName,
HashValue: string(deleteRequestID),
ValueEqual: []byte(status),
})
Expand All @@ -125,7 +125,7 @@ func (ds *deleteRequestsStore) GetDeleteRequestsByStatus(ctx context.Context, st
// GetAllDeleteRequestsForUser returns all delete requests for a user.
func (ds *deleteRequestsStore) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) {
return ds.queryDeleteRequests(ctx, chunk.IndexQuery{
TableName: deleteRequestsTableName,
TableName: DeleteRequestsTableName,
HashValue: string(deleteRequestID),
RangeValuePrefix: []byte(userID),
})
Expand All @@ -136,7 +136,7 @@ func (ds *deleteRequestsStore) UpdateStatus(ctx context.Context, userID, request
userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID)

writeBatch := ds.indexClient.NewWriteBatch()
writeBatch.Add(deleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(newStatus))
writeBatch.Add(DeleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(newStatus))

return ds.indexClient.BatchWrite(ctx, writeBatch)
}
Expand All @@ -146,7 +146,7 @@ func (ds *deleteRequestsStore) GetDeleteRequest(ctx context.Context, userID, req
userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID)

deleteRequests, err := ds.queryDeleteRequests(ctx, chunk.IndexQuery{
TableName: deleteRequestsTableName,
TableName: DeleteRequestsTableName,
HashValue: string(deleteRequestID),
RangeValuePrefix: []byte(userIDAndRequestID),
})
Expand Down Expand Up @@ -185,7 +185,7 @@ func (ds *deleteRequestsStore) queryDeleteRequests(ctx context.Context, deleteQu
for i, deleteRequest := range deleteRequests {
deleteRequestQuery := []chunk.IndexQuery{
{
TableName: deleteRequestsTableName,
TableName: DeleteRequestsTableName,
HashValue: fmt.Sprintf("%s:%s:%s", deleteRequestDetails, deleteRequest.UserID, deleteRequest.RequestID),
},
}
Expand Down Expand Up @@ -224,11 +224,11 @@ func (ds *deleteRequestsStore) RemoveDeleteRequest(ctx context.Context, userID,
userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID)

writeBatch := ds.indexClient.NewWriteBatch()
writeBatch.Delete(deleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID))
writeBatch.Delete(DeleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID))

// Add another entry with additional details like creation time, time range of delete request and selectors in value
rangeValue := fmt.Sprintf("%x:%x:%x", int64(createdAt), int64(startTime), int64(endTime))
writeBatch.Delete(deleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID),
writeBatch.Delete(DeleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID),
[]byte(rangeValue))

return ds.indexClient.BatchWrite(ctx, writeBatch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ type deleteRequestsTable struct {
wg sync.WaitGroup
}

const objectPathInStorage = deleteRequestsTableName + "/" + deleteRequestsTableName + ".gz"
const objectPathInStorage = DeleteRequestsTableName + "/" + DeleteRequestsTableName + ".gz"

func newDeleteRequestsTable(workingDirectory string, objectClient chunk.ObjectClient) (chunk.IndexClient, error) {
dbPath := filepath.Join(workingDirectory, deleteRequestsTableName, deleteRequestsTableName)
dbPath := filepath.Join(workingDirectory, DeleteRequestsTableName, DeleteRequestsTableName)
boltdbIndexClient, err := local.NewBoltDBIndexClient(local.BoltDBConfig{Directory: filepath.Dir(dbPath)})
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestDeleteRequestsTable(t *testing.T) {

// add some records to the db
batch := testDeleteRequestsTable.NewWriteBatch()
testutil.AddRecordsToBatch(batch, deleteRequestsTableName, 0, 10)
testutil.AddRecordsToBatch(batch, DeleteRequestsTableName, 0, 10)
require.NoError(t, testDeleteRequestsTable.BatchWrite(context.Background(), batch))

// see if right records were written
Expand All @@ -56,7 +56,7 @@ func TestDeleteRequestsTable(t *testing.T) {
checkRecordsInStorage(t, storageFilePath, 0, 10)

// add more records to the db
testutil.AddRecordsToBatch(batch, deleteRequestsTableName, 10, 10)
testutil.AddRecordsToBatch(batch, DeleteRequestsTableName, 10, 10)
require.NoError(t, testDeleteRequestsTable.BatchWrite(context.Background(), batch))

// stop the table which should upload the db to storage
Expand Down Expand Up @@ -88,7 +88,7 @@ func checkRecordsInStorage(t *testing.T, storageFilePath string, start, numRecor
defer func() {
require.NoError(t, os.RemoveAll(tempDir))
}()
tempFilePath := filepath.Join(tempDir, deleteRequestsTableName)
tempFilePath := filepath.Join(tempDir, DeleteRequestsTableName)
require.NoError(t, err)
testutil.DecompressFile(t, storageFilePath, tempFilePath)

Expand Down

0 comments on commit 3b26c63

Please sign in to comment.