Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Boltdb shipper deletion fixes #3746

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ func (c *Compactor) RunCompaction(ctx context.Context) error {
}

for _, tableName := range tables {
if tableName == deletion.DeleteRequestsTableName {
// we do not want to compact or apply retention on delete requests table
continue
}
if err := c.CompactTable(ctx, tableName); err != nil {
status = statusFailure
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (d *DeleteRequestsManager) loop() {
level.Error(util_log.Logger).Log("msg", "failed to update metrics", "err", err)
}
case <-d.done:
break
return
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
deleteRequestDetails indexType = "2"

tempFileSuffix = ".temp"
deleteRequestsTableName = "delete_requests"
DeleteRequestsTableName = "delete_requests"
)

var (
Expand Down Expand Up @@ -98,11 +98,11 @@ func (ds *deleteRequestsStore) addDeleteRequest(ctx context.Context, userID stri
// Add an entry with userID, requestID as range key and status as value to make it easy to manage and lookup status
// We don't want to set anything in hash key here since we would want to find delete requests by just status
writeBatch := ds.indexClient.NewWriteBatch()
writeBatch.Add(deleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(StatusReceived))
writeBatch.Add(DeleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(StatusReceived))

// Add another entry with additional details like creation time, time range of delete request and selectors in value
rangeValue := fmt.Sprintf("%x:%x:%x", int64(createdAt), int64(startTime), int64(endTime))
writeBatch.Add(deleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID),
writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID),
[]byte(rangeValue), []byte(strings.Join(selectors, separator)))

err := ds.indexClient.BatchWrite(ctx, writeBatch)
Expand All @@ -116,7 +116,7 @@ func (ds *deleteRequestsStore) addDeleteRequest(ctx context.Context, userID stri
// GetDeleteRequestsByStatus returns all delete requests for given status.
func (ds *deleteRequestsStore) GetDeleteRequestsByStatus(ctx context.Context, status DeleteRequestStatus) ([]DeleteRequest, error) {
return ds.queryDeleteRequests(ctx, chunk.IndexQuery{
TableName: deleteRequestsTableName,
TableName: DeleteRequestsTableName,
HashValue: string(deleteRequestID),
ValueEqual: []byte(status),
})
Expand All @@ -125,7 +125,7 @@ func (ds *deleteRequestsStore) GetDeleteRequestsByStatus(ctx context.Context, st
// GetAllDeleteRequestsForUser returns all delete requests for a user.
func (ds *deleteRequestsStore) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) {
return ds.queryDeleteRequests(ctx, chunk.IndexQuery{
TableName: deleteRequestsTableName,
TableName: DeleteRequestsTableName,
HashValue: string(deleteRequestID),
RangeValuePrefix: []byte(userID),
})
Expand All @@ -136,7 +136,7 @@ func (ds *deleteRequestsStore) UpdateStatus(ctx context.Context, userID, request
userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID)

writeBatch := ds.indexClient.NewWriteBatch()
writeBatch.Add(deleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(newStatus))
writeBatch.Add(DeleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(newStatus))

return ds.indexClient.BatchWrite(ctx, writeBatch)
}
Expand All @@ -146,7 +146,7 @@ func (ds *deleteRequestsStore) GetDeleteRequest(ctx context.Context, userID, req
userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID)

deleteRequests, err := ds.queryDeleteRequests(ctx, chunk.IndexQuery{
TableName: deleteRequestsTableName,
TableName: DeleteRequestsTableName,
HashValue: string(deleteRequestID),
RangeValuePrefix: []byte(userIDAndRequestID),
})
Expand Down Expand Up @@ -185,7 +185,7 @@ func (ds *deleteRequestsStore) queryDeleteRequests(ctx context.Context, deleteQu
for i, deleteRequest := range deleteRequests {
deleteRequestQuery := []chunk.IndexQuery{
{
TableName: deleteRequestsTableName,
TableName: DeleteRequestsTableName,
HashValue: fmt.Sprintf("%s:%s:%s", deleteRequestDetails, deleteRequest.UserID, deleteRequest.RequestID),
},
}
Expand Down Expand Up @@ -224,11 +224,11 @@ func (ds *deleteRequestsStore) RemoveDeleteRequest(ctx context.Context, userID,
userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID)

writeBatch := ds.indexClient.NewWriteBatch()
writeBatch.Delete(deleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID))
writeBatch.Delete(DeleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID))

// Add another entry with additional details like creation time, time range of delete request and selectors in value
rangeValue := fmt.Sprintf("%x:%x:%x", int64(createdAt), int64(startTime), int64(endTime))
writeBatch.Delete(deleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID),
writeBatch.Delete(DeleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID),
[]byte(rangeValue))

return ds.indexClient.BatchWrite(ctx, writeBatch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ type deleteRequestsTable struct {
wg sync.WaitGroup
}

const objectPathInStorage = deleteRequestsTableName + "/" + deleteRequestsTableName + ".gz"
const objectPathInStorage = DeleteRequestsTableName + "/" + DeleteRequestsTableName + ".gz"

func newDeleteRequestsTable(workingDirectory string, objectClient chunk.ObjectClient) (chunk.IndexClient, error) {
dbPath := filepath.Join(workingDirectory, deleteRequestsTableName, deleteRequestsTableName)
dbPath := filepath.Join(workingDirectory, DeleteRequestsTableName, DeleteRequestsTableName)
boltdbIndexClient, err := local.NewBoltDBIndexClient(local.BoltDBConfig{Directory: filepath.Dir(dbPath)})
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestDeleteRequestsTable(t *testing.T) {

// add some records to the db
batch := testDeleteRequestsTable.NewWriteBatch()
testutil.AddRecordsToBatch(batch, deleteRequestsTableName, 0, 10)
testutil.AddRecordsToBatch(batch, DeleteRequestsTableName, 0, 10)
require.NoError(t, testDeleteRequestsTable.BatchWrite(context.Background(), batch))

// see if right records were written
Expand All @@ -56,7 +56,7 @@ func TestDeleteRequestsTable(t *testing.T) {
checkRecordsInStorage(t, storageFilePath, 0, 10)

// add more records to the db
testutil.AddRecordsToBatch(batch, deleteRequestsTableName, 10, 10)
testutil.AddRecordsToBatch(batch, DeleteRequestsTableName, 10, 10)
require.NoError(t, testDeleteRequestsTable.BatchWrite(context.Background(), batch))

// stop the table which should upload the db to storage
Expand Down Expand Up @@ -88,7 +88,7 @@ func checkRecordsInStorage(t *testing.T, storageFilePath string, start, numRecor
defer func() {
require.NoError(t, os.RemoveAll(tempDir))
}()
tempFilePath := filepath.Join(tempDir, deleteRequestsTableName)
tempFilePath := filepath.Join(tempDir, DeleteRequestsTableName)
require.NoError(t, err)
testutil.DecompressFile(t, storageFilePath, tempFilePath)

Expand Down