From c6fe28d2a95a830c47aad8e712c62a45a9ae6eb1 Mon Sep 17 00:00:00 2001 From: Bo Du Date: Thu, 19 Nov 2020 03:24:15 -0500 Subject: [PATCH] [dbnode] Properly rebuild index segments if they fail verification. (#2879) --- src/dbnode/persist/fs/index_read.go | 34 +-- src/dbnode/persist/fs/types.go | 3 + .../storage/bootstrap/bootstrap_mock.go | 15 ++ .../storage/bootstrap/bootstrapper/README.md | 1 + .../bootstrap/bootstrapper/fs/source.go | 224 ++++++++++++++---- .../bootstrapper/fs/source_index_test.go | 47 +++- src/dbnode/storage/bootstrap/cache.go | 58 ++++- src/dbnode/storage/bootstrap/cache_test.go | 88 +++++++ .../storage/bootstrap/result/result_index.go | 5 + src/dbnode/storage/bootstrap/types.go | 6 + src/dbnode/storage/cleanup.go | 19 +- src/dbnode/storage/index.go | 10 +- src/dbnode/storage/shard.go | 3 +- 13 files changed, 434 insertions(+), 79 deletions(-) diff --git a/src/dbnode/persist/fs/index_read.go b/src/dbnode/persist/fs/index_read.go index 773c7b5291..5b93a34225 100644 --- a/src/dbnode/persist/fs/index_read.go +++ b/src/dbnode/persist/fs/index_read.go @@ -22,6 +22,7 @@ package fs import ( "bytes" + "errors" "fmt" "io" "io/ioutil" @@ -41,6 +42,9 @@ const ( mmapPersistFsIndexName = "mmap.persist.fs.index" ) +// ErrIndexReaderValidationFailed is returned for corrupt index segemnts. +var ErrIndexReaderValidationFailed = errors.New("validation failed") + type indexReader struct { opts Options filePathPrefix string @@ -305,16 +309,16 @@ func (r *indexReader) Validate() error { func (r *indexReader) validateDigestsFileDigest() error { if r.readDigests.digestsFileDigest != r.expectedDigestOfDigest { - return fmt.Errorf("read digests file checksum bad: expected=%d, actual=%d", - r.expectedDigestOfDigest, r.readDigests.digestsFileDigest) + return fmt.Errorf("(%w) read digests file checksum bad: expected=%d, actual=%d", + ErrIndexReaderValidationFailed, r.expectedDigestOfDigest, r.readDigests.digestsFileDigest) } return nil } func (r *indexReader) validateInfoFileDigest() error { if r.readDigests.infoFileDigest != r.expectedDigest.InfoDigest { - return fmt.Errorf("read info file checksum bad: expected=%d, actual=%d", - r.expectedDigest.InfoDigest, r.readDigests.infoFileDigest) + return fmt.Errorf("(%w) read info file checksum bad: expected=%d, actual=%d", + ErrIndexReaderValidationFailed, r.expectedDigest.InfoDigest, r.readDigests.infoFileDigest) } return nil } @@ -322,35 +326,37 @@ func (r *indexReader) validateInfoFileDigest() error { func (r *indexReader) validateSegmentFileDigest(segmentIdx, fileIdx int) error { if segmentIdx >= len(r.readDigests.segments) { return fmt.Errorf( - "have not read correct number of segments to validate segment %d checksums: "+ + "(%w) have not read correct number of segments to validate segment %d checksums: "+ "need=%d, actual=%d", - segmentIdx, segmentIdx+1, len(r.readDigests.segments)) + ErrIndexReaderValidationFailed, segmentIdx, segmentIdx+1, len(r.readDigests.segments)) } if segmentIdx >= len(r.expectedDigest.SegmentDigests) { return fmt.Errorf( - "have not read digest files correctly to validate segment %d checksums: "+ + "(%w) have not read digest files correctly to validate segment %d checksums: "+ "need=%d, actual=%d", - segmentIdx, segmentIdx+1, len(r.expectedDigest.SegmentDigests)) + ErrIndexReaderValidationFailed, segmentIdx, segmentIdx+1, len(r.expectedDigest.SegmentDigests)) } if fileIdx >= len(r.readDigests.segments[segmentIdx].files) { return fmt.Errorf( - "have not read correct number of segment files to validate segment %d 
checksums: "+ + "(%w) have not read correct number of segment files to validate segment %d checksums: "+ "need=%d, actual=%d", - segmentIdx, fileIdx+1, len(r.readDigests.segments[segmentIdx].files)) + ErrIndexReaderValidationFailed, segmentIdx, fileIdx+1, + len(r.readDigests.segments[segmentIdx].files)) } if fileIdx >= len(r.expectedDigest.SegmentDigests[segmentIdx].Files) { return fmt.Errorf( - "have not read correct number of segment files to validate segment %d checksums: "+ + "(%w) have not read correct number of segment files to validate segment %d checksums: "+ "need=%d, actual=%d", - segmentIdx, fileIdx+1, len(r.expectedDigest.SegmentDigests[segmentIdx].Files)) + ErrIndexReaderValidationFailed, segmentIdx, fileIdx+1, + len(r.expectedDigest.SegmentDigests[segmentIdx].Files)) } expected := r.expectedDigest.SegmentDigests[segmentIdx].Files[fileIdx].Digest actual := r.readDigests.segments[segmentIdx].files[fileIdx].digest if actual != expected { - return fmt.Errorf("read segment file %d for segment %d checksum bad: expected=%d, actual=%d", - segmentIdx, fileIdx, expected, actual) + return fmt.Errorf("(%w) read segment file %d for segment %d checksum bad: expected=%d, actual=%d", + ErrIndexReaderValidationFailed, segmentIdx, fileIdx, expected, actual) } return nil } diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index 4db0c0c477..ecce353743 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -700,3 +700,6 @@ type IndexClaimsManager interface { blockStart time.Time, ) (int, error) } + +// DeleteFilesFn deletes files passed in as arg. +type DeleteFilesFn func(files []string) error diff --git a/src/dbnode/storage/bootstrap/bootstrap_mock.go b/src/dbnode/storage/bootstrap/bootstrap_mock.go index 7ecbc95f65..680d2c98a4 100644 --- a/src/dbnode/storage/bootstrap/bootstrap_mock.go +++ b/src/dbnode/storage/bootstrap/bootstrap_mock.go @@ -703,6 +703,21 @@ func (mr *MockCacheMockRecorder) InfoFilesForShard(ns, shard interface{}) *gomoc return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InfoFilesForShard", reflect.TypeOf((*MockCache)(nil).InfoFilesForShard), ns, shard) } +// IndexInfoFilesForNamespace mocks base method +func (m *MockCache) IndexInfoFilesForNamespace(ns namespace.Metadata) ([]fs.ReadIndexInfoFileResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IndexInfoFilesForNamespace", ns) + ret0, _ := ret[0].([]fs.ReadIndexInfoFileResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// IndexInfoFilesForNamespace indicates an expected call of IndexInfoFilesForNamespace +func (mr *MockCacheMockRecorder) IndexInfoFilesForNamespace(ns interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IndexInfoFilesForNamespace", reflect.TypeOf((*MockCache)(nil).IndexInfoFilesForNamespace), ns) +} + // ReadInfoFiles mocks base method func (m *MockCache) ReadInfoFiles() InfoFilesByNamespace { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/bootstrap/bootstrapper/README.md b/src/dbnode/storage/bootstrap/bootstrapper/README.md index b062aaac3d..947ea7f10a 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/README.md +++ b/src/dbnode/storage/bootstrap/bootstrapper/README.md @@ -6,6 +6,7 @@ The collection of bootstrappers comprise the task executed when bootstrapping a - `fs`: The filesystem bootstrapper, used to bootstrap as much data as possible from the local filesystem. - `peers`: The peers bootstrapper, used to bootstrap any remaining data from peers. 
This is used for a full node join too. + - *NOTE*: For the node leave case, the peers bs will persist default volume type index filesets to disk with non-overlapping shard time ranges to avoid re-building the entire index segment w/ new shards. - `commitlog`: The commit log bootstrapper, currently only used in the case that peers bootstrapping fails. Once the current block is being snapshotted frequently to disk it might be faster and make more sense to not actively use the peers bootstrapper and just use a combination of the filesystem bootstrapper and the minimal time range required from the commit log bootstrapper. - *NOTE*: the commitlog bootstrapper is special cased in that it runs for the *entire* bootstrappable range per shard whereas other bootstrappers fill in the unfulfilled gaps as bootstrapping progresses. diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go index a9a790d15f..7d97ff487f 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go @@ -21,10 +21,12 @@ package fs import ( + "errors" "fmt" "sync" "time" + indexpb "github.com/m3db/m3/src/dbnode/generated/proto/index" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" @@ -47,6 +49,7 @@ import ( "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/context" + xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" @@ -79,12 +82,14 @@ type fileSystemSource struct { idPool ident.Pool newReaderFn newDataFileSetReaderFn newReaderPoolOpts bootstrapper.NewReaderPoolOptions + deleteFilesFn fs.DeleteFilesFn metrics fileSystemSourceMetrics } type fileSystemSourceMetrics struct { - persistedIndexBlocksRead tally.Counter - persistedIndexBlocksWrite tally.Counter + persistedIndexBlocksRead tally.Counter + persistedIndexBlocksWrite tally.Counter + indexBlocksFailedValidation tally.Counter } func newFileSystemSource(opts Options) (bootstrap.Source, error) { @@ -98,15 +103,17 @@ func newFileSystemSource(opts Options) (bootstrap.Source, error) { opts = opts.SetInstrumentOptions(iopts) s := &fileSystemSource{ - opts: opts, - fsopts: opts.FilesystemOptions(), - log: iopts.Logger().With(zap.String("bootstrapper", "filesystem")), - nowFn: opts.ResultOptions().ClockOptions().NowFn(), - idPool: opts.IdentifierPool(), - newReaderFn: fs.NewReader, + opts: opts, + fsopts: opts.FilesystemOptions(), + log: iopts.Logger().With(zap.String("bootstrapper", "filesystem")), + nowFn: opts.ResultOptions().ClockOptions().NowFn(), + idPool: opts.IdentifierPool(), + newReaderFn: fs.NewReader, + deleteFilesFn: fs.DeleteFiles, metrics: fileSystemSourceMetrics{ - persistedIndexBlocksRead: scope.Counter("persist-index-blocks-read"), - persistedIndexBlocksWrite: scope.Counter("persist-index-blocks-write"), + persistedIndexBlocksRead: scope.Counter("persist-index-blocks-read"), + persistedIndexBlocksWrite: scope.Counter("persist-index-blocks-write"), + indexBlocksFailedValidation: scope.Counter("index-blocks-failed-validation"), }, } s.newReaderPoolOpts.Alloc = s.newReader @@ -328,6 +335,7 @@ func (s *fileSystemSource) bootstrapFromReaders( builder *result.IndexBuilder, persistManager *bootstrapper.SharedPersistManager, compactor *bootstrapper.SharedCompactor, + cache bootstrap.Cache, ) { resultOpts := s.opts.ResultOptions() 
@@ -338,7 +346,7 @@ func (s *fileSystemSource) bootstrapFromReaders( s.loadShardReadersDataIntoShardResult(run, ns, accumulator, runOpts, runResult, resultOpts, timeWindowReaders, readerPool, - builder, persistManager, compactor) + builder, persistManager, compactor, cache) } } @@ -392,6 +400,7 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( builder *result.IndexBuilder, persistManager *bootstrapper.SharedPersistManager, compactor *bootstrapper.SharedCompactor, + cache bootstrap.Cache, ) { var ( blockPool = ropts.DatabaseBlockOptions().DatabaseBlockPool() @@ -578,7 +587,36 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( } if shouldFlush && satisifiedFlushRanges { + // NB(bodu): If we are persisting an index segment to disk, we need to delete any existing + // index filesets at this block start. The newly persisted index segments becomes the new source of truth. + var ( + filesToDelete = []string{} + persistSuccess bool + ) + defer func() { + if persistSuccess { + if err := s.deleteFilesFn(filesToDelete); err != nil { + instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) { + l.Error("failed to delete non default index filesets", + zap.Error(err), + zap.Stringer("namespace", ns.ID()), + zap.Stringer("requestedRanges", requestedRanges)) + }) + } + } + }() + filesToDelete = s.appendIndexFilesetFilesToDelete(ns, blockStart, cache, filesToDelete, runResult, iopts) + + // Use debug level with full log fidelity. s.log.Debug("building file set index segment", buildIndexLogFields...) + // Use info log with more high level attributes. + s.log.Info("rebuilding file set index segment", + zap.Stringer("namespace", ns.ID()), + zap.Int("totalEntries", totalEntries), + zap.Time("blockStart", blockStart), + zap.Time("blockEnd", blockEnd)) + // NB(bodu): The index claims manager ensures that we properly advance the volume index + // past existing volume indices. indexBlock, err = bootstrapper.PersistBootstrapIndexSegment( ns, requestedRanges, @@ -590,7 +628,11 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( blockStart, blockEnd, ) - if err != nil { + if err == nil { + // Track success. + s.metrics.persistedIndexBlocksWrite.Inc(1) + persistSuccess = true + } else { instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) { l.Error("persist fs index bootstrap failed", zap.Error(err), @@ -598,8 +640,6 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( zap.Stringer("requestedRanges", requestedRanges)) }) } - // Track success. - s.metrics.persistedIndexBlocksWrite.Inc(1) } else { s.log.Info("building in-memory index segment", buildIndexLogFields...) indexBlock, err = bootstrapper.BuildBootstrapIndexSegment( @@ -650,6 +690,74 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( remainingRanges, timesWithErrors) } +// appendIndexFilesetFilesToDelete appends all index filesets at a block start for deletion. +// Also removes index results from given block start since results are not complete and/or corrupted +// and require an index segment rebuild. 
+func (s *fileSystemSource) appendIndexFilesetFilesToDelete( + ns namespace.Metadata, + blockStart time.Time, + cache bootstrap.Cache, + filesToDelete []string, + runResult *runResult, + iopts instrument.Options, +) []string { + infoFiles, err := cache.IndexInfoFilesForNamespace(ns) + if err != nil { + instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) { + l.Error("failed to get index info files from cache", + zap.Error(err), + zap.Time("blockStart", blockStart), + zap.Stringer("namespace", ns.ID())) + }) + } + for i := range infoFiles { + if err := infoFiles[i].Err.Error(); err != nil { + // We already log errors once when bootstrapping from persisted + // index blocks just continue here. + continue + } + + info := infoFiles[i].Info + indexBlockStart := xtime.UnixNano(info.BlockStart).ToTime() + if blockStart.Equal(indexBlockStart) { + filesToDelete = append(filesToDelete, infoFiles[i].AbsoluteFilePaths...) + } + } + + // Remove index results for the block we're deleting. + if err := removeIndexResults(ns, blockStart, runResult); err != nil { + instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) { + l.Error("error removing partial/corrupted index results", + zap.Error(err), + zap.Time("blockStart", blockStart), + zap.Stringer("namespace", ns.ID())) + }) + } + + return filesToDelete +} + +func removeIndexResults( + ns namespace.Metadata, + blockStart time.Time, + runResult *runResult, +) error { + runResult.Lock() + defer runResult.Unlock() + + multiErr := xerrors.NewMultiError() + results, ok := runResult.index.IndexResults()[xtime.ToUnixNano(blockStart)] + if ok { + for volumeType, indexBlock := range results.Iter() { + for _, seg := range indexBlock.Segments() { + multiErr = multiErr.Add(seg.Segment().Close()) + } + results.DeleteBlock(volumeType) + } + } + return multiErr.FinalError() +} + func (s *fileSystemSource) readNextEntryAndRecordBlock( nsCtx namespace.Context, accumulator bootstrap.NamespaceDataAccumulator, @@ -747,21 +855,17 @@ func (s *fileSystemSource) read( ) (*runResult, error) { var ( seriesCachePolicy = s.opts.ResultOptions().SeriesCachePolicy() - res *runResult + res = newRunResult() ) if shardTimeRanges.IsEmpty() { return newRunResult(), nil } - setOrMergeResult := func(newResult *runResult) { + mergeResult := func(newResult *runResult) { if newResult == nil { return } - if res == nil { - res = newResult - } else { - res = res.mergedResult(newResult) - } + res = res.mergedResult(newResult) } if run == bootstrapDataRunType { @@ -785,7 +889,7 @@ func (s *fileSystemSource) read( // subtract the shard + time ranges from what we intend to bootstrap // for those we found. r, err := s.bootstrapFromIndexPersistedBlocks(md, - shardTimeRanges) + shardTimeRanges, cache) if err != nil { s.log.Warn("filesystem bootstrapped failed to read persisted index blocks") } else { @@ -793,7 +897,7 @@ func (s *fileSystemSource) read( shardTimeRanges = shardTimeRanges.Copy() shardTimeRanges.Subtract(r.fulfilled) // Set or merge result. 
- setOrMergeResult(r.result) + mergeResult(r.result) } logSpan("bootstrap_from_index_persisted_blocks_done") } @@ -832,8 +936,6 @@ func (s *fileSystemSource) read( Cache: cache, }) - bootstrapFromReadersRunResult := newRunResult() - var buildWg sync.WaitGroup for i := 0; i < indexSegmentConcurrency; i++ { alloc := s.opts.ResultOptions().IndexDocumentsBuilderAllocator() @@ -869,19 +971,17 @@ func (s *fileSystemSource) read( buildWg.Add(1) go func() { s.bootstrapFromReaders(run, md, - accumulator, runOpts, bootstrapFromReadersRunResult, + accumulator, runOpts, res, readerPool, readersCh, builder, &bootstrapper.SharedPersistManager{Mgr: persistManager}, - &bootstrapper.SharedCompactor{Compactor: compactor}) + &bootstrapper.SharedCompactor{Compactor: compactor}, + cache) buildWg.Done() }() } buildWg.Wait() - // Merge any existing results if necessary. - setOrMergeResult(bootstrapFromReadersRunResult) - return res, nil } @@ -928,15 +1028,21 @@ type bootstrapFromIndexPersistedBlocksResult struct { func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks( ns namespace.Metadata, shardTimeRanges result.ShardTimeRanges, + cache bootstrap.Cache, ) (bootstrapFromIndexPersistedBlocksResult, error) { res := bootstrapFromIndexPersistedBlocksResult{ fulfilled: result.NewShardTimeRanges(), } indexBlockSize := ns.Options().IndexOptions().BlockSize() - infoFiles := fs.ReadIndexInfoFiles(s.fsopts.FilePathPrefix(), ns.ID(), - s.fsopts.InfoReaderBufferSize()) + infoFiles, err := cache.IndexInfoFilesForNamespace(ns) + if err != nil { + return bootstrapFromIndexPersistedBlocksResult{}, err + } + // Track corrupted block starts as we will attempt to later recover + // from corruption by building an index segment from TSDB data. + corruptedBlockStarts := make(map[xtime.UnixNano]struct{}) for _, infoFile := range infoFiles { if err := infoFile.Err.Error(); err != nil { s.log.Error("unable to read index info file", @@ -949,7 +1055,18 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks( } info := infoFile.Info - indexBlockStart := xtime.UnixNano(info.BlockStart).ToTime() + indexBlockStartUnixNanos := xtime.UnixNano(info.BlockStart) + indexBlockStart := indexBlockStartUnixNanos.ToTime() + + if _, ok := corruptedBlockStarts[indexBlockStartUnixNanos]; ok { + s.log.Info("index block corrupted skipping index info file", + zap.Stringer("namespace", ns.ID()), + zap.Stringer("blockStart", indexBlockStart), + zap.String("filepath", infoFile.Err.Filepath()), + ) + continue + } + indexBlockRange := xtime.Range{ Start: indexBlockStart, End: indexBlockStart.Add(indexBlockSize), @@ -1005,6 +1122,19 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks( zap.Time("blockStart", indexBlockStart), zap.Int("volumeIndex", infoFile.ID.VolumeIndex), ) + if errors.Is(err, fs.ErrIndexReaderValidationFailed) { + // Emit a metric for failed validations. + s.metrics.indexBlocksFailedValidation.Inc(1) + // Track corrupted blocks and remove any loaded results. + corruptedBlockStarts[indexBlockStartUnixNanos] = struct{}{} + if err := removeIndexResults(ns, indexBlockStart, res.result); err != nil { + s.log.Error("error removing partial/corrupted index results", + zap.Error(err), + zap.Stringer("namespace", ns.ID()), + zap.Time("blockStart", indexBlockStart), + ) + } + } continue } @@ -1012,26 +1142,30 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks( s.metrics.persistedIndexBlocksRead.Inc(1) // Record result. 
- if res.result == nil { - res.result = newRunResult() - } segmentsFulfilled := willFulfill // NB(bodu): All segments read from disk are already persisted. persistedSegments := make([]result.Segment, 0, len(readResult.Segments)) for _, segment := range readResult.Segments { persistedSegments = append(persistedSegments, result.NewSegment(segment, true)) } - volumeType := idxpersist.DefaultIndexVolumeType - if info.IndexVolumeType != nil { - volumeType = idxpersist.IndexVolumeType(info.IndexVolumeType.Value) - } indexBlockByVolumeType := result.NewIndexBlockByVolumeType(indexBlockStart) + volumeType := volumeTypeFromInfo(&info) indexBlockByVolumeType.SetBlock(volumeType, result.NewIndexBlock(persistedSegments, segmentsFulfilled)) + + if res.result == nil { + res.result = newRunResult() + } // NB(r): Don't need to call MarkFulfilled on the IndexResults here // as we've already passed the ranges fulfilled to the block that // we place in the IndexResuts with the call to Add(...). res.result.index.Add(indexBlockByVolumeType, nil) - res.fulfilled.AddRanges(segmentsFulfilled) + + // NB(bodu): We only mark ranges as fulfilled for the default index volume type. + // It's possible to have other index volume types but the default type is required to + // fulfill bootstrappable ranges. + if volumeType == idxpersist.DefaultIndexVolumeType { + res.fulfilled.AddRanges(segmentsFulfilled) + } } return res, nil @@ -1074,3 +1208,11 @@ func (r *runResult) mergedResult(other *runResult) *runResult { index: result.MergedIndexBootstrapResult(r.index, other.index), } } + +func volumeTypeFromInfo(info *indexpb.IndexVolumeInfo) idxpersist.IndexVolumeType { + volumeType := idxpersist.DefaultIndexVolumeType + if info.IndexVolumeType != nil { + volumeType = idxpersist.IndexVolumeType(info.IndexVolumeType.Value) + } + return volumeType +} diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go index 14f83aa76d..754bec8de9 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go @@ -35,6 +35,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/m3ninx/index/segment/mem" idxpersist "github.com/m3db/m3/src/m3ninx/persist" + xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" xtime "github.com/m3db/m3/src/x/time" "github.com/stretchr/testify/require" @@ -304,7 +305,7 @@ func validateGoodTaggedSeries( } } -func TestBootstrapIndex(t *testing.T) { +func TestBootstrapIndexAndUnfulfilledRanges(t *testing.T) { dir := createTempDir(t) defer os.RemoveAll(dir) @@ -334,9 +335,46 @@ func TestBootstrapIndex(t *testing.T) { times.shardTimeRanges, opts.FilesystemOptions(), nsMD) defer tester.Finish() + // Write out non default type index volume type index block and ensure + // that it gets deleted and is not loaded into the index results to test + // the unfulfilled shard time ranges case (missing default index volume type + // and/or index segments failed validation). 
+ var ( + notDefaultIndexVolumeType = idxpersist.IndexVolumeType("not-default") + shards = map[uint32]struct{}{testShard: struct{}{}} + filesToDelete []string + ) + idxWriter, err := fs.NewIndexWriter(src.fsopts) + require.NoError(t, err) + require.NoError(t, idxWriter.Open(fs.IndexWriterOpenOptions{ + Identifier: fs.FileSetFileIdentifier{ + FileSetContentType: persist.FileSetIndexContentType, + Namespace: nsMD.ID(), + BlockStart: times.start, + VolumeIndex: 1, + }, + BlockSize: nsMD.Options().IndexOptions().BlockSize(), + FileSetType: persist.FileSetFlushType, + Shards: shards, + IndexVolumeType: notDefaultIndexVolumeType, + })) + // Don't need to write any actual data. + require.NoError(t, idxWriter.Close()) + src.deleteFilesFn = func(files []string) error { + filesToDelete = append(filesToDelete, files...) + multiErr := xerrors.NewMultiError() + for _, f := range files { + multiErr = multiErr.Add(os.Remove(f)) + } + return multiErr.FinalError() + } + tester.TestReadWith(src) indexResults := tester.ResultForNamespace(nsMD.ID()).IndexResult.IndexResults() + // Ensure we are attempting to delete a single index fileset (in this case w/ no data). + require.Len(t, filesToDelete, 3) + // Check that single persisted segment got written out infoFiles := fs.ReadIndexInfoFiles(src.fsopts.FilePathPrefix(), testNs1ID, src.fsopts.InfoReaderBufferSize()) @@ -362,6 +400,11 @@ func TestBootstrapIndex(t *testing.T) { require.True(t, ok) require.True(t, segment.IsPersisted()) + // Check that the non default index volume type (missing default index volume type) + // was not added to the index results. + _, ok = blockByVolumeType.GetBlock(notDefaultIndexVolumeType) + require.False(t, ok) + // Check that the second segment is mutable and was not written out blockByVolumeType, ok = indexResults[xtime.ToUnixNano(times.start.Add(testIndexBlockSize))] require.True(t, ok) @@ -378,7 +421,7 @@ func TestBootstrapIndex(t *testing.T) { // Validate that wrote the block out (and no index blocks // were read as existing index blocks on disk) counters := scope.Snapshot().Counters() - require.Equal(t, int64(0), counters["fs-bootstrapper.persist-index-blocks-read+"].Value()) + require.Equal(t, int64(1), counters["fs-bootstrapper.persist-index-blocks-read+"].Value()) require.Equal(t, int64(1), counters["fs-bootstrapper.persist-index-blocks-write+"].Value()) } diff --git a/src/dbnode/storage/bootstrap/cache.go b/src/dbnode/storage/bootstrap/cache.go index 50ebad8349..bd4c91d613 100644 --- a/src/dbnode/storage/bootstrap/cache.go +++ b/src/dbnode/storage/bootstrap/cache.go @@ -39,11 +39,13 @@ var ( type cache struct { sync.Mutex - fsOpts fs.Options - namespaceDetails []NamespaceDetails - infoFilesByNamespace InfoFilesByNamespace - iOpts instrument.Options - hasPopulatedInfo bool + fsOpts fs.Options + namespaceDetails []NamespaceDetails + infoFilesByNamespace InfoFilesByNamespace + indexInfoFilesByNamespace IndexInfoFilesByNamespace + iOpts instrument.Options + hasPopulatedInfo bool + hasPopulatedIndexInfo bool } // NewCache creates a cache specifically to be used during the bootstrap process. 
@@ -54,10 +56,11 @@ func NewCache(options CacheOptions) (Cache, error) { return nil, err } return &cache{ - fsOpts: options.FilesystemOptions(), - namespaceDetails: options.NamespaceDetails(), - infoFilesByNamespace: make(InfoFilesByNamespace, len(options.NamespaceDetails())), - iOpts: options.InstrumentOptions(), + fsOpts: options.FilesystemOptions(), + namespaceDetails: options.NamespaceDetails(), + infoFilesByNamespace: make(InfoFilesByNamespace, len(options.NamespaceDetails())), + indexInfoFilesByNamespace: make(IndexInfoFilesByNamespace, len(options.NamespaceDetails())), + iOpts: options.InstrumentOptions(), }, nil } @@ -85,10 +88,24 @@ func (c *cache) InfoFilesForShard(ns namespace.Metadata, shard uint32) ([]fs.Rea return infoFileResults, nil } +func (c *cache) IndexInfoFilesForNamespace(ns namespace.Metadata) ( + []fs.ReadIndexInfoFileResult, + error, +) { + infoFiles, ok := c.readIndexInfoFiles()[ns] + // This should never happen as Cache object is initialized with all namespaces to bootstrap. + if !ok { + return nil, fmt.Errorf("attempting to read index info files for namespace %v not "+ + "specified at bootstrap startup", ns.ID().String()) + } + return infoFiles, nil +} + func (c *cache) Evict() { c.Lock() defer c.Unlock() c.hasPopulatedInfo = false + c.hasPopulatedIndexInfo = false } func (c *cache) ReadInfoFiles() InfoFilesByNamespace { @@ -111,14 +128,33 @@ func (c *cache) populateInfoFilesByNamespaceWithLock() { } for _, shard := range finder.Shards { result[shard] = fs.ReadInfoFiles(c.fsOpts.FilePathPrefix(), - finder.Namespace.ID(), shard, c.fsOpts.InfoReaderBufferSize(), c.fsOpts.DecodingOptions(), - persist.FileSetFlushType) + finder.Namespace.ID(), shard, c.fsOpts.InfoReaderBufferSize(), + c.fsOpts.DecodingOptions(), persist.FileSetFlushType) } c.infoFilesByNamespace[finder.Namespace] = result } } +func (c *cache) readIndexInfoFiles() IndexInfoFilesByNamespace { + c.Lock() + defer c.Unlock() + if !c.hasPopulatedIndexInfo { + c.populateIndexInfoFilesByNamespaceWithLock() + c.hasPopulatedIndexInfo = true + } + return c.indexInfoFilesByNamespace +} + +func (c *cache) populateIndexInfoFilesByNamespaceWithLock() { + for i := range c.namespaceDetails { + finder := c.namespaceDetails[i] + c.indexInfoFilesByNamespace[finder.Namespace] = fs.ReadIndexInfoFiles( + c.fsOpts.FilePathPrefix(), finder.Namespace.ID(), + c.fsOpts.InfoReaderBufferSize()) + } +} + type cacheOptions struct { fsOpts fs.Options namespaceDetails []NamespaceDetails diff --git a/src/dbnode/storage/bootstrap/cache_test.go b/src/dbnode/storage/bootstrap/cache_test.go index 9b7a29114c..88699bbb93 100644 --- a/src/dbnode/storage/bootstrap/cache_test.go +++ b/src/dbnode/storage/bootstrap/cache_test.go @@ -32,6 +32,7 @@ import ( "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/retention" + idxpersist "github.com/m3db/m3/src/m3ninx/persist" "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/ident" @@ -133,6 +134,54 @@ func TestCacheReadInfoFilesInvariantViolation(t *testing.T) { require.Error(t, err) } +func TestCacheReadIndexInfoFiles(t *testing.T) { + dir := createTempDir(t) + defer os.RemoveAll(dir) + + md1 := testNamespaceMetadata(t, ident.StringID("ns1")) + md2 := testNamespaceMetadata(t, ident.StringID("ns2")) + + fsOpts := testFilesystemOptions.SetFilePathPrefix(dir) + + shards := map[uint32]struct{}{ + 0: struct{}{}, + 1: struct{}{}, + } + writeIndexFilesets(t, md1.ID(), shards, fsOpts) + writeIndexFilesets(t, md2.ID(), shards, fsOpts) + 
+ opts := NewCacheOptions(). + SetFilesystemOptions(fsOpts). + SetInstrumentOptions(fsOpts.InstrumentOptions()). + SetNamespaceDetails([]NamespaceDetails{ + { + Namespace: md1, + Shards: []uint32{0, 1}, + }, + { + Namespace: md2, + Shards: []uint32{0, 1}, + }, + }) + cache, err := NewCache(opts) + require.NoError(t, err) + + infoFilesByNamespace := cache.ReadInfoFiles() + require.NotEmpty(t, infoFilesByNamespace) + + // Ensure we have two namespaces. + require.Equal(t, 2, len(infoFilesByNamespace)) + + // Ensure each shard has three info files (one for each fileset written). + infoFiles, err := cache.IndexInfoFilesForNamespace(md1) + require.NoError(t, err) + require.Equal(t, 3, len(infoFiles)) + + infoFiles, err = cache.IndexInfoFilesForNamespace(md2) + require.NoError(t, err) + require.Equal(t, 3, len(infoFiles)) +} + func testNamespaceMetadata(t *testing.T, nsID ident.ID) namespace.Metadata { rOpts := testRetentionOptions.SetBlockSize(testBlockSize) md, err := namespace.NewMetadata(nsID, testNamespaceOptions. @@ -154,6 +203,45 @@ type testSeries struct { data []byte } +func writeIndexFilesets( + t *testing.T, + namespace ident.ID, + shards map[uint32]struct{}, + fsOpts fs.Options, +) { + blockStart := testStart + blockSize := 10 * time.Hour + numBlocks := 3 + for i := 0; i < numBlocks; i++ { + writeIndexFiles(t, namespace, shards, blockStart.Add(time.Duration(i)*blockSize), + blockSize, fsOpts) + } +} + +func writeIndexFiles( + t *testing.T, + namespace ident.ID, + shards map[uint32]struct{}, + blockStart time.Time, + blockSize time.Duration, + fsOpts fs.Options, +) { + idxWriter, err := fs.NewIndexWriter(fsOpts) + require.NoError(t, err) + require.NoError(t, idxWriter.Open(fs.IndexWriterOpenOptions{ + Identifier: fs.FileSetFileIdentifier{ + FileSetContentType: persist.FileSetIndexContentType, + Namespace: namespace, + BlockStart: blockStart, + }, + BlockSize: blockSize, + FileSetType: persist.FileSetFlushType, + Shards: shards, + IndexVolumeType: idxpersist.DefaultIndexVolumeType, + })) + require.NoError(t, idxWriter.Close()) +} + func writeFilesets(t *testing.T, namespace ident.ID, shard uint32, fsOpts fs.Options) { inputs := []struct { start time.Time diff --git a/src/dbnode/storage/bootstrap/result/result_index.go b/src/dbnode/storage/bootstrap/result/result_index.go index 1a398139a1..0bfb7ad4ed 100644 --- a/src/dbnode/storage/bootstrap/result/result_index.go +++ b/src/dbnode/storage/bootstrap/result/result_index.go @@ -312,6 +312,11 @@ func (b IndexBlockByVolumeType) SetBlock(volumeType persist.IndexVolumeType, blo b.data[volumeType] = block } +// DeleteBlock deletes an IndexBlock for volumeType. +func (b IndexBlockByVolumeType) DeleteBlock(volumeType persist.IndexVolumeType) { + delete(b.data, volumeType) +} + // Iter returns the underlying iterable map data. func (b IndexBlockByVolumeType) Iter() map[persist.IndexVolumeType]IndexBlock { return b.data diff --git a/src/dbnode/storage/bootstrap/types.go b/src/dbnode/storage/bootstrap/types.go index 0a64face66..afdcc53450 100644 --- a/src/dbnode/storage/bootstrap/types.go +++ b/src/dbnode/storage/bootstrap/types.go @@ -430,6 +430,9 @@ type InfoFileResultsPerShard map[uint32][]fs.ReadInfoFileResult // InfoFilesByNamespace maps a namespace to info files grouped by shard. type InfoFilesByNamespace map[namespace.Metadata]InfoFileResultsPerShard +// IndexInfoFilesByNamespace maps a namespace to index info files. 
+type IndexInfoFilesByNamespace map[namespace.Metadata][]fs.ReadIndexInfoFileResult + // Cache provides a snapshot of info files for use throughout all stages of the bootstrap. type Cache interface { // InfoFilesForNamespace returns the info files grouped by namespace. @@ -438,6 +441,9 @@ type Cache interface { // InfoFilesForShard returns the info files grouped by shard for the provided namespace. InfoFilesForShard(ns namespace.Metadata, shard uint32) ([]fs.ReadInfoFileResult, error) + // IndexInfoFilesForNamespace returns the index info files. + IndexInfoFilesForNamespace(ns namespace.Metadata) ([]fs.ReadIndexInfoFileResult, error) + // ReadInfoFiles returns info file results for each shard grouped by namespace. A cached copy // is returned if the info files have already been read. ReadInfoFiles() InfoFilesByNamespace diff --git a/src/dbnode/storage/cleanup.go b/src/dbnode/storage/cleanup.go index 96dde48304..6ce83e1048 100644 --- a/src/dbnode/storage/cleanup.go +++ b/src/dbnode/storage/cleanup.go @@ -39,13 +39,21 @@ import ( "go.uber.org/zap" ) -type commitLogFilesFn func(commitlog.Options) (persist.CommitLogFiles, []commitlog.ErrorWithPath, error) -type snapshotMetadataFilesFn func(fs.Options) ([]fs.SnapshotMetadata, []fs.SnapshotMetadataErrorWithPaths, error) +type ( + commitLogFilesFn func(commitlog.Options) ( + persist.CommitLogFiles, + []commitlog.ErrorWithPath, + error, + ) + snapshotMetadataFilesFn func(fs.Options) ( + []fs.SnapshotMetadata, + []fs.SnapshotMetadataErrorWithPaths, + error, + ) +) type snapshotFilesFn func(filePathPrefix string, namespace ident.ID, shard uint32) (fs.FileSetFilesSlice, error) -type deleteFilesFn func(files []string) error - type deleteInactiveDirectoriesFn func(parentDirPath string, activeDirNames []string) error // Narrow interface so as not to expose all the functionality of the commitlog @@ -68,7 +76,7 @@ type cleanupManager struct { snapshotMetadataFilesFn snapshotMetadataFilesFn snapshotFilesFn snapshotFilesFn - deleteFilesFn deleteFilesFn + deleteFilesFn fs.DeleteFilesFn deleteInactiveDirectoriesFn deleteInactiveDirectoriesFn warmFlushCleanupInProgress bool coldFlushCleanupInProgress bool @@ -213,6 +221,7 @@ func (m *cleanupManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) erro return multiErr.FinalError() } + func (m *cleanupManager) Report() { m.RLock() coldFlushCleanupInProgress := m.coldFlushCleanupInProgress diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index c811175b59..6fb04ba061 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -87,9 +87,7 @@ const ( defaultFlushDocsBatchSize = 8192 ) -var ( - allQuery = idx.NewAllQuery() -) +var allQuery = idx.NewAllQuery() // nolint: maligned type nsIndex struct { @@ -109,7 +107,7 @@ type nsIndex struct { namespaceRuntimeOptsMgr namespace.RuntimeOptionsManager indexFilesetsBeforeFn indexFilesetsBeforeFn - deleteFilesFn deleteFilesFn + deleteFilesFn fs.DeleteFilesFn readIndexInfoFilesFn readIndexInfoFilesFn newBlockFn index.NewBlockFn @@ -1996,6 +1994,10 @@ func (i *nsIndex) CleanupExpiredFileSets(t time.Time) error { return i.deleteFilesFn(filesets) } +// CleanupDuplicateFileSets only considers an index fileset of the same index volume type +// that covers a superset of shard time ranges as a dupe. We can have index filesets +// of the default volume type that have non-overlapping shard time ranges in the node leave +// case where we accept new shards and a index fileset is persisted to disk w/ the new shards. 
func (i *nsIndex) CleanupDuplicateFileSets() error { fsOpts := i.opts.CommitLogOptions().FilesystemOptions() infoFiles := i.readIndexInfoFilesFn( diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 061df21615..16fb3f4ae9 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -171,7 +171,7 @@ type dbShard struct { newFSMergeWithMemFn newFSMergeWithMemFn filesetsFn filesetsFn filesetPathsBeforeFn filesetPathsBeforeFn - deleteFilesFn deleteFilesFn + deleteFilesFn fs.DeleteFilesFn snapshotFilesFn snapshotFilesFn sleepFn func(time.Duration) identifierPool ident.Pool @@ -1325,7 +1325,6 @@ func (s *dbShard) insertSeriesForIndexingAsyncBatched( entryRefCountIncremented: true, }, }) - // i.e. unable to enqueue into shard insert queue if err != nil { entry.OnIndexFinalize(indexBlockStart) // release any reference's we've held for indexing
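
The checksum changes in index_read.go lean on Go's error wrapping: every digest-mismatch error now folds ErrIndexReaderValidationFailed in with the %w verb, and the filesystem bootstrapper later detects corruption with errors.Is before falling back to rebuilding the segment from TSDB data. Below is a minimal, self-contained sketch of that pattern; validateChecksum and the uint32 digests are illustrative stand-ins, not M3 APIs.

package main

import (
	"errors"
	"fmt"
)

// ErrValidationFailed mirrors fs.ErrIndexReaderValidationFailed: a sentinel
// error that callers can branch on with errors.Is.
var ErrValidationFailed = errors.New("validation failed")

// validateChecksum (hypothetical helper) wraps the sentinel with %w so the
// cause survives the extra context added to the message.
func validateChecksum(expected, actual uint32) error {
	if expected != actual {
		return fmt.Errorf("(%w) checksum bad: expected=%d, actual=%d",
			ErrValidationFailed, expected, actual)
	}
	return nil
}

func main() {
	err := validateChecksum(42, 7)
	// errors.Is unwraps the chain, so corruption can be detected without
	// string matching, e.g. to mark the block start corrupted and rebuild.
	if errors.Is(err, ErrValidationFailed) {
		fmt.Println("corrupt index segment, rebuilding:", err)
	}
}

The parenthesised "(%w)" prefix keeps the wrapped sentinel at the front of the message while leaving the formatted digest details intact, which is the convention the patch applies to every validation error.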
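
The source.go flow persists the rebuilt index segment first and only deletes the superseded index filesets for that block start once the persist succeeds, with the deletion going through the injected fs.DeleteFilesFn so tests can intercept it. A rough sketch of that ordering under simplified types follows; rebuildIndexBlock, persistSegment, and staleFiles are hypothetical names used only for illustration.

package main

import (
	"fmt"
	"os"
	"time"
)

// DeleteFilesFn mirrors the fs.DeleteFilesFn hook added in this patch so the
// deletion step can be stubbed out in tests.
type DeleteFilesFn func(files []string) error

// rebuildIndexBlock sketches the ordering used by the bootstrapper: persist
// the rebuilt segment first, and only delete the old filesets for the block
// start once the persist succeeded, so a failed rebuild never loses data.
func rebuildIndexBlock(
	blockStart time.Time,
	staleFiles []string,
	persistSegment func() error,
	deleteFiles DeleteFilesFn,
) error {
	if err := persistSegment(); err != nil {
		return fmt.Errorf("persist rebuilt segment for %v: %w", blockStart, err)
	}
	// The new segment is now the source of truth; remove the superseded filesets.
	return deleteFiles(staleFiles)
}

func main() {
	deleteFn := func(files []string) error {
		for _, f := range files {
			if err := os.Remove(f); err != nil && !os.IsNotExist(err) {
				return err
			}
		}
		return nil
	}
	_ = rebuildIndexBlock(time.Now().Truncate(2*time.Hour), nil,
		func() error { return nil }, deleteFn)
	fmt.Println("rebuild complete")
}

In the patch itself the delete runs from a defer keyed off a persistSuccess flag, which achieves the same persist-before-delete ordering when the persist call sits in the middle of a larger block.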
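
The cache.go changes read index info files for all namespaces at most once per bootstrap, guarded by a mutex and a hasPopulatedIndexInfo flag that Evict resets. A generic sketch of that populate-once cache, assuming a stand-in loadFn in place of fs.ReadIndexInfoFiles and string keys in place of namespace metadata:

package main

import (
	"fmt"
	"sync"
)

// lazyCache sketches the populate-once pattern used by the bootstrap cache:
// the expensive load runs at most once between Evict calls.
type lazyCache struct {
	sync.Mutex
	populated bool
	values    map[string][]string
	loadFn    func() map[string][]string // stand-in for fs.ReadIndexInfoFiles
}

func (c *lazyCache) Get(key string) []string {
	c.Lock()
	defer c.Unlock()
	if !c.populated {
		c.values = c.loadFn()
		c.populated = true
	}
	return c.values[key]
}

// Evict forces the next Get to reload from disk, mirroring Cache.Evict.
func (c *lazyCache) Evict() {
	c.Lock()
	defer c.Unlock()
	c.populated = false
}

func main() {
	calls := 0
	c := &lazyCache{loadFn: func() map[string][]string {
		calls++
		return map[string][]string{"ns1": {"index-info-0.db"}}
	}}
	c.Get("ns1")
	c.Get("ns1")
	fmt.Println("loads:", calls) // prints 1: the second Get hits the cache
}

Using a plain flag under the mutex rather than sync.Once is what lets Evict invalidate the cached results between bootstrap attempts.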