Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Evict info files cache before index bootstrap in peers bootstrapper #2802

Merged
merged 3 commits into from
Oct 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 74 additions & 3 deletions src/dbnode/integration/peers_bootstrap_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@ import (
"testing"
"time"

indexpb "github.com/m3db/m3/src/dbnode/generated/proto/index"
"github.com/m3db/m3/src/dbnode/integration/generate"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/m3ninx/generated/proto/fswriter"
"github.com/m3db/m3/src/m3ninx/idx"
"github.com/m3db/m3/src/x/ident"
xtest "github.com/m3db/m3/src/x/test"
xtime "github.com/m3db/m3/src/x/time"

"github.com/stretchr/testify/require"
)
Expand All @@ -53,7 +57,7 @@ func TestPeersBootstrapIndexWithIndexingEnabled(t *testing.T) {

idxOpts := namespace.NewIndexOptions().
SetEnabled(true).
SetBlockSize(2 * blockSize)
SetBlockSize(blockSize)
nOpts := namespace.NewOptions().
SetRetentionOptions(rOpts).
SetIndexOptions(idxOpts)
Expand Down Expand Up @@ -92,7 +96,18 @@ func TestPeersBootstrapIndexWithIndexingEnabled(t *testing.T) {
Tags: ident.NewTags(ident.StringTag("city", "seattle")),
}

quxSeries := generate.Series{
ID: ident.StringID("qux"),
Tags: ident.NewTags(ident.StringTag("city", "new_orleans")),
}

seriesMaps := generate.BlocksByStart([]generate.BlockConfig{
{
IDs: []string{quxSeries.ID.String()},
Tags: quxSeries.Tags,
NumPoints: 100,
Start: now.Add(-2 * blockSize),
},
{
IDs: []string{fooSeries.ID.String()},
Tags: fooSeries.Tags,
Expand Down Expand Up @@ -159,7 +174,7 @@ func TestPeersBootstrapIndexWithIndexingEnabled(t *testing.T) {
verifyQueryMetadataResults(t, iter, fetchResponse.Exhaustive, verifyQueryMetadataResultsOptions{
namespace: ns1.ID(),
exhaustive: true,
expected: []generate.Series{fooSeries, barSeries},
expected: []generate.Series{fooSeries, barSeries, quxSeries},
})

// Match all *e*e*
Expand All @@ -173,6 +188,62 @@ func TestPeersBootstrapIndexWithIndexingEnabled(t *testing.T) {
verifyQueryMetadataResults(t, iter, fetchResponse.Exhaustive, verifyQueryMetadataResultsOptions{
namespace: ns1.ID(),
exhaustive: true,
expected: []generate.Series{barSeries, bazSeries},
expected: []generate.Series{barSeries, bazSeries, quxSeries},
})

// Ensure that the index data for qux has been written to disk.
numDocsPerBlockStart, err := getNumDocsPerBlockStart(
ns1.ID(),
setups[1].FilesystemOpts(),
)
require.NoError(t, err)
numDocs, ok := numDocsPerBlockStart[xtime.ToUnixNano(now.Add(-2*blockSize).Truncate(blockSize))]
require.True(t, ok)
require.Equal(t, numDocs, 1)
}

type indexInfo struct {
Info indexpb.IndexVolumeInfo
VolumeIndex int
}

func getNumDocsPerBlockStart(
nsID ident.ID,
fsOpts fs.Options,
) (map[xtime.UnixNano]int, error) {
numDocsPerBlockStart := make(map[xtime.UnixNano]int)
infoFiles := fs.ReadIndexInfoFiles(
fsOpts.FilePathPrefix(),
nsID,
fsOpts.InfoReaderBufferSize(),
)
// Grab the latest index info file for each blockstart.
latestIndexInfoPerBlockStart := make(map[xtime.UnixNano]indexInfo)
for _, f := range infoFiles {
info, ok := latestIndexInfoPerBlockStart[xtime.UnixNano(f.Info.BlockStart)]
if !ok {
latestIndexInfoPerBlockStart[xtime.UnixNano(f.Info.BlockStart)] = indexInfo{
Info: f.Info,
VolumeIndex: f.ID.VolumeIndex,
}
continue
}

if f.ID.VolumeIndex > info.VolumeIndex {
latestIndexInfoPerBlockStart[xtime.UnixNano(f.Info.BlockStart)] = indexInfo{
Info: f.Info,
VolumeIndex: f.ID.VolumeIndex,
}
}
}
for blockStart, info := range latestIndexInfoPerBlockStart {
for _, segment := range info.Info.Segments {
metadata := fswriter.Metadata{}
if err := metadata.Unmarshal(segment.Metadata); err != nil {
return nil, err
}
numDocsPerBlockStart[blockStart] += int(metadata.NumDocs)
}
}
return numDocsPerBlockStart, nil
}
12 changes: 12 additions & 0 deletions src/dbnode/storage/bootstrap/bootstrap_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ func (s *peersSource) Read(
continue
}

// NB(bodu): We need to evict the info file cache before reading index data since we've
// maybe fetched blocks from peers so the cached info file state is now stale.
cache.Evict()
Copy link
Collaborator

@robskillington robskillington Oct 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think we just want to evict before iterating all namespaces so we can just evict once before processing all namespaces rather than evicting each time before processing an individual namespace?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, missed that this was in a loop over the namespaces. We definitely only want to to do this once.

r, err := s.readIndex(md,
namespace.IndexRunOptions.ShardTimeRanges,
span,
Expand Down
47 changes: 30 additions & 17 deletions src/dbnode/storage/bootstrap/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ var (
)

type cache struct {
sync.Once
sync.Mutex

fsOpts fs.Options
namespaceDetails []NamespaceDetails
infoFilesByNamespace InfoFilesByNamespace
iOpts instrument.Options
hasPopulatedInfo bool
}

// NewCache creates a cache specifically to be used during the bootstrap process.
Expand All @@ -53,9 +54,10 @@ func NewCache(options CacheOptions) (Cache, error) {
return nil, err
}
return &cache{
fsOpts: options.FilesystemOptions(),
namespaceDetails: options.NamespaceDetails(),
iOpts: options.InstrumentOptions(),
fsOpts: options.FilesystemOptions(),
namespaceDetails: options.NamespaceDetails(),
infoFilesByNamespace: make(InfoFilesByNamespace, len(options.NamespaceDetails())),
iOpts: options.InstrumentOptions(),
}, nil
}

Expand Down Expand Up @@ -83,22 +85,33 @@ func (c *cache) InfoFilesForShard(ns namespace.Metadata, shard uint32) ([]fs.Rea
return infoFileResults, nil
}

func (c *cache) Evict() {
c.Lock()
defer c.Unlock()
c.hasPopulatedInfo = false
}

func (c *cache) ReadInfoFiles() InfoFilesByNamespace {
c.Once.Do(func() {
c.infoFilesByNamespace = make(InfoFilesByNamespace, len(c.namespaceDetails))
for _, finder := range c.namespaceDetails {
result := make(InfoFileResultsPerShard, len(finder.Shards))
for _, shard := range finder.Shards {
result[shard] = fs.ReadInfoFiles(c.fsOpts.FilePathPrefix(),
finder.Namespace.ID(), shard, c.fsOpts.InfoReaderBufferSize(), c.fsOpts.DecodingOptions(),
persist.FileSetFlushType)
}

c.infoFilesByNamespace[finder.Namespace] = result
c.Lock()
defer c.Unlock()
if !c.hasPopulatedInfo {
c.populateInfoFilesByNamespaceWithLock()
c.hasPopulatedInfo = true
}
return c.infoFilesByNamespace
}

func (c *cache) populateInfoFilesByNamespaceWithLock() {
for _, finder := range c.namespaceDetails {
result := make(InfoFileResultsPerShard, len(finder.Shards))
for _, shard := range finder.Shards {
result[shard] = fs.ReadInfoFiles(c.fsOpts.FilePathPrefix(),
finder.Namespace.ID(), shard, c.fsOpts.InfoReaderBufferSize(), c.fsOpts.DecodingOptions(),
persist.FileSetFlushType)
}
})

return c.infoFilesByNamespace
c.infoFilesByNamespace[finder.Namespace] = result
}
}

type cacheOptions struct {
Expand Down
3 changes: 3 additions & 0 deletions src/dbnode/storage/bootstrap/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,9 @@ type Cache interface {
// ReadInfoFiles returns info file results for each shard grouped by namespace. A cached copy
// is returned if the info files have already been read.
ReadInfoFiles() InfoFilesByNamespace

// Evict cache contents by re-reading fresh data in.
Evict()
}

// CacheOptions represents the options for Cache.
Expand Down