diff --git a/src/dbnode/storage/index/mutable_segments.go b/src/dbnode/storage/index/mutable_segments.go index bb88142895..681923b818 100644 --- a/src/dbnode/storage/index/mutable_segments.go +++ b/src/dbnode/storage/index/mutable_segments.go @@ -188,11 +188,21 @@ func (m *mutableSegments) NotifySealedBlocks( } m.Lock() + updated := false for _, blockStart := range sealed { + _, exists := m.sealedBlockStarts[blockStart] + if exists { + continue + } m.sealedBlockStarts[blockStart] = struct{}{} + updated = true + } + if updated { + // Only trigger background compact GC if + // and only if updated the sealed block starts. + m.backgroundCompactGCPending = true + m.maybeBackgroundCompactWithLock() } - m.backgroundCompactGCPending = true - m.maybeBackgroundCompactWithLock() m.Unlock() return nil @@ -445,6 +455,7 @@ func (m *mutableSegments) maybeBackgroundCompactWithLock() { } var ( + gcRequired = false gcPlan = &compaction.Plan{} gcAlreadyRunning = m.compact.compactingBackgroundGarbageCollect sealedBlockStarts = make(map[xtime.UnixNano]struct{}, len(m.sealedBlockStarts)) @@ -455,6 +466,7 @@ func (m *mutableSegments) maybeBackgroundCompactWithLock() { sealedBlockStarts[k] = v } if !gcAlreadyRunning && m.backgroundCompactGCPending { + gcRequired = true m.backgroundCompactGCPending = false for _, seg := range m.backgroundSegments { @@ -503,7 +515,7 @@ func (m *mutableSegments) maybeBackgroundCompactWithLock() { m.compact.compactingBackgroundStandard = true go func() { m.backgroundCompactWithPlan(plan, m.compact.backgroundCompactors, - sealedBlockStarts) + gcRequired, sealedBlockStarts) m.Lock() m.compact.compactingBackgroundStandard = false @@ -523,7 +535,7 @@ func (m *mutableSegments) maybeBackgroundCompactWithLock() { }) } else { m.backgroundCompactWithPlan(gcPlan, compactors, - sealedBlockStarts) + gcRequired, sealedBlockStarts) m.closeCompactors(compactors) } @@ -593,6 +605,7 @@ func (m *mutableSegments) closeCompactedSegmentsWithLock(segments []*readableSeg func (m *mutableSegments) backgroundCompactWithPlan( plan *compaction.Plan, compactors chan *compaction.Compactor, + gcRequired bool, sealedBlocks map[xtime.UnixNano]struct{}, ) { sw := m.metrics.backgroundCompactionPlanRunLatency.Start() @@ -629,7 +642,7 @@ func (m *mutableSegments) backgroundCompactWithPlan( compactors <- compactor wg.Done() }() - err := m.backgroundCompactWithTask(task, compactor, + err := m.backgroundCompactWithTask(task, compactor, gcRequired, sealedBlocks, log, logger.With(zap.Int("task", i))) if err != nil { instrument.EmitAndLogInvariantViolation(m.iopts, func(l *zap.Logger) { @@ -656,6 +669,7 @@ func (m *mutableSegments) newReadThroughSegment(seg fst.Segment) segment.Segment func (m *mutableSegments) backgroundCompactWithTask( task compaction.Task, compactor *compaction.Compactor, + gcRequired bool, sealedBlocks map[xtime.UnixNano]struct{}, log bool, logger *zap.Logger, @@ -669,9 +683,10 @@ func (m *mutableSegments) backgroundCompactWithTask( segments = append(segments, seg.Segment) } - start := time.Now() - compacted, err := compactor.Compact(segments, - segment.DocumentsFilterFn(func(d doc.Document) bool { + var documentsFilter segment.DocumentsFilter + if gcRequired { + // Only actively filter out documents if GC is required. + documentsFilter = segment.DocumentsFilterFn(func(d doc.Document) bool { // Filter out any documents that only were indexed for // sealed blocks. if d.Ref == nil { @@ -691,7 +706,11 @@ func (m *mutableSegments) backgroundCompactWithTask( // Keep the series if and only if there are remaining // index block starts outside of the sealed blocks starts. return result.IndexedBlockStartsRemaining > 0 - }), + }) + } + + start := time.Now() + compacted, err := compactor.Compact(segments, documentsFilter, m.metrics.activeBlockGarbageCollectSeries, mmap.ReporterOptions{ Context: mmap.Context{ diff --git a/src/m3ninx/index/segment/builder/multi_segments_terms_iter.go b/src/m3ninx/index/segment/builder/multi_segments_terms_iter.go index 5c40542ab7..f05a2fb203 100644 --- a/src/m3ninx/index/segment/builder/multi_segments_terms_iter.go +++ b/src/m3ninx/index/segment/builder/multi_segments_terms_iter.go @@ -161,10 +161,19 @@ func (i *termsIterFromSegments) Next() bool { ) for iter.Next() { curr := iter.Current() + factor := 2 + // First do exponential skipping. + for len(skip) >= factor && curr > skip[factor-1] { + skip = skip[factor:] + negativeOffset += postings.ID(factor) + factor *= 2 + } + // Then linear. for len(skip) > 0 && curr > skip[0] { skip = skip[1:] negativeOffset++ } + // Then skip the individual if matches. if len(skip) > 0 && curr == skip[0] { skip = skip[1:] negativeOffset++