From ca35f302f6845d59f34857f4f4851e079a718720 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 19 May 2021 16:44:28 +0530 Subject: [PATCH 1/2] use return instead of break in select case --- .../shipper/compactor/deletion/delete_requests_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go index 61499522e870..49398242d840 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go @@ -58,7 +58,7 @@ func (d *DeleteRequestsManager) loop() { level.Error(util_log.Logger).Log("msg", "failed to update metrics", "err", err) } case <-d.done: - break + return } } } From 6150cf1e6bf54b969eaab7c13449d33f9d8cdac1 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 19 May 2021 16:45:38 +0530 Subject: [PATCH 2/2] skip compaction on delete requests table --- .../stores/shipper/compactor/compactor.go | 4 ++++ .../deletion/delete_requests_store.go | 20 +++++++++---------- .../deletion/delete_requests_table.go | 4 ++-- .../deletion/delete_requests_table_test.go | 6 +++--- 4 files changed, 19 insertions(+), 15 deletions(-) diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index 5d9b987492ac..da9ea46d651a 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -226,6 +226,10 @@ func (c *Compactor) RunCompaction(ctx context.Context) error { } for _, tableName := range tables { + if tableName == deletion.DeleteRequestsTableName { + // we do not want to compact or apply retention on delete requests table + continue + } if err := c.CompactTable(ctx, tableName); err != nil { status = statusFailure } diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_store.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_store.go index b4c226b0b681..8660256c1f1e 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_store.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_store.go @@ -32,7 +32,7 @@ const ( deleteRequestDetails indexType = "2" tempFileSuffix = ".temp" - deleteRequestsTableName = "delete_requests" + DeleteRequestsTableName = "delete_requests" ) var ( @@ -98,11 +98,11 @@ func (ds *deleteRequestsStore) addDeleteRequest(ctx context.Context, userID stri // Add an entry with userID, requestID as range key and status as value to make it easy to manage and lookup status // We don't want to set anything in hash key here since we would want to find delete requests by just status writeBatch := ds.indexClient.NewWriteBatch() - writeBatch.Add(deleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(StatusReceived)) + writeBatch.Add(DeleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(StatusReceived)) // Add another entry with additional details like creation time, time range of delete request and selectors in value rangeValue := fmt.Sprintf("%x:%x:%x", int64(createdAt), int64(startTime), int64(endTime)) - writeBatch.Add(deleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID), + writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID), []byte(rangeValue), []byte(strings.Join(selectors, separator))) err := ds.indexClient.BatchWrite(ctx, writeBatch) @@ -116,7 +116,7 @@ func (ds *deleteRequestsStore) addDeleteRequest(ctx context.Context, userID stri // GetDeleteRequestsByStatus returns all delete requests for given status. func (ds *deleteRequestsStore) GetDeleteRequestsByStatus(ctx context.Context, status DeleteRequestStatus) ([]DeleteRequest, error) { return ds.queryDeleteRequests(ctx, chunk.IndexQuery{ - TableName: deleteRequestsTableName, + TableName: DeleteRequestsTableName, HashValue: string(deleteRequestID), ValueEqual: []byte(status), }) @@ -125,7 +125,7 @@ func (ds *deleteRequestsStore) GetDeleteRequestsByStatus(ctx context.Context, st // GetAllDeleteRequestsForUser returns all delete requests for a user. func (ds *deleteRequestsStore) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) { return ds.queryDeleteRequests(ctx, chunk.IndexQuery{ - TableName: deleteRequestsTableName, + TableName: DeleteRequestsTableName, HashValue: string(deleteRequestID), RangeValuePrefix: []byte(userID), }) @@ -136,7 +136,7 @@ func (ds *deleteRequestsStore) UpdateStatus(ctx context.Context, userID, request userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID) writeBatch := ds.indexClient.NewWriteBatch() - writeBatch.Add(deleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(newStatus)) + writeBatch.Add(DeleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(newStatus)) return ds.indexClient.BatchWrite(ctx, writeBatch) } @@ -146,7 +146,7 @@ func (ds *deleteRequestsStore) GetDeleteRequest(ctx context.Context, userID, req userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID) deleteRequests, err := ds.queryDeleteRequests(ctx, chunk.IndexQuery{ - TableName: deleteRequestsTableName, + TableName: DeleteRequestsTableName, HashValue: string(deleteRequestID), RangeValuePrefix: []byte(userIDAndRequestID), }) @@ -185,7 +185,7 @@ func (ds *deleteRequestsStore) queryDeleteRequests(ctx context.Context, deleteQu for i, deleteRequest := range deleteRequests { deleteRequestQuery := []chunk.IndexQuery{ { - TableName: deleteRequestsTableName, + TableName: DeleteRequestsTableName, HashValue: fmt.Sprintf("%s:%s:%s", deleteRequestDetails, deleteRequest.UserID, deleteRequest.RequestID), }, } @@ -224,11 +224,11 @@ func (ds *deleteRequestsStore) RemoveDeleteRequest(ctx context.Context, userID, userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID) writeBatch := ds.indexClient.NewWriteBatch() - writeBatch.Delete(deleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID)) + writeBatch.Delete(DeleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID)) // Add another entry with additional details like creation time, time range of delete request and selectors in value rangeValue := fmt.Sprintf("%x:%x:%x", int64(createdAt), int64(startTime), int64(endTime)) - writeBatch.Delete(deleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID), + writeBatch.Delete(DeleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID), []byte(rangeValue)) return ds.indexClient.BatchWrite(ctx, writeBatch) diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go index 2866990b34e4..1f55026e89e0 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go @@ -30,10 +30,10 @@ type deleteRequestsTable struct { wg sync.WaitGroup } -const objectPathInStorage = deleteRequestsTableName + "/" + deleteRequestsTableName + ".gz" +const objectPathInStorage = DeleteRequestsTableName + "/" + DeleteRequestsTableName + ".gz" func newDeleteRequestsTable(workingDirectory string, objectClient chunk.ObjectClient) (chunk.IndexClient, error) { - dbPath := filepath.Join(workingDirectory, deleteRequestsTableName, deleteRequestsTableName) + dbPath := filepath.Join(workingDirectory, DeleteRequestsTableName, DeleteRequestsTableName) boltdbIndexClient, err := local.NewBoltDBIndexClient(local.BoltDBConfig{Directory: filepath.Dir(dbPath)}) if err != nil { return nil, err diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table_test.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table_test.go index 9e837e5e1afd..050c07f0e77b 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table_test.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table_test.go @@ -41,7 +41,7 @@ func TestDeleteRequestsTable(t *testing.T) { // add some records to the db batch := testDeleteRequestsTable.NewWriteBatch() - testutil.AddRecordsToBatch(batch, deleteRequestsTableName, 0, 10) + testutil.AddRecordsToBatch(batch, DeleteRequestsTableName, 0, 10) require.NoError(t, testDeleteRequestsTable.BatchWrite(context.Background(), batch)) // see if right records were written @@ -56,7 +56,7 @@ func TestDeleteRequestsTable(t *testing.T) { checkRecordsInStorage(t, storageFilePath, 0, 10) // add more records to the db - testutil.AddRecordsToBatch(batch, deleteRequestsTableName, 10, 10) + testutil.AddRecordsToBatch(batch, DeleteRequestsTableName, 10, 10) require.NoError(t, testDeleteRequestsTable.BatchWrite(context.Background(), batch)) // stop the table which should upload the db to storage @@ -88,7 +88,7 @@ func checkRecordsInStorage(t *testing.T, storageFilePath string, start, numRecor defer func() { require.NoError(t, os.RemoveAll(tempDir)) }() - tempFilePath := filepath.Join(tempDir, deleteRequestsTableName) + tempFilePath := filepath.Join(tempDir, DeleteRequestsTableName) require.NoError(t, err) testutil.DecompressFile(t, storageFilePath, tempFilePath)