Skip to content

Commit

Permalink
Only run background GC when sealed blocks updated
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington committed Feb 8, 2021
1 parent 834efa6 commit 3699014
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 9 deletions.
37 changes: 28 additions & 9 deletions src/dbnode/storage/index/mutable_segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,21 @@ func (m *mutableSegments) NotifySealedBlocks(
}

m.Lock()
updated := false
for _, blockStart := range sealed {
_, exists := m.sealedBlockStarts[blockStart]
if exists {
continue
}
m.sealedBlockStarts[blockStart] = struct{}{}
updated = true
}
if updated {
// Only trigger background compact GC if
// and only if updated the sealed block starts.
m.backgroundCompactGCPending = true
m.maybeBackgroundCompactWithLock()
}
m.backgroundCompactGCPending = true
m.maybeBackgroundCompactWithLock()
m.Unlock()

return nil
Expand Down Expand Up @@ -445,6 +455,7 @@ func (m *mutableSegments) maybeBackgroundCompactWithLock() {
}

var (
gcRequired = false
gcPlan = &compaction.Plan{}
gcAlreadyRunning = m.compact.compactingBackgroundGarbageCollect
sealedBlockStarts = make(map[xtime.UnixNano]struct{}, len(m.sealedBlockStarts))
Expand All @@ -455,6 +466,7 @@ func (m *mutableSegments) maybeBackgroundCompactWithLock() {
sealedBlockStarts[k] = v
}
if !gcAlreadyRunning && m.backgroundCompactGCPending {
gcRequired = true
m.backgroundCompactGCPending = false

for _, seg := range m.backgroundSegments {
Expand Down Expand Up @@ -503,7 +515,7 @@ func (m *mutableSegments) maybeBackgroundCompactWithLock() {
m.compact.compactingBackgroundStandard = true
go func() {
m.backgroundCompactWithPlan(plan, m.compact.backgroundCompactors,
sealedBlockStarts)
gcRequired, sealedBlockStarts)

m.Lock()
m.compact.compactingBackgroundStandard = false
Expand All @@ -523,7 +535,7 @@ func (m *mutableSegments) maybeBackgroundCompactWithLock() {
})
} else {
m.backgroundCompactWithPlan(gcPlan, compactors,
sealedBlockStarts)
gcRequired, sealedBlockStarts)
m.closeCompactors(compactors)
}

Expand Down Expand Up @@ -593,6 +605,7 @@ func (m *mutableSegments) closeCompactedSegmentsWithLock(segments []*readableSeg
func (m *mutableSegments) backgroundCompactWithPlan(
plan *compaction.Plan,
compactors chan *compaction.Compactor,
gcRequired bool,
sealedBlocks map[xtime.UnixNano]struct{},
) {
sw := m.metrics.backgroundCompactionPlanRunLatency.Start()
Expand Down Expand Up @@ -629,7 +642,7 @@ func (m *mutableSegments) backgroundCompactWithPlan(
compactors <- compactor
wg.Done()
}()
err := m.backgroundCompactWithTask(task, compactor,
err := m.backgroundCompactWithTask(task, compactor, gcRequired,
sealedBlocks, log, logger.With(zap.Int("task", i)))
if err != nil {
instrument.EmitAndLogInvariantViolation(m.iopts, func(l *zap.Logger) {
Expand All @@ -656,6 +669,7 @@ func (m *mutableSegments) newReadThroughSegment(seg fst.Segment) segment.Segment
func (m *mutableSegments) backgroundCompactWithTask(
task compaction.Task,
compactor *compaction.Compactor,
gcRequired bool,
sealedBlocks map[xtime.UnixNano]struct{},
log bool,
logger *zap.Logger,
Expand All @@ -669,9 +683,10 @@ func (m *mutableSegments) backgroundCompactWithTask(
segments = append(segments, seg.Segment)
}

start := time.Now()
compacted, err := compactor.Compact(segments,
segment.DocumentsFilterFn(func(d doc.Document) bool {
var documentsFilter segment.DocumentsFilter
if gcRequired {
// Only actively filter out documents if GC is required.
documentsFilter = segment.DocumentsFilterFn(func(d doc.Document) bool {
// Filter out any documents that only were indexed for
// sealed blocks.
if d.Ref == nil {
Expand All @@ -691,7 +706,11 @@ func (m *mutableSegments) backgroundCompactWithTask(
// Keep the series if and only if there are remaining
// index block starts outside of the sealed blocks starts.
return result.IndexedBlockStartsRemaining > 0
}),
})
}

start := time.Now()
compacted, err := compactor.Compact(segments, documentsFilter,
m.metrics.activeBlockGarbageCollectSeries,
mmap.ReporterOptions{
Context: mmap.Context{
Expand Down
9 changes: 9 additions & 0 deletions src/m3ninx/index/segment/builder/multi_segments_terms_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,19 @@ func (i *termsIterFromSegments) Next() bool {
)
for iter.Next() {
curr := iter.Current()
factor := 2
// First do exponential skipping.
for len(skip) >= factor && curr > skip[factor-1] {
skip = skip[factor:]
negativeOffset += postings.ID(factor)
factor *= 2
}
// Then linear.
for len(skip) > 0 && curr > skip[0] {
skip = skip[1:]
negativeOffset++
}
// Then skip the individual if matches.
if len(skip) > 0 && curr == skip[0] {
skip = skip[1:]
negativeOffset++
Expand Down

0 comments on commit 3699014

Please sign in to comment.