diff --git a/warehouse/router/router.go b/warehouse/router/router.go index 7c02155477..b2e66d249d 100644 --- a/warehouse/router/router.go +++ b/warehouse/router/router.go @@ -74,6 +74,8 @@ type Router struct { inProgressMap map[workerIdentifierMapKey][]jobID inProgressMapLock sync.RWMutex + processingMu sync.Mutex + scheduledTimesCache map[string][]int scheduledTimesCacheLock sync.RWMutex @@ -371,18 +373,22 @@ loop: continue } + r.processingMu.Lock() inProgressNamespaces := r.getInProgressNamespaces() r.logger.Debugf(`Current inProgress namespace identifiers for %s: %v`, r.destType, inProgressNamespaces) uploadJobsToProcess, err := r.uploadsToProcess(ctx, availableWorkers, inProgressNamespaces) if err != nil && ctx.Err() == nil { r.logger.Errorn("Error getting uploads to process", logger.NewErrorField(err)) + r.processingMu.Unlock() return err } - for _, uploadJob := range uploadJobsToProcess { r.setDestInProgress(uploadJob.warehouse, uploadJob.upload.ID) + } + r.processingMu.Unlock() + for _, uploadJob := range uploadJobsToProcess { workerName := r.workerIdentifier(uploadJob.warehouse) r.workerChannelMapLock.RLock() @@ -596,6 +602,9 @@ func (r *Router) handlePriorityForWaitingUploads(ctx context.Context, warehouse return defaultUploadPriority, nil } + r.processingMu.Lock() + defer r.processingMu.Unlock() + // If it is present do nothing else delete it if _, inProgress := r.isUploadJobInProgress(warehouse, latestInfo.ID); !inProgress { if err := r.uploadRepo.DeleteWaiting(ctx, latestInfo.ID); err != nil {