From 8577f51a3d88347a5d4d0890d592fc72a80d2bf7 Mon Sep 17 00:00:00 2001 From: Bo Du Date: Tue, 27 Oct 2020 00:28:11 -0400 Subject: [PATCH 1/2] Evict info files cache before bootstrapping index segments in peers bootstrapper. --- .../integration/peers_bootstrap_index_test.go | 77 ++++++++++++++++++- .../bootstrap/bootstrapper/peers/source.go | 3 + src/dbnode/storage/bootstrap/cache.go | 47 +++++++---- src/dbnode/storage/bootstrap/types.go | 3 + 4 files changed, 110 insertions(+), 20 deletions(-) diff --git a/src/dbnode/integration/peers_bootstrap_index_test.go b/src/dbnode/integration/peers_bootstrap_index_test.go index 6083bfc0f0..d5907d9dc1 100644 --- a/src/dbnode/integration/peers_bootstrap_index_test.go +++ b/src/dbnode/integration/peers_bootstrap_index_test.go @@ -26,13 +26,17 @@ import ( "testing" "time" + indexpb "github.com/m3db/m3/src/dbnode/generated/proto/index" "github.com/m3db/m3/src/dbnode/integration/generate" "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/m3ninx/generated/proto/fswriter" "github.com/m3db/m3/src/m3ninx/idx" "github.com/m3db/m3/src/x/ident" xtest "github.com/m3db/m3/src/x/test" + xtime "github.com/m3db/m3/src/x/time" "github.com/stretchr/testify/require" ) @@ -53,7 +57,7 @@ func TestPeersBootstrapIndexWithIndexingEnabled(t *testing.T) { idxOpts := namespace.NewIndexOptions(). SetEnabled(true). - SetBlockSize(2 * blockSize) + SetBlockSize(blockSize) nOpts := namespace.NewOptions(). SetRetentionOptions(rOpts). SetIndexOptions(idxOpts) @@ -92,7 +96,18 @@ func TestPeersBootstrapIndexWithIndexingEnabled(t *testing.T) { Tags: ident.NewTags(ident.StringTag("city", "seattle")), } + quxSeries := generate.Series{ + ID: ident.StringID("qux"), + Tags: ident.NewTags(ident.StringTag("city", "new_orleans")), + } + seriesMaps := generate.BlocksByStart([]generate.BlockConfig{ + { + IDs: []string{quxSeries.ID.String()}, + Tags: quxSeries.Tags, + NumPoints: 100, + Start: now.Add(-2 * blockSize), + }, { IDs: []string{fooSeries.ID.String()}, Tags: fooSeries.Tags, @@ -159,7 +174,7 @@ func TestPeersBootstrapIndexWithIndexingEnabled(t *testing.T) { verifyQueryMetadataResults(t, iter, fetchResponse.Exhaustive, verifyQueryMetadataResultsOptions{ namespace: ns1.ID(), exhaustive: true, - expected: []generate.Series{fooSeries, barSeries}, + expected: []generate.Series{fooSeries, barSeries, quxSeries}, }) // Match all *e*e* @@ -173,6 +188,62 @@ func TestPeersBootstrapIndexWithIndexingEnabled(t *testing.T) { verifyQueryMetadataResults(t, iter, fetchResponse.Exhaustive, verifyQueryMetadataResultsOptions{ namespace: ns1.ID(), exhaustive: true, - expected: []generate.Series{barSeries, bazSeries}, + expected: []generate.Series{barSeries, bazSeries, quxSeries}, }) + + // Ensure that the index data for qux has been written to disk. + numDocsPerBlockStart, err := getNumDocsPerBlockStart( + ns1.ID(), + setups[1].FilesystemOpts(), + ) + require.NoError(t, err) + numDocs, ok := numDocsPerBlockStart[xtime.ToUnixNano(now.Add(-2*blockSize).Truncate(blockSize))] + require.True(t, ok) + require.Equal(t, numDocs, 1) +} + +type indexInfo struct { + Info indexpb.IndexVolumeInfo + VolumeIndex int +} + +func getNumDocsPerBlockStart( + nsID ident.ID, + fsOpts fs.Options, +) (map[xtime.UnixNano]int, error) { + numDocsPerBlockStart := make(map[xtime.UnixNano]int) + infoFiles := fs.ReadIndexInfoFiles( + fsOpts.FilePathPrefix(), + nsID, + fsOpts.InfoReaderBufferSize(), + ) + // Grab the latest index info file for each blockstart. + latestIndexInfoPerBlockStart := make(map[xtime.UnixNano]indexInfo) + for _, f := range infoFiles { + info, ok := latestIndexInfoPerBlockStart[xtime.UnixNano(f.Info.BlockStart)] + if !ok { + latestIndexInfoPerBlockStart[xtime.UnixNano(f.Info.BlockStart)] = indexInfo{ + Info: f.Info, + VolumeIndex: f.ID.VolumeIndex, + } + continue + } + + if f.ID.VolumeIndex > info.VolumeIndex { + latestIndexInfoPerBlockStart[xtime.UnixNano(f.Info.BlockStart)] = indexInfo{ + Info: f.Info, + VolumeIndex: f.ID.VolumeIndex, + } + } + } + for blockStart, info := range latestIndexInfoPerBlockStart { + for _, segment := range info.Info.Segments { + metadata := fswriter.Metadata{} + if err := metadata.Unmarshal(segment.Metadata); err != nil { + return nil, err + } + numDocsPerBlockStart[blockStart] += int(metadata.NumDocs) + } + } + return numDocsPerBlockStart, nil } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index fd6fd7482c..e88cb59c9f 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -185,6 +185,9 @@ func (s *peersSource) Read( continue } + // NB(bodu): We need to evict the info file cache before reading index data since we've + // maybe fetched blocks from peers so the cached info file state is now stale. + cache.Evict() r, err := s.readIndex(md, namespace.IndexRunOptions.ShardTimeRanges, span, diff --git a/src/dbnode/storage/bootstrap/cache.go b/src/dbnode/storage/bootstrap/cache.go index f5468e773c..4217d77064 100644 --- a/src/dbnode/storage/bootstrap/cache.go +++ b/src/dbnode/storage/bootstrap/cache.go @@ -37,12 +37,13 @@ var ( ) type cache struct { - sync.Once + sync.Mutex fsOpts fs.Options namespaceDetails []NamespaceDetails infoFilesByNamespace InfoFilesByNamespace iOpts instrument.Options + hasRead bool } // NewCache creates a cache specifically to be used during the bootstrap process. @@ -53,9 +54,10 @@ func NewCache(options CacheOptions) (Cache, error) { return nil, err } return &cache{ - fsOpts: options.FilesystemOptions(), - namespaceDetails: options.NamespaceDetails(), - iOpts: options.InstrumentOptions(), + fsOpts: options.FilesystemOptions(), + namespaceDetails: options.NamespaceDetails(), + infoFilesByNamespace: make(InfoFilesByNamespace, len(options.NamespaceDetails())), + iOpts: options.InstrumentOptions(), }, nil } @@ -83,22 +85,33 @@ func (c *cache) InfoFilesForShard(ns namespace.Metadata, shard uint32) ([]fs.Rea return infoFileResults, nil } +func (c *cache) Evict() { + c.Lock() + defer c.Unlock() + c.hasRead = false +} + func (c *cache) ReadInfoFiles() InfoFilesByNamespace { - c.Once.Do(func() { - c.infoFilesByNamespace = make(InfoFilesByNamespace, len(c.namespaceDetails)) - for _, finder := range c.namespaceDetails { - result := make(InfoFileResultsPerShard, len(finder.Shards)) - for _, shard := range finder.Shards { - result[shard] = fs.ReadInfoFiles(c.fsOpts.FilePathPrefix(), - finder.Namespace.ID(), shard, c.fsOpts.InfoReaderBufferSize(), c.fsOpts.DecodingOptions(), - persist.FileSetFlushType) - } - - c.infoFilesByNamespace[finder.Namespace] = result + c.Lock() + defer c.Unlock() + if !c.hasRead { + c.populateInfoFilesByNamespaceWithLock() + c.hasRead = true + } + return c.infoFilesByNamespace +} + +func (c *cache) populateInfoFilesByNamespaceWithLock() { + for _, finder := range c.namespaceDetails { + result := make(InfoFileResultsPerShard, len(finder.Shards)) + for _, shard := range finder.Shards { + result[shard] = fs.ReadInfoFiles(c.fsOpts.FilePathPrefix(), + finder.Namespace.ID(), shard, c.fsOpts.InfoReaderBufferSize(), c.fsOpts.DecodingOptions(), + persist.FileSetFlushType) } - }) - return c.infoFilesByNamespace + c.infoFilesByNamespace[finder.Namespace] = result + } } type cacheOptions struct { diff --git a/src/dbnode/storage/bootstrap/types.go b/src/dbnode/storage/bootstrap/types.go index a90620dde2..0a64face66 100644 --- a/src/dbnode/storage/bootstrap/types.go +++ b/src/dbnode/storage/bootstrap/types.go @@ -441,6 +441,9 @@ type Cache interface { // ReadInfoFiles returns info file results for each shard grouped by namespace. A cached copy // is returned if the info files have already been read. ReadInfoFiles() InfoFilesByNamespace + + // Evict cache contents by re-reading fresh data in. + Evict() } // CacheOptions represents the options for Cache. From cc69b27d2e70682e6fca1b234384f7273d9a5d68 Mon Sep 17 00:00:00 2001 From: Bo Du Date: Tue, 27 Oct 2020 17:54:50 -0400 Subject: [PATCH 2/2] Update mocks and change bool name. --- src/dbnode/storage/bootstrap/bootstrap_mock.go | 12 ++++++++++++ src/dbnode/storage/bootstrap/cache.go | 8 ++++---- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrap_mock.go b/src/dbnode/storage/bootstrap/bootstrap_mock.go index 7079a5726a..7ecbc95f65 100644 --- a/src/dbnode/storage/bootstrap/bootstrap_mock.go +++ b/src/dbnode/storage/bootstrap/bootstrap_mock.go @@ -717,6 +717,18 @@ func (mr *MockCacheMockRecorder) ReadInfoFiles() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadInfoFiles", reflect.TypeOf((*MockCache)(nil).ReadInfoFiles)) } +// Evict mocks base method +func (m *MockCache) Evict() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Evict") +} + +// Evict indicates an expected call of Evict +func (mr *MockCacheMockRecorder) Evict() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Evict", reflect.TypeOf((*MockCache)(nil).Evict)) +} + // MockCacheOptions is a mock of CacheOptions interface type MockCacheOptions struct { ctrl *gomock.Controller diff --git a/src/dbnode/storage/bootstrap/cache.go b/src/dbnode/storage/bootstrap/cache.go index 4217d77064..77c8e20633 100644 --- a/src/dbnode/storage/bootstrap/cache.go +++ b/src/dbnode/storage/bootstrap/cache.go @@ -43,7 +43,7 @@ type cache struct { namespaceDetails []NamespaceDetails infoFilesByNamespace InfoFilesByNamespace iOpts instrument.Options - hasRead bool + hasPopulatedInfo bool } // NewCache creates a cache specifically to be used during the bootstrap process. @@ -88,15 +88,15 @@ func (c *cache) InfoFilesForShard(ns namespace.Metadata, shard uint32) ([]fs.Rea func (c *cache) Evict() { c.Lock() defer c.Unlock() - c.hasRead = false + c.hasPopulatedInfo = false } func (c *cache) ReadInfoFiles() InfoFilesByNamespace { c.Lock() defer c.Unlock() - if !c.hasRead { + if !c.hasPopulatedInfo { c.populateInfoFilesByNamespaceWithLock() - c.hasRead = true + c.hasPopulatedInfo = true } return c.infoFilesByNamespace }