fix a race in purger managing in process delete requests (grafana#2817)
* fix a race in purger managing in process delete requests

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* update changelog

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* fix a problem in taking address of delete requests in loops (see the sketch below)

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* minor nit

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* nit suggested in PR review

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

Co-authored-by: Marco Pracucci <marco@pracucci.com>
sandeepsukhani and pracucci authored Jul 9, 2020
1 parent e4f9647 commit 8344fdc
Showing 2 changed files with 70 additions and 43 deletions.
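
The third bullet in the commit message points at a common Go pitfall: a range loop reuses a single loop variable, so taking its address inside the loop yields a pointer that aliases whichever element the loop visited last. Below is a minimal, self-contained sketch of the pitfall and of the fix pattern used in this commit (copying the element into a fresh per-iteration variable before taking its address). The request type is a stand-in for illustration, not the purger's DeleteRequest, and the "buggy" behaviour assumes a Go version before 1.22, which gave the whole loop one shared variable.

package main

import "fmt"

type request struct{ ID string }

func main() {
	requests := []request{{"a"}, {"b"}, {"c"}}

	// Buggy (pre Go 1.22): &r always points at the same loop variable,
	// so every map entry ends up referring to the last iteration's value.
	buggy := map[string]*request{}
	for _, r := range requests {
		buggy[r.ID] = &r
	}
	fmt.Println(buggy["a"].ID) // prints "c"

	// Fixed: copy the element into a per-iteration variable first,
	// as the purger now does (deleteRequest := deleteRequests[i] in the diff below).
	fixed := map[string]*request{}
	for i := range requests {
		r := requests[i]
		fixed[r.ID] = &r
	}
	fmt.Println(fixed["a"].ID) // prints "a"
}

The removed getOldestInProcessRequest further down had exactly this shape: it assigned &request inside its range loop, so the returned pointer could refer to the last element visited rather than the oldest request.
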
108 changes: 69 additions & 39 deletions purger/purger.go
@@ -120,8 +120,7 @@ type Purger struct {

// we would only allow processing of a single delete request at a time since delete requests touching the same chunks could change the chunk IDs of partially deleted chunks
// and break the purge plan for other requests
inProcessRequests map[string]DeleteRequest
inProcessRequestIDsMtx sync.RWMutex
inProcessRequests *inProcessRequestsCollection

// We do not want to limit pulling new delete requests to a fixed interval which otherwise would limit number of delete requests we process per user.
// While loading delete requests if we find more requests from user pending to be processed, we just set their id in usersWithPendingRequests and
@@ -149,7 +148,7 @@ func NewPurger(cfg Config, deleteStore *DeleteStore, chunkStore chunk.Store, sto
pullNewRequestsChan: make(chan struct{}, 1),
executePlansChan: make(chan deleteRequestWithLogger, 50),
workerJobChan: make(chan workerJob, 50),
inProcessRequests: map[string]DeleteRequest{},
inProcessRequests: newInProcessRequestsCollection(),
usersWithPendingRequests: map[string]struct{}{},
pendingPlansCount: map[string]int{},
}
@@ -231,9 +230,7 @@ func (p *Purger) workerJobCleanup(job workerJob) {
delete(p.pendingPlansCount, job.deleteRequestID)
p.pendingPlansCountMtx.Unlock()

p.inProcessRequestIDsMtx.Lock()
delete(p.inProcessRequests, job.userID)
p.inProcessRequestIDsMtx.Unlock()
p.inProcessRequests.remove(job.userID)

// request loading of more delete requests if
// - user has more pending requests and
@@ -367,12 +364,13 @@ func (p *Purger) loadInprocessDeleteRequests() error {
return err
}

for _, deleteRequest := range requestsWithBuildingPlanStatus {
for i := range requestsWithBuildingPlanStatus {
deleteRequest := requestsWithBuildingPlanStatus[i]
req := makeDeleteRequestWithLogger(deleteRequest, util.Logger)
p.inProcessRequests.set(deleteRequest.UserID, &deleteRequest)

level.Info(req.logger).Log("msg", "loaded in process delete requests with status building plan")

p.inProcessRequests[deleteRequest.UserID] = deleteRequest
err := p.buildDeletePlan(req)
if err != nil {
p.metrics.deleteRequestsProcessingFailures.WithLabelValues(deleteRequest.UserID).Inc()
@@ -389,11 +387,12 @@
return err
}

for _, deleteRequest := range requestsWithDeletingStatus {
for i := range requestsWithDeletingStatus {
deleteRequest := requestsWithDeletingStatus[i]
req := makeDeleteRequestWithLogger(deleteRequest, util.Logger)
level.Info(req.logger).Log("msg", "loaded in process delete requests with status deleting")

p.inProcessRequests[deleteRequest.UserID] = deleteRequest
p.inProcessRequests.set(deleteRequest.UserID, &deleteRequest)
p.executePlansChan <- req
}

@@ -408,22 +407,21 @@ func (p *Purger) pullDeleteRequestsToPlanDeletes() error {
return err
}

p.inProcessRequestIDsMtx.RLock()
pendingDeleteRequestsCount := len(p.inProcessRequests)
p.inProcessRequestIDsMtx.RUnlock()

pendingDeleteRequestsCount := p.inProcessRequests.len()
now := model.Now()
oldestPendingRequestCreatedAt := model.Time(0)

// requests which are still being processed are also considered pending
if pendingDeleteRequestsCount != 0 {
oldestInProcessRequest := p.getOldestInProcessRequest()
oldestInProcessRequest := p.inProcessRequests.getOldest()
if oldestInProcessRequest != nil {
oldestPendingRequestCreatedAt = oldestInProcessRequest.CreatedAt
}
}

for _, deleteRequest := range deleteRequests {
for i := range deleteRequests {
deleteRequest := deleteRequests[i]

// adding an extra minute here to avoid a race between cancellation of request and picking of the request for processing
if deleteRequest.CreatedAt.Add(p.cfg.DeleteRequestCancelPeriod).Add(time.Minute).After(model.Now()) {
continue
@@ -434,11 +432,7 @@ func (p *Purger) pullDeleteRequestsToPlanDeletes() error {
oldestPendingRequestCreatedAt = deleteRequest.CreatedAt
}

p.inProcessRequestIDsMtx.RLock()
inprocessDeleteRequest, ok := p.inProcessRequests[deleteRequest.UserID]
p.inProcessRequestIDsMtx.RUnlock()

if ok {
if inprocessDeleteRequest := p.inProcessRequests.get(deleteRequest.UserID); inprocessDeleteRequest != nil {
p.usersWithPendingRequestsMtx.Lock()
p.usersWithPendingRequests[deleteRequest.UserID] = struct{}{}
p.usersWithPendingRequestsMtx.Unlock()
@@ -454,10 +448,7 @@ func (p *Purger) pullDeleteRequestsToPlanDeletes() error {
return err
}

p.inProcessRequestIDsMtx.Lock()
p.inProcessRequests[deleteRequest.UserID] = deleteRequest
p.inProcessRequestIDsMtx.Unlock()

p.inProcessRequests.set(deleteRequest.UserID, &deleteRequest)
req := makeDeleteRequestWithLogger(deleteRequest, util.Logger)

level.Info(req.logger).Log("msg", "building plan for a new delete request")
@@ -599,20 +590,6 @@ func (p *Purger) removeDeletePlan(ctx context.Context, userID, requestID string,
return p.objectClient.DeleteObject(ctx, objectKey)
}

func (p *Purger) getOldestInProcessRequest() *DeleteRequest {
p.inProcessRequestIDsMtx.RLock()
defer p.inProcessRequestIDsMtx.RUnlock()

var oldestRequest *DeleteRequest
for _, request := range p.inProcessRequests {
if oldestRequest == nil || request.CreatedAt.Before(oldestRequest.CreatedAt) {
oldestRequest = &request
}
}

return oldestRequest
}

// returns interval per plan
func splitByDay(start, end model.Time) []model.Interval {
numOfDays := numPlans(start, end)
@@ -714,3 +691,56 @@ func makeDeleteRequestWithLogger(deleteRequest DeleteRequest, l log.Logger) dele
logger := log.With(l, "user_id", deleteRequest.UserID, "request_id", deleteRequest.RequestID)
return deleteRequestWithLogger{deleteRequest, logger}
}

// inProcessRequestsCollection stores DeleteRequests which are in process by each user.
// Currently we only allow processing of one delete request per user, so it stores a single DeleteRequest per user.
type inProcessRequestsCollection struct {
requests map[string]*DeleteRequest
mtx sync.RWMutex
}

func newInProcessRequestsCollection() *inProcessRequestsCollection {
return &inProcessRequestsCollection{requests: map[string]*DeleteRequest{}}
}

func (i *inProcessRequestsCollection) set(userID string, request *DeleteRequest) {
i.mtx.Lock()
defer i.mtx.Unlock()

i.requests[userID] = request
}

func (i *inProcessRequestsCollection) get(userID string) *DeleteRequest {
i.mtx.RLock()
defer i.mtx.RUnlock()

return i.requests[userID]
}

func (i *inProcessRequestsCollection) remove(userID string) {
i.mtx.Lock()
defer i.mtx.Unlock()

delete(i.requests, userID)
}

func (i *inProcessRequestsCollection) len() int {
i.mtx.RLock()
defer i.mtx.RUnlock()

return len(i.requests)
}

func (i *inProcessRequestsCollection) getOldest() *DeleteRequest {
i.mtx.RLock()
defer i.mtx.RUnlock()

var oldestRequest *DeleteRequest
for _, request := range i.requests {
if oldestRequest == nil || request.CreatedAt.Before(oldestRequest.CreatedAt) {
oldestRequest = request
}
}

return oldestRequest
}
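
The race named in the commit title stems from the old code writing the inProcessRequests map in some paths without taking its mutex (the removed unguarded assignments in loadInprocessDeleteRequests above) while other code accessed it under the lock. Funneling every access through inProcessRequestsCollection makes it impossible to bypass the lock. The stand-alone sketch below mirrors that pattern with a hypothetical record type in place of DeleteRequest and model.Time; running it with go run -race should report no data races, which is one way to sanity-check this kind of fix.

package main

import (
	"fmt"
	"sync"
)

// record is a stand-in for the purger's DeleteRequest type.
type record struct {
	UserID    string
	CreatedAt int64
}

// collection mirrors the shape of inProcessRequestsCollection: one
// in-process record per user, every access guarded by the same RWMutex.
type collection struct {
	mtx      sync.RWMutex
	requests map[string]*record
}

func newCollection() *collection {
	return &collection{requests: map[string]*record{}}
}

func (c *collection) set(userID string, r *record) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.requests[userID] = r
}

func (c *collection) get(userID string) *record {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	return c.requests[userID]
}

func (c *collection) len() int {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	return len(c.requests)
}

func main() {
	c := newCollection()
	var wg sync.WaitGroup

	// Concurrent writers and readers; the race detector stays silent
	// because every map access goes through the same mutex.
	for i := 0; i < 8; i++ {
		userID := fmt.Sprintf("user-%d", i)
		wg.Add(2)
		go func(id string, n int64) {
			defer wg.Done()
			c.set(id, &record{UserID: id, CreatedAt: n})
		}(userID, int64(i))
		go func(id string) {
			defer wg.Done()
			_ = c.get(id)
		}(userID)
	}
	wg.Wait()
	fmt.Println("tracked users:", c.len())
}

Because callers can no longer touch the map directly, a forgotten lock at a single call site, which is what this commit fixes, cannot reintroduce the race.
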
5 changes: 1 addition & 4 deletions purger/purger_test.go
@@ -364,10 +364,7 @@ func TestPurger_Restarts(t *testing.T) {
defer newPurger.StopAsync()

test.Poll(t, time.Minute, 0, func() interface{} {
newPurger.inProcessRequestIDsMtx.RLock()
defer newPurger.inProcessRequestIDsMtx.RUnlock()

return len(newPurger.inProcessRequests)
return newPurger.inProcessRequests.len()
})

// check whether data got deleted from the store since delete request has been processed