From 8344fdc5a8dcc176e1a66debf667c314f724091c Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Thu, 9 Jul 2020 20:58:25 +0530 Subject: [PATCH] fix a race in purger managing in process delete requests (#2817) * fix a race in purger managing in process delete requests Signed-off-by: Sandeep Sukhani * update changelog Signed-off-by: Sandeep Sukhani * fix a problem in taking address of delete requests in loops Signed-off-by: Sandeep Sukhani * minor nit Signed-off-by: Sandeep Sukhani * nit suggested in PR review Signed-off-by: Sandeep Sukhani Co-authored-by: Marco Pracucci --- purger/purger.go | 108 +++++++++++++++++++++++++++--------------- purger/purger_test.go | 5 +- 2 files changed, 70 insertions(+), 43 deletions(-) diff --git a/purger/purger.go b/purger/purger.go index 2e966a659a97..5d2df4a4f60d 100644 --- a/purger/purger.go +++ b/purger/purger.go @@ -120,8 +120,7 @@ type Purger struct { // we would only allow processing of singe delete request at a time since delete requests touching same chunks could change the chunk IDs of partially deleted chunks // and break the purge plan for other requests - inProcessRequests map[string]DeleteRequest - inProcessRequestIDsMtx sync.RWMutex + inProcessRequests *inProcessRequestsCollection // We do not want to limit pulling new delete requests to a fixed interval which otherwise would limit number of delete requests we process per user. // While loading delete requests if we find more requests from user pending to be processed, we just set their id in usersWithPendingRequests and @@ -149,7 +148,7 @@ func NewPurger(cfg Config, deleteStore *DeleteStore, chunkStore chunk.Store, sto pullNewRequestsChan: make(chan struct{}, 1), executePlansChan: make(chan deleteRequestWithLogger, 50), workerJobChan: make(chan workerJob, 50), - inProcessRequests: map[string]DeleteRequest{}, + inProcessRequests: newInProcessRequestsCollection(), usersWithPendingRequests: map[string]struct{}{}, pendingPlansCount: map[string]int{}, } @@ -231,9 +230,7 @@ func (p *Purger) workerJobCleanup(job workerJob) { delete(p.pendingPlansCount, job.deleteRequestID) p.pendingPlansCountMtx.Unlock() - p.inProcessRequestIDsMtx.Lock() - delete(p.inProcessRequests, job.userID) - p.inProcessRequestIDsMtx.Unlock() + p.inProcessRequests.remove(job.userID) // request loading of more delete request if // - user has more pending requests and @@ -367,12 +364,13 @@ func (p *Purger) loadInprocessDeleteRequests() error { return err } - for _, deleteRequest := range requestsWithBuildingPlanStatus { + for i := range requestsWithBuildingPlanStatus { + deleteRequest := requestsWithBuildingPlanStatus[i] req := makeDeleteRequestWithLogger(deleteRequest, util.Logger) + p.inProcessRequests.set(deleteRequest.UserID, &deleteRequest) level.Info(req.logger).Log("msg", "loaded in process delete requests with status building plan") - p.inProcessRequests[deleteRequest.UserID] = deleteRequest err := p.buildDeletePlan(req) if err != nil { p.metrics.deleteRequestsProcessingFailures.WithLabelValues(deleteRequest.UserID).Inc() @@ -389,11 +387,12 @@ func (p *Purger) loadInprocessDeleteRequests() error { return err } - for _, deleteRequest := range requestsWithDeletingStatus { + for i := range requestsWithDeletingStatus { + deleteRequest := requestsWithDeletingStatus[i] req := makeDeleteRequestWithLogger(deleteRequest, util.Logger) level.Info(req.logger).Log("msg", "loaded in process delete requests with status deleting") - p.inProcessRequests[deleteRequest.UserID] = deleteRequest + p.inProcessRequests.set(deleteRequest.UserID, &deleteRequest) p.executePlansChan <- req } @@ -408,22 +407,21 @@ func (p *Purger) pullDeleteRequestsToPlanDeletes() error { return err } - p.inProcessRequestIDsMtx.RLock() - pendingDeleteRequestsCount := len(p.inProcessRequests) - p.inProcessRequestIDsMtx.RUnlock() - + pendingDeleteRequestsCount := p.inProcessRequests.len() now := model.Now() oldestPendingRequestCreatedAt := model.Time(0) // requests which are still being processed are also considered pending if pendingDeleteRequestsCount != 0 { - oldestInProcessRequest := p.getOldestInProcessRequest() + oldestInProcessRequest := p.inProcessRequests.getOldest() if oldestInProcessRequest != nil { oldestPendingRequestCreatedAt = oldestInProcessRequest.CreatedAt } } - for _, deleteRequest := range deleteRequests { + for i := range deleteRequests { + deleteRequest := deleteRequests[i] + // adding an extra minute here to avoid a race between cancellation of request and picking of the request for processing if deleteRequest.CreatedAt.Add(p.cfg.DeleteRequestCancelPeriod).Add(time.Minute).After(model.Now()) { continue @@ -434,11 +432,7 @@ func (p *Purger) pullDeleteRequestsToPlanDeletes() error { oldestPendingRequestCreatedAt = deleteRequest.CreatedAt } - p.inProcessRequestIDsMtx.RLock() - inprocessDeleteRequest, ok := p.inProcessRequests[deleteRequest.UserID] - p.inProcessRequestIDsMtx.RUnlock() - - if ok { + if inprocessDeleteRequest := p.inProcessRequests.get(deleteRequest.UserID); inprocessDeleteRequest != nil { p.usersWithPendingRequestsMtx.Lock() p.usersWithPendingRequests[deleteRequest.UserID] = struct{}{} p.usersWithPendingRequestsMtx.Unlock() @@ -454,10 +448,7 @@ func (p *Purger) pullDeleteRequestsToPlanDeletes() error { return err } - p.inProcessRequestIDsMtx.Lock() - p.inProcessRequests[deleteRequest.UserID] = deleteRequest - p.inProcessRequestIDsMtx.Unlock() - + p.inProcessRequests.set(deleteRequest.UserID, &deleteRequest) req := makeDeleteRequestWithLogger(deleteRequest, util.Logger) level.Info(req.logger).Log("msg", "building plan for a new delete request") @@ -599,20 +590,6 @@ func (p *Purger) removeDeletePlan(ctx context.Context, userID, requestID string, return p.objectClient.DeleteObject(ctx, objectKey) } -func (p *Purger) getOldestInProcessRequest() *DeleteRequest { - p.inProcessRequestIDsMtx.RLock() - defer p.inProcessRequestIDsMtx.RUnlock() - - var oldestRequest *DeleteRequest - for _, request := range p.inProcessRequests { - if oldestRequest == nil || request.CreatedAt.Before(oldestRequest.CreatedAt) { - oldestRequest = &request - } - } - - return oldestRequest -} - // returns interval per plan func splitByDay(start, end model.Time) []model.Interval { numOfDays := numPlans(start, end) @@ -714,3 +691,56 @@ func makeDeleteRequestWithLogger(deleteRequest DeleteRequest, l log.Logger) dele logger := log.With(l, "user_id", deleteRequest.UserID, "request_id", deleteRequest.RequestID) return deleteRequestWithLogger{deleteRequest, logger} } + +// inProcessRequestsCollection stores DeleteRequests which are in process by each user. +// Currently we only allow processing of one delete request per user so it stores single DeleteRequest per user. +type inProcessRequestsCollection struct { + requests map[string]*DeleteRequest + mtx sync.RWMutex +} + +func newInProcessRequestsCollection() *inProcessRequestsCollection { + return &inProcessRequestsCollection{requests: map[string]*DeleteRequest{}} +} + +func (i *inProcessRequestsCollection) set(userID string, request *DeleteRequest) { + i.mtx.Lock() + defer i.mtx.Unlock() + + i.requests[userID] = request +} + +func (i *inProcessRequestsCollection) get(userID string) *DeleteRequest { + i.mtx.RLock() + defer i.mtx.RUnlock() + + return i.requests[userID] +} + +func (i *inProcessRequestsCollection) remove(userID string) { + i.mtx.Lock() + defer i.mtx.Unlock() + + delete(i.requests, userID) +} + +func (i *inProcessRequestsCollection) len() int { + i.mtx.RLock() + defer i.mtx.RUnlock() + + return len(i.requests) +} + +func (i *inProcessRequestsCollection) getOldest() *DeleteRequest { + i.mtx.RLock() + defer i.mtx.RUnlock() + + var oldestRequest *DeleteRequest + for _, request := range i.requests { + if oldestRequest == nil || request.CreatedAt.Before(oldestRequest.CreatedAt) { + oldestRequest = request + } + } + + return oldestRequest +} diff --git a/purger/purger_test.go b/purger/purger_test.go index 84039a1f2625..55cc10b3bb62 100644 --- a/purger/purger_test.go +++ b/purger/purger_test.go @@ -364,10 +364,7 @@ func TestPurger_Restarts(t *testing.T) { defer newPurger.StopAsync() test.Poll(t, time.Minute, 0, func() interface{} { - newPurger.inProcessRequestIDsMtx.RLock() - defer newPurger.inProcessRequestIDsMtx.RUnlock() - - return len(newPurger.inProcessRequests) + return newPurger.inProcessRequests.len() }) // check whether data got deleted from the store since delete request has been processed