diff --git a/src/dbnode/integration/large_tiles_test.go b/src/dbnode/integration/large_tiles_test.go
index 0b28275f67..b988c6d5ec 100644
--- a/src/dbnode/integration/large_tiles_test.go
+++ b/src/dbnode/integration/large_tiles_test.go
@@ -24,7 +24,6 @@ package integration
 
 import (
 	"io"
-	"strconv"
 	"testing"
 	"time"
 
@@ -35,11 +34,8 @@ import (
 	"github.com/m3db/m3/src/dbnode/ts"
 	xmetrics "github.com/m3db/m3/src/dbnode/x/metrics"
 	xclock "github.com/m3db/m3/src/x/clock"
-	"github.com/m3db/m3/src/x/context"
 	"github.com/m3db/m3/src/x/ident"
 	"github.com/m3db/m3/src/x/instrument"
-	"github.com/m3db/m3/src/x/pool"
-	"github.com/m3db/m3/src/x/serialize"
 	xtime "github.com/m3db/m3/src/x/time"
 
 	"github.com/stretchr/testify/assert"
@@ -49,10 +45,8 @@ import (
 )
 
 var (
-	blockSize       = 2 * time.Hour
-	blockSizeT      = 24 * time.Hour
-	indexBlockSize  = 2 * blockSize
-	indexBlockSizeT = 2 * blockSizeT
+	blockSize  = 2 * time.Hour
+	blockSizeT = 24 * time.Hour
 )
 
 func TestReadAggregateWrite(t *testing.T) {
@@ -74,21 +68,21 @@ func TestReadAggregateWrite(t *testing.T) {
 	nowFn := testSetup.NowFn()
 
 	// Write test data.
-	dpTimeStart := nowFn().Truncate(indexBlockSizeT).Add(-2 * indexBlockSizeT)
+	dpTimeStart := nowFn().Truncate(blockSizeT).Add(-blockSizeT)
 	dpTime := dpTimeStart
 	err = session.WriteTagged(srcNs.ID(), ident.StringID("aab"),
 		ident.MustNewTagStringsIterator("__name__", "cpu", "job", "job1"),
 		dpTime, 15, xtime.Second, nil)
 
-	testDataPointsCount := 60.0
-	for a := 0.0; a < testDataPointsCount; a++ {
+	testDataPointsCount := 60
+	for a := 0; a < testDataPointsCount; a++ {
 		if a < 10 {
 			dpTime = dpTime.Add(10 * time.Minute)
 			continue
 		}
 		err = session.WriteTagged(srcNs.ID(), ident.StringID("foo"),
 			ident.MustNewTagStringsIterator("__name__", "cpu", "job", "job1"),
-			dpTime, 42.1+a, xtime.Second, nil)
+			dpTime, 42.1+float64(a), xtime.Second, nil)
 		require.NoError(t, err)
 		dpTime = dpTime.Add(10 * time.Minute)
 	}
@@ -121,14 +115,6 @@ func TestReadAggregateWrite(t *testing.T) {
 	require.NoError(t, err)
 	assert.Equal(t, int64(10), processedTileCount)
 
-	require.True(t, xclock.WaitUntil(func() bool {
-		counters := reporter.Counters()
-		writeErrorsCount, _ := counters["database.writeAggData.errors"]
-		require.Equal(t, int64(0), writeErrorsCount)
-		seriesWritesCount, _ := counters["dbshard.large-tiles-writes"]
-		return seriesWritesCount >= 2
-	}, time.Second*10))
-
 	log.Info("validating aggregated data")
 	expectedDps := []ts.Datapoint{
 		{Timestamp: dpTimeStart.Add(110 * time.Minute), Value: 53.1},
@@ -156,101 +142,6 @@ func TestReadAggregateWrite(t *testing.T) {
 		expectedDps)
 }
 
-var (
-	iterationCount      = 10
-	testSeriesCount     = 5000
-	testDataPointsCount = int(blockSizeT.Hours()) * 100
-)
-
-//func TestAggregationAndQueryingAtHighConcurrency(t *testing.T) {
-//	testSetup, srcNs, trgNs, reporter, closer := setupServer(t)
-//	storageOpts := testSetup.StorageOpts()
-//	log := storageOpts.InstrumentOptions().Logger()
-//
-//	// Stop the server.
-//	defer func() {
-//		require.NoError(t, testSetup.StopServer())
-//		log.Debug("server is now down")
-//		testSetup.Close()
-//		_ = closer.Close()
-//	}()
-//
-//	nowFn := testSetup.NowFn()
-//	dpTimeStart := nowFn().Truncate(indexBlockSizeT).Add(-2 * indexBlockSizeT)
-//	writeTestData(t, testSetup, log, reporter, dpTimeStart, srcNs.ID())
-//
-//	aggOpts, err := storage.NewAggregateTilesOptions(
-//		dpTimeStart, dpTimeStart.Add(blockSizeT),
-//		10*time.Minute, false)
-//	require.NoError(t, err)
-//
-//	log.Info("Starting aggregation loop")
-//	start := time.Now()
-//
-//	inProgress := atomic.NewBool(true)
-//	var wg sync.WaitGroup
-//	for b := 0; b < 4; b++ {
-//
-//		wg.Add(1)
-//
-//		go func() {
-//			defer wg.Done()
-//
-//			query := index.Query{
-//				Query: idx.NewTermQuery([]byte("job"), []byte("job1"))}
-//
-//			for inProgress.Load() {
-//				session, err := testSetup.M3DBClient().NewSession()
-//				require.NoError(t, err)
-//				result, _, err := session.FetchTagged(srcNs.ID(), query,
-//					index.QueryOptions{
-//						StartInclusive: dpTimeStart.Add(-blockSizeT),
-//						EndExclusive:   nowFn(),
-//					})
-//				session.Close()
-//				if err != nil {
-//					require.NoError(t, err)
-//					return
-//				}
-//				require.Equal(t, testSeriesCount, len(result.Iters()))
-//
-//				result.Close()
-//				time.Sleep(time.Millisecond)
-//			}
-//		}()
-//	}
-//
-//	var expectedPoints = int64(testDataPointsCount * testSeriesCount / 100 * 6)
-//	for a := 0; a < iterationCount; a++ {
-//		ctx := storageOpts.ContextPool().Get()
-//		processedTileCount, err := testSetup.DB().AggregateTiles(ctx, srcNs.ID(), trgNs.ID(), aggOpts)
-//		ctx.BlockingClose()
-//		if err != nil {
-//			require.NoError(t, err)
-//		}
-//		require.Equal(t, processedTileCount, expectedPoints)
-//	}
-//	log.Info("Finished aggregation", zap.Duration("took", time.Since(start)))
-//
-//	inProgress.Toggle()
-//	wg.Wait()
-//	log.Info("Finished parallel querying")
-//
-//	counters := reporter.Counters()
-//	writeErrorsCount, _ := counters["database.writeAggData.errors"]
-//	require.Equal(t, int64(0), writeErrorsCount)
-//	seriesWritesCount, _ := counters["dbshard.large-tiles-writes"]
-//	require.Equal(t, int64(testSeriesCount*iterationCount), seriesWritesCount)
-//
-//	session, err := testSetup.M3DBClient().NewSession()
-//	require.NoError(t, err)
-//	_, err = session.Fetch(srcNs.ID(),
-//		ident.StringID("foo"+strconv.Itoa(50)),
-//		dpTimeStart, dpTimeStart.Add(blockSizeT))
-//	session.Close()
-//	require.NoError(t, err)
-//}
-
 func fetchAndValidate(
 	t *testing.T,
 	session client.Session,
@@ -277,17 +168,18 @@ func fetchAndValidate(
 func setupServer(t *testing.T) (TestSetup, namespace.Metadata, namespace.Metadata, xmetrics.TestStatsReporter, io.Closer) {
 	var (
 		rOpts    = retention.NewOptions().SetRetentionPeriod(500 * blockSize).SetBlockSize(blockSize)
-		rOptsT   = retention.NewOptions().SetRetentionPeriod(100 * blockSize).SetBlockSize(blockSizeT)
-		idxOpts  = namespace.NewIndexOptions().SetEnabled(true).SetBlockSize(indexBlockSize)
-		idxOptsT = namespace.NewIndexOptions().SetEnabled(false).SetBlockSize(indexBlockSizeT)
+		rOptsT   = retention.NewOptions().SetRetentionPeriod(100 * blockSize).SetBlockSize(blockSizeT).SetBufferPast(0)
+		idxOpts  = namespace.NewIndexOptions().SetEnabled(true).SetBlockSize(blockSize)
+		idxOptsT = namespace.NewIndexOptions().SetEnabled(true).SetBlockSize(blockSizeT)
 		nsOpts   = namespace.NewOptions().
 				SetRetentionOptions(rOpts).
 				SetIndexOptions(idxOpts).
 				SetColdWritesEnabled(true)
 		nsOptsT = namespace.NewOptions().
 				SetRetentionOptions(rOptsT).
-				SetIndexOptions(idxOptsT).
-				SetColdWritesEnabled(true)
+				SetIndexOptions(idxOptsT)
+
+		fixedNow = time.Now().Truncate(blockSizeT)
 	)
 
 	srcNs, err := namespace.NewMetadata(testNamespaces[0], nsOpts)
@@ -299,81 +191,21 @@ func setupServer(t *testing.T) (TestSetup, namespace.Metadata, namespace.Metadat
 		SetNamespaces([]namespace.Metadata{srcNs, trgNs}).
 		SetWriteNewSeriesAsync(true).
 		SetNumShards(1).
-		SetFetchRequestTimeout(time.Second * 30)
+		SetFetchRequestTimeout(time.Second * 30).
+		SetNowFn(func() time.Time {
+			return fixedNow
+		})
 
 	testSetup := newTestSetupWithCommitLogAndFilesystemBootstrapper(t, testOpts)
 	reporter := xmetrics.NewTestStatsReporter(xmetrics.NewTestStatsReporterOptions())
 	scope, closer := tally.NewRootScope(
 		tally.ScopeOptions{Reporter: reporter}, time.Millisecond)
-	testSetup.SetStorageOpts(testSetup.StorageOpts().SetInstrumentOptions(
-		instrument.NewOptions().SetMetricsScope(scope)))
+	storageOpts := testSetup.StorageOpts().
+		SetInstrumentOptions(instrument.NewOptions().SetMetricsScope(scope))
+	testSetup.SetStorageOpts(storageOpts)
 
 	// Start the server.
 	require.NoError(t, testSetup.StartServer())
 
 	return testSetup, srcNs, trgNs, reporter, closer
 }
-
-func writeTestData(
-	t *testing.T, testSetup TestSetup, log *zap.Logger,
-	reporter xmetrics.TestStatsReporter,
-	dpTimeStart time.Time, ns ident.ID,
-) {
-	dpTime := dpTimeStart
-
-	testTagEncodingPool := serialize.NewTagEncoderPool(serialize.NewTagEncoderOptions(),
-		pool.NewObjectPoolOptions().SetSize(1))
-	testTagEncodingPool.Init()
-	encoder := testTagEncodingPool.Get()
-	tagsIter := ident.MustNewTagStringsIterator("__name__", "cpu", "job", "job1")
-	err := encoder.Encode(tagsIter)
-	require.NoError(t, err)
-
-	encodedTags, ok := encoder.Data()
-	require.True(t, ok)
-	encodedTagsBytes := encodedTags.Bytes()
-
-	start := time.Now()
-	for a := 0; a < testDataPointsCount; a++ {
-		i := 0
-		batchWriter, err := testSetup.DB().BatchWriter(ns, testDataPointsCount)
-		require.NoError(t, err)
-
-		for b := 0; b < testSeriesCount; b++ {
-			tagsIter.Rewind()
-			err := batchWriter.AddTagged(i,
-				ident.StringID("foo"+strconv.Itoa(b)),
-				tagsIter, encodedTagsBytes,
-				dpTime, 42.1+float64(a), xtime.Second, nil)
-			require.NoError(t, err)
-			i++
-		}
-		for r := 0; r < 3; r++ {
-			err = testSetup.DB().WriteTaggedBatch(context.NewContext(), ns, batchWriter, nil)
-			if err != nil && err.Error() == "commit log queue is full" {
-				time.Sleep(time.Second)
-				continue
-			}
-			break
-		}
-		require.NoError(t, err)
-
-		dpTime = dpTime.Add(time.Minute)
-	}
-
-	log.Info("test data written", zap.Duration("took", time.Since(start)))
-
-	log.Info("waiting till data is cold flushed")
-	start = time.Now()
-	flushed := xclock.WaitUntil(func() bool {
-		counters := reporter.Counters()
-		flushes, _ := counters["database.flushIndex.success"]
-		writes, _ := counters["database.series.cold-writes"]
-		successFlushes, _ := counters["database.flushColdData.success"]
-		return flushes >= 1 &&
-			int(writes) >= testDataPointsCount*testSeriesCount &&
-			successFlushes >= 4
-	}, time.Minute)
-	require.True(t, flushed)
-	log.Info("verified data has been cold flushed", zap.Duration("took", time.Since(start)))
-}
diff --git a/src/dbnode/server/options.go b/src/dbnode/server/options.go
index 36dc8ccf79..38508ad948 100644
--- a/src/dbnode/server/options.go
+++ b/src/dbnode/server/options.go
@@ -30,4 +30,5 @@ type StorageOptions struct {
 	OnColdFlush            storage.OnColdFlush
 	ForceColdWritesEnabled bool
 	TChanNodeServerFn      node.NewTChanNodeServerFn
+	NamespaceHooks         storage.NamespaceHooks
 }
diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go
index c7a932e939..b265850f8e 100644
--- a/src/dbnode/server/server.go
+++ b/src/dbnode/server/server.go
@@ -826,6 +826,10 @@ func Run(runOpts RunOptions) {
 		opts = opts.SetOnColdFlush(runOpts.StorageOptions.OnColdFlush)
 	}
 
+	if runOpts.StorageOptions.NamespaceHooks != nil {
+		opts = opts.SetNamespaceHooks(runOpts.StorageOptions.NamespaceHooks)
+	}
+
 	// Set bootstrap options - We need to create a topology map provider from the
 	// same topology that will be passed to the cluster so that when we make
 	// bootstrapping decisions they are in sync with the clustered database
diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go
index 8695b55616..ff76b06123 100644
--- a/src/dbnode/storage/database.go
+++ b/src/dbnode/storage/database.go
@@ -369,6 +369,8 @@ func (d *db) logNamespaceUpdate(removes []ident.ID, adds, updates []namespace.Me
 }
 
 func (d *db) addNamespacesWithLock(namespaces []namespace.Metadata) error {
+	createdNamespaces := make([]databaseNamespace, 0, len(namespaces))
+
 	for _, n := range namespaces {
 		// ensure namespace doesn't exist
 		_, ok := d.namespaces.Get(n.ID())
@@ -382,7 +384,17 @@ func (d *db) addNamespacesWithLock(namespaces []namespace.Metadata) error {
 			return err
 		}
 		d.namespaces.Set(n.ID(), newNs)
+		createdNamespaces = append(createdNamespaces, newNs)
 	}
+
+	hooks := d.Options().NamespaceHooks()
+	for _, ns := range createdNamespaces {
+		err := hooks.OnCreatedNamespace(ns, d.namespaces.Get)
+		if err != nil {
+			return err
+		}
+	}
+
 	return nil
 }
 
@@ -1047,7 +1059,7 @@ func (d *db) BootstrapState() DatabaseBootstrapState {
 	d.RLock()
 	for _, n := range d.namespaces.Iter() {
 		ns := n.Value()
-		nsBootstrapStates[ns.ID().String()] = ns.BootstrapState()
+		nsBootstrapStates[ns.ID().String()] = ns.ShardBootstrapState()
 	}
 	d.RUnlock()
 
diff --git a/src/dbnode/storage/database_test.go b/src/dbnode/storage/database_test.go
index 0305556188..af371a6211 100644
--- a/src/dbnode/storage/database_test.go
+++ b/src/dbnode/storage/database_test.go
@@ -785,7 +785,7 @@ func testDatabaseNamespaceIndexFunctions(t *testing.T, commitlogEnabled bool) {
 	ns.EXPECT().OwnedShards().Return([]databaseShard{}).AnyTimes()
 	ns.EXPECT().Tick(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
-	ns.EXPECT().BootstrapState().Return(ShardBootstrapStates{}).AnyTimes()
+	ns.EXPECT().ShardBootstrapState().Return(ShardBootstrapStates{}).AnyTimes()
 	ns.EXPECT().Options().Return(nsOptions).AnyTimes()
 
 	require.NoError(t, d.Open())
@@ -969,7 +969,7 @@ func testDatabaseWriteBatch(t *testing.T,
 	ns.EXPECT().OwnedShards().Return([]databaseShard{}).AnyTimes()
 	ns.EXPECT().Tick(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
-	ns.EXPECT().BootstrapState().Return(ShardBootstrapStates{}).AnyTimes()
+	ns.EXPECT().ShardBootstrapState().Return(ShardBootstrapStates{}).AnyTimes()
 	ns.EXPECT().Options().Return(nsOptions).AnyTimes()
 	ns.EXPECT().Close().Return(nil).Times(1)
 	require.NoError(t, d.Open())
@@ -1115,11 +1115,11 @@ func TestDatabaseBootstrapState(t *testing.T) {
 	}()
 
 	ns1 := dbAddNewMockNamespace(ctrl, d, "testns1")
-	ns1.EXPECT().BootstrapState().Return(ShardBootstrapStates{
+	ns1.EXPECT().ShardBootstrapState().Return(ShardBootstrapStates{
 		1: Bootstrapping,
 	})
 	ns2 := dbAddNewMockNamespace(ctrl, d, "testns2")
-	ns2.EXPECT().BootstrapState().Return(ShardBootstrapStates{
+	ns2.EXPECT().ShardBootstrapState().Return(ShardBootstrapStates{
 		2: Bootstrapped,
 	})
 
@@ -1199,7 +1199,7 @@ func TestUpdateBatchWriterBasedOnShardResults(t *testing.T) {
 		SetWritesToCommitLog(false)
 	ns.EXPECT().OwnedShards().Return([]databaseShard{}).AnyTimes()
 	ns.EXPECT().Tick(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
-	ns.EXPECT().BootstrapState().Return(ShardBootstrapStates{}).AnyTimes()
+	ns.EXPECT().ShardBootstrapState().Return(ShardBootstrapStates{}).AnyTimes()
 	ns.EXPECT().Options().Return(nsOptions).AnyTimes()
 	ns.EXPECT().Close().Return(nil).Times(1)
 	require.NoError(t, d.Open())
diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go
index 52407b8ffc..b7ce9c52ef 100644
--- a/src/dbnode/storage/index.go
+++ b/src/dbnode/storage/index.go
@@ -31,7 +31,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/m3db/bitset"
 	"github.com/m3db/m3/src/dbnode/clock"
 	"github.com/m3db/m3/src/dbnode/namespace"
 	"github.com/m3db/m3/src/dbnode/persist"
@@ -64,6 +63,7 @@ import (
 	xsync "github.com/m3db/m3/src/x/sync"
 	xtime "github.com/m3db/m3/src/x/time"
 
+	"github.com/m3db/bitset"
 	"github.com/opentracing/opentracing-go"
 	opentracinglog "github.com/opentracing/opentracing-go/log"
 	"github.com/uber-go/tally"
@@ -96,6 +96,8 @@ var (
 type nsIndex struct {
 	state nsIndexState
 
+	extendedRetentionPeriod time.Duration
+
 	// all the vars below this line are not modified past the ctor
 	// and don't require a lock when being accessed.
 	nowFn clock.NowFn
@@ -652,7 +654,7 @@ func (i *nsIndex) writeBatches(
 		blockSize           = i.blockSize
 		futureLimit         = now.Add(1 * i.bufferFuture)
 		pastLimit           = now.Add(-1 * i.bufferPast)
-		earliestBlockStartToRetain = retention.FlushTimeStartForRetentionPeriod(i.retentionPeriod, i.blockSize, now)
+		earliestBlockStartToRetain = i.earliestBlockStartToRetainWithLock(now)
 		batchOptions        = batch.Options()
 		forwardIndexDice    = i.forwardIndexDice
 		forwardIndexEnabled = forwardIndexDice.enabled
@@ -861,10 +863,7 @@ func (i *nsIndex) BootstrapsDone() uint {
 }
 
 func (i *nsIndex) Tick(c context.Cancellable, startTime time.Time) (namespaceIndexTickResult, error) {
-	var (
-		result                     = namespaceIndexTickResult{}
-		earliestBlockStartToRetain = retention.FlushTimeStartForRetentionPeriod(i.retentionPeriod, i.blockSize, startTime)
-	)
+	var result namespaceIndexTickResult
 
 	i.state.Lock()
 	defer func() {
@@ -872,6 +871,8 @@ func (i *nsIndex) Tick(c context.Cancellable, startTime time.Time) (namespaceInd
 		i.state.Unlock()
 	}()
 
+	earliestBlockStartToRetain := i.earliestBlockStartToRetainWithLock(startTime)
+
 	result.NumBlocks = int64(len(i.state.blocksByTime))
 
 	var multiErr xerrors.MultiError
@@ -1029,7 +1030,7 @@ func (i *nsIndex) flushableBlocks(
 	flushable := make([]index.Block, 0, len(i.state.blocksByTime))
 
 	now := i.nowFn()
-	earliestBlockStartToRetain := retention.FlushTimeStartForRetentionPeriod(i.retentionPeriod, i.blockSize, now)
+	earliestBlockStartToRetain := i.earliestBlockStartToRetainWithLock(now)
 	currentBlockStart := now.Truncate(i.blockSize)
 	// Check for flushable blocks by iterating through all block starts w/in retention.
 	for blockStart := earliestBlockStartToRetain; blockStart.Before(currentBlockStart); blockStart = blockStart.Add(i.blockSize) {
@@ -1864,7 +1865,7 @@ func (i *nsIndex) CleanupExpiredFileSets(t time.Time) error {
 	}
 
 	// earliest block to retain based on retention period
-	earliestBlockStartToRetain := retention.FlushTimeStartForRetentionPeriod(i.retentionPeriod, i.blockSize, t)
+	earliestBlockStartToRetain := i.earliestBlockStartToRetainWithLock(t)
 
 	// now we loop through the blocks we hold, to ensure we don't delete any data for them.
 	for t := range i.state.blocksByTime {
@@ -2073,6 +2074,26 @@ func (i *nsIndex) unableToAllocBlockInvariantError(err error) error {
 	return ierr
 }
 
+func (i *nsIndex) SetExtendedRetentionPeriod(period time.Duration) {
+	i.state.Lock()
+	defer i.state.Unlock()
+
+	i.extendedRetentionPeriod = period
+}
+
+func (i *nsIndex) effectiveRetentionPeriodWithLock() time.Duration {
+	period := i.retentionPeriod
+	if i.extendedRetentionPeriod > period {
+		period = i.extendedRetentionPeriod
+	}
+
+	return period
+}
+
+func (i *nsIndex) earliestBlockStartToRetainWithLock(t time.Time) time.Time {
+	return retention.FlushTimeStartForRetentionPeriod(i.effectiveRetentionPeriodWithLock(), i.blockSize, t)
+}
+
 type nsIndexMetrics struct {
 	asyncInsertAttemptTotal tally.Counter
 	asyncInsertAttemptSkip  tally.Counter
diff --git a/src/dbnode/storage/index_test.go b/src/dbnode/storage/index_test.go
index 6a93d6e671..148fa603d1 100644
--- a/src/dbnode/storage/index_test.go
+++ b/src/dbnode/storage/index_test.go
@@ -368,6 +368,24 @@ func TestNamespaceIndexQueryNoMatchingBlocks(t *testing.T) {
 	assert.Equal(t, 0, aggResult.Results.Size())
 }
 
+func TestNamespaceIndexSetExtendedRetentionPeriod(t *testing.T) {
+	ctrl := gomock.NewController(xtest.Reporter{T: t})
+	defer ctrl.Finish()
+
+	idx := newTestIndex(t, ctrl).index.(*nsIndex)
+	originalRetention := idx.retentionPeriod
+
+	assert.Equal(t, originalRetention, idx.effectiveRetentionPeriodWithLock())
+
+	longerRetention := originalRetention + time.Minute
+	idx.SetExtendedRetentionPeriod(longerRetention)
+	assert.Equal(t, longerRetention, idx.effectiveRetentionPeriodWithLock())
+
+	shorterRetention := originalRetention - time.Minute
+	idx.SetExtendedRetentionPeriod(shorterRetention)
+	assert.Equal(t, originalRetention, idx.effectiveRetentionPeriodWithLock())
+}
+
 func verifyFlushForShards(
 	t *testing.T,
 	ctrl *gomock.Controller,
diff --git a/src/dbnode/storage/lease_verifier.go b/src/dbnode/storage/lease_verifier.go
index 4f67092135..3cb2c8d526 100644
--- a/src/dbnode/storage/lease_verifier.go
+++ b/src/dbnode/storage/lease_verifier.go
@@ -73,7 +73,7 @@ func (v *leaseVerifier) VerifyLease(
 
 func (v *leaseVerifier) LatestState(descriptor block.LeaseDescriptor) (block.LeaseState, error) {
 	flushState, err := v.flushStateRetriever.FlushState(
-		descriptor.Namespace, uint32(descriptor.Shard), descriptor.BlockStart)
+		descriptor.Namespace, descriptor.Shard, descriptor.BlockStart)
 	if err != nil {
 		return block.LeaseState{}, fmt.Errorf(
 			"err retrieving flushState for LatestState, ns: %s, shard: %d, blockStart: %s, err: %v",
diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go
index 1322d6e46f..3269f96881 100644
--- a/src/dbnode/storage/namespace.go
+++ b/src/dbnode/storage/namespace.go
@@ -1490,6 +1490,18 @@ func (n *dbNamespace) OwnedShards() []databaseShard {
 	return databaseShards
 }
 
+func (n *dbNamespace) SetIndex(reverseIndex NamespaceIndex) error {
+	n.Lock()
+	defer n.Unlock()
+
+	if !n.metadata.Options().IndexOptions().Enabled() {
+		return errNamespaceIndexingDisabled
+	}
+	n.reverseIndex = reverseIndex
+
+	return nil
+}
+
 func (n *dbNamespace) Index() (NamespaceIndex, error) {
 	n.RLock()
 	defer n.RUnlock()
@@ -1574,7 +1586,13 @@ func (n *dbNamespace) Close() error {
 	return nil
 }
 
-func (n *dbNamespace) BootstrapState() ShardBootstrapStates {
+func (n *dbNamespace) BootstrapState() BootstrapState {
+	n.RLock()
+	defer n.RUnlock()
+	return n.bootstrapState
+}
+
+func (n *dbNamespace) ShardBootstrapState() ShardBootstrapStates {
 	n.RLock()
 	shardStates := make(ShardBootstrapStates, len(n.shards))
 	for _, shard := range n.shards {
@@ -1631,11 +1649,11 @@ func (n *dbNamespace) aggregateTiles(
 			opts.Start, opts.End, targetBlockSize.String())
 	}
 
-	n.RLock()
-	if n.bootstrapState != Bootstrapped {
-		n.RUnlock()
+	if n.BootstrapState() != Bootstrapped || sourceNs.BootstrapState() != Bootstrapped {
 		return 0, errNamespaceNotBootstrapped
 	}
+
+	n.RLock()
 	nsCtx := n.nsContextWithRLock()
 	n.RUnlock()
 
@@ -1671,7 +1689,8 @@ func (n *dbNamespace) aggregateTiles(
 		sourceBlockVolumes = append(sourceBlockVolumes, shardBlockVolume{sourceBlockStart, latestVolume})
 	}
 
-	shardProcessedTileCount, err := targetShard.AggregateTiles(ctx, sourceNs.ID(), sourceShard.ID(), blockReaders, sourceBlockVolumes, opts, nsCtx.Schema)
+	shardProcessedTileCount, err := targetShard.AggregateTiles(
+		ctx, sourceNs.ID(), sourceShard.ID(), blockReaders, sourceBlockVolumes, opts, nsCtx.Schema)
 	processedTileCount += shardProcessedTileCount
 
 	if err != nil {
@@ -1682,7 +1701,7 @@ func (n *dbNamespace) aggregateTiles(
 	n.log.Info("finished large tiles aggregation for namespace",
 		zap.String("sourceNs", sourceNs.ID().String()),
 		zap.Int("shards", len(targetShards)),
-		zap.Int64("processedBlocks", processedTileCount),
+		zap.Int64("processedTiles", processedTileCount),
 		zap.Duration("duration", time.Now().Sub(startedAt)))
 
 	return processedTileCount, nil
diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go
index c3c2283bb3..1647cda6c3 100644
--- a/src/dbnode/storage/namespace_test.go
+++ b/src/dbnode/storage/namespace_test.go
@@ -1256,6 +1256,16 @@ func TestNamespaceIndexDisabledQuery(t *testing.T) {
 }
 
 func TestNamespaceBootstrapState(t *testing.T) {
+	ns, closer := newTestNamespace(t)
+	defer closer()
+
+	require.Equal(t, BootstrapNotStarted, ns.BootstrapState())
+
+	ns.bootstrapState = Bootstrapped
+	require.Equal(t, Bootstrapped, ns.BootstrapState())
+}
+
+func TestNamespaceShardBootstrapState(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 
@@ -1275,7 +1285,7 @@ func TestNamespaceBootstrapState(t *testing.T) {
 	require.Equal(t, ShardBootstrapStates{
 		0: Bootstrapped,
 		1: Bootstrapping,
-	}, ns.BootstrapState())
+	}, ns.ShardBootstrapState())
 }
 
 func TestNamespaceFlushState(t *testing.T) {
@@ -1300,7 +1310,7 @@ func TestNamespaceFlushState(t *testing.T) {
 	require.Equal(t, expectedFlushState, flushState)
 }
 
-func TestNamespaceAggregateTilesFailOnBootstrapping(t *testing.T) {
+func TestNamespaceAggregateTilesFailUntilBootstrapped(t *testing.T) {
 	var (
 		sourceNsID = ident.StringID("source")
 		targetNsID = ident.StringID("target")
@@ -1314,10 +1324,14 @@ func TestNamespaceAggregateTilesFailUntilBootstrapped(t *testing.T) {
 	targetNs, targetCloser := newTestNamespaceWithIDOpts(t, targetNsID, namespace.NewOptions())
 	defer targetCloser()
-	targetNs.bootstrapState = Bootstrapping
 
 	_, err := targetNs.AggregateTiles(ctx, sourceNs, opts)
 	require.Equal(t, errNamespaceNotBootstrapped, err)
+
+	sourceNs.bootstrapState = Bootstrapped
+
+	_, err = targetNs.AggregateTiles(ctx, sourceNs, opts)
+	require.Equal(t, errNamespaceNotBootstrapped, err)
 }
 
 func TestNamespaceAggregateTiles(t *testing.T) {
@@ -1339,6 +1353,7 @@ func TestNamespaceAggregateTiles(t *testing.T) {
 
 	sourceNs, sourceCloser := newTestNamespaceWithIDOpts(t, sourceNsID, namespace.NewOptions())
 	defer sourceCloser()
+	sourceNs.bootstrapState = Bootstrapped
 	sourceRetentionOpts := sourceNs.nopts.RetentionOptions().SetBlockSize(sourceBlockSize)
 	sourceNs.nopts = sourceNs.nopts.SetRetentionOptions(sourceRetentionOpts)
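Aside: a minimal sketch of driving the aggregation path these tests exercise, assuming a bootstrapped storage.Database handle. NewAggregateTilesOptions and DB().AggregateTiles are taken from the integration test at the top of this diff; the helper name aggregateDay and the exact meaning of the third argument (tile step) are illustrative, not part of this change.

	// aggregateDay rolls one day of raw data from sourceNsID up into
	// targetNsID as 10-minute tiles. As of this change, AggregateTiles
	// returns errNamespaceNotBootstrapped unless both the source and the
	// target namespaces are bootstrapped.
	func aggregateDay(
		db storage.Database,
		sourceNsID, targetNsID ident.ID,
		dayStart time.Time,
	) (int64, error) {
		aggOpts, err := storage.NewAggregateTilesOptions(
			dayStart, dayStart.Add(24*time.Hour), // [start, end) of source data
			10*time.Minute,                       // tile step (assumed)
			false)                                // HandleCounterResets
		if err != nil {
			return 0, err
		}

		ctx := db.Options().ContextPool().Get()
		defer ctx.BlockingClose()

		return db.AggregateTiles(ctx, sourceNsID, targetNsID, aggOpts)
	}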
diff --git a/src/dbnode/storage/options.go b/src/dbnode/storage/options.go
index 22e2bcfe3a..2fdcf9b0b8 100644
--- a/src/dbnode/storage/options.go
+++ b/src/dbnode/storage/options.go
@@ -166,6 +166,7 @@ type options struct {
 	doNotIndexWithFieldsMap         map[string]string
 	namespaceRuntimeOptsMgrRegistry namespace.RuntimeOptionsManagerRegistry
 	mediatorTickInterval            time.Duration
+	namespaceHooks                  NamespaceHooks
 }
 
 // NewOptions creates a new set of storage options with defaults
@@ -239,6 +240,7 @@ func newOptions(poolOpts pool.ObjectPoolOptions) Options {
 		memoryTracker:                   NewMemoryTracker(NewMemoryTrackerOptions(defaultNumLoadedBytesLimit)),
 		namespaceRuntimeOptsMgrRegistry: namespace.NewRuntimeOptionsManagerRegistry(),
 		mediatorTickInterval:            defaultMediatorTickInterval,
+		namespaceHooks:                  &noopNamespaceHooks{},
 	}
 	return o.SetEncodingM3TSZPooled()
}
@@ -813,8 +815,24 @@ func (o *options) MediatorTickInterval() time.Duration {
 	return o.mediatorTickInterval
 }
 
+func (o *options) SetNamespaceHooks(value NamespaceHooks) Options {
+	opts := *o
+	opts.namespaceHooks = value
+	return &opts
+}
+
+func (o *options) NamespaceHooks() NamespaceHooks {
+	return o.namespaceHooks
+}
+
 type noOpColdFlush struct{}
 
 func (n *noOpColdFlush) ColdFlushNamespace(ns Namespace) (OnColdFlushNamespace, error) {
 	return &persist.NoOpColdFlushNamespace{}, nil
 }
+
+type noopNamespaceHooks struct{}
+
+func (h *noopNamespaceHooks) OnCreatedNamespace(Namespace, GetNamespaceFn) error {
+	return nil
+}
diff --git a/src/dbnode/storage/readonly_index_proxy.go b/src/dbnode/storage/readonly_index_proxy.go
new file mode 100644
index 0000000000..02b420282f
--- /dev/null
+++ b/src/dbnode/storage/readonly_index_proxy.go
@@ -0,0 +1,114 @@
+// Copyright (c) 2020 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package storage
+
+import (
+	"errors"
+	"time"
+
+	"github.com/m3db/m3/src/dbnode/persist"
+	"github.com/m3db/m3/src/dbnode/sharding"
+	"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
+	"github.com/m3db/m3/src/dbnode/storage/index"
+	"github.com/m3db/m3/src/dbnode/ts/writes"
+	"github.com/m3db/m3/src/x/context"
+	xtime "github.com/m3db/m3/src/x/time"
+)
+
+var errNamespaceIndexReadOnly = errors.New("write operation on read only namespace index")
+
+type readOnlyIndexProxy struct {
+	underlying NamespaceIndex
+}
+
+func (r readOnlyIndexProxy) AssignShardSet(shardSet sharding.ShardSet) {}
+
+func (r readOnlyIndexProxy) BlockStartForWriteTime(writeTime time.Time) xtime.UnixNano {
+	return r.underlying.BlockStartForWriteTime(writeTime)
+}
+
+func (r readOnlyIndexProxy) BlockForBlockStart(blockStart time.Time) (index.Block, error) {
+	return r.underlying.BlockForBlockStart(blockStart)
+}
+
+func (r readOnlyIndexProxy) WriteBatch(batch *index.WriteBatch) error {
+	return errNamespaceIndexReadOnly
+}
+
+func (r readOnlyIndexProxy) WritePending(pending []writes.PendingIndexInsert) error {
+	return errNamespaceIndexReadOnly
+}
+
+func (r readOnlyIndexProxy) Query(ctx context.Context, query index.Query, opts index.QueryOptions) (index.QueryResult, error) {
+	return r.underlying.Query(ctx, query, opts)
+}
+
+func (r readOnlyIndexProxy) AggregateQuery(ctx context.Context, query index.Query, opts index.AggregationOptions) (index.AggregateQueryResult, error) {
+	return r.underlying.AggregateQuery(ctx, query, opts)
+}
+
+func (r readOnlyIndexProxy) Bootstrap(bootstrapResults result.IndexResults) error {
+	return nil
+}
+
+func (r readOnlyIndexProxy) BootstrapsDone() uint {
+	return r.underlying.BootstrapsDone()
+}
+
+func (r readOnlyIndexProxy) CleanupExpiredFileSets(t time.Time) error {
+	return nil
+}
+
+func (r readOnlyIndexProxy) CleanupDuplicateFileSets() error {
+	return nil
+}
+
+func (r readOnlyIndexProxy) Tick(c context.Cancellable, startTime time.Time) (namespaceIndexTickResult, error) {
+	return namespaceIndexTickResult{}, nil
+}
+
+func (r readOnlyIndexProxy) WarmFlush(flush persist.IndexFlush, shards []databaseShard) error {
+	return nil
+}
+
+func (r readOnlyIndexProxy) ColdFlush(shards []databaseShard) (OnColdFlushDone, error) {
+	return noopOnColdFlushDone, nil
+}
+
+func (r readOnlyIndexProxy) SetExtendedRetentionPeriod(period time.Duration) {
+	r.underlying.SetExtendedRetentionPeriod(period)
+}
+
+func (r readOnlyIndexProxy) DebugMemorySegments(opts DebugMemorySegmentsOptions) error {
+	return r.underlying.DebugMemorySegments(opts)
+}
+
+func (r readOnlyIndexProxy) Close() error {
+	return nil
+}
+
+func NewReadOnlyIndexProxy(underlying NamespaceIndex) NamespaceIndex {
+	return readOnlyIndexProxy{underlying: underlying}
+}
+
+func noopOnColdFlushDone() error {
+	return nil
+}
diff --git a/src/dbnode/storage/readonly_index_proxy_test.go b/src/dbnode/storage/readonly_index_proxy_test.go
new file mode 100644
index 0000000000..9ddc66b9d5
--- /dev/null
+++ b/src/dbnode/storage/readonly_index_proxy_test.go
@@ -0,0 +1,120 @@
+// Copyright (c) 2020 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package storage
+
+import (
+	"errors"
+	"testing"
+	"time"
+
+	"github.com/m3db/m3/src/dbnode/storage/index"
+	"github.com/m3db/m3/src/x/context"
+	xtime "github.com/m3db/m3/src/x/time"
+
+	"github.com/golang/mock/gomock"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestReadOnlyIndexProxyReject(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	idx := NewMockNamespaceIndex(ctrl)
+	roIdx := NewReadOnlyIndexProxy(idx)
+
+	assert.Equal(t, errNamespaceIndexReadOnly, roIdx.WriteBatch(nil))
+	assert.Equal(t, errNamespaceIndexReadOnly, roIdx.WritePending(nil))
+}
+
+func TestReadOnlyIndexProxySuppress(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	idx := NewMockNamespaceIndex(ctrl)
+	roIdx := NewReadOnlyIndexProxy(idx)
+
+	roIdx.AssignShardSet(nil)
+
+	assert.NoError(t, roIdx.Bootstrap(nil))
+
+	assert.NoError(t, roIdx.CleanupExpiredFileSets(time.Now()))
+
+	assert.NoError(t, roIdx.CleanupDuplicateFileSets())
+
+	res, err := roIdx.Tick(nil, time.Now())
+	assert.Equal(t, namespaceIndexTickResult{}, res)
+	assert.NoError(t, err)
+
+	assert.NoError(t, roIdx.WarmFlush(nil, nil))
+
+	_, err = roIdx.ColdFlush(nil)
+	assert.NoError(t, err)
+
+	assert.NoError(t, roIdx.Close())
+}
+
+func TestReadOnlyIndexProxyDelegate(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	idx := NewMockNamespaceIndex(ctrl)
+	roIdx := NewReadOnlyIndexProxy(idx)
+
+	now := time.Now().Truncate(time.Hour)
+	later := xtime.ToUnixNano(now.Add(time.Hour))
+	testErr := errors.New("test error")
+
+	idx.EXPECT().BlockStartForWriteTime(now).Return(later)
+	assert.Equal(t, later, roIdx.BlockStartForWriteTime(now))
+
+	block := index.NewMockBlock(ctrl)
+	idx.EXPECT().BlockForBlockStart(now).Return(block, testErr)
+	res, err := roIdx.BlockForBlockStart(now)
+	assert.Equal(t, testErr, err)
+	assert.Equal(t, block, res)
+
+	ctx := context.NewContext()
+	query := index.Query{}
+	queryOpts := index.QueryOptions{}
+	queryRes := index.QueryResult{}
+
+	idx.EXPECT().Query(ctx, query, queryOpts).Return(queryRes, testErr)
+	qRes, err := roIdx.Query(ctx, query, queryOpts)
+	assert.Equal(t, testErr, err)
+	assert.Equal(t, queryRes, qRes)
+
+	aggOpts := index.AggregationOptions{}
+	aggRes := index.AggregateQueryResult{}
+	idx.EXPECT().AggregateQuery(ctx, query, aggOpts).Return(aggRes, testErr)
+	aRes, err := roIdx.AggregateQuery(ctx, query, aggOpts)
+	assert.Equal(t, testErr, err)
+	assert.Equal(t, aggRes, aRes)
+
+	idx.EXPECT().BootstrapsDone().Return(uint(3))
+	assert.Equal(t, uint(3), roIdx.BootstrapsDone())
+
+	idx.EXPECT().SetExtendedRetentionPeriod(time.Minute)
+	roIdx.SetExtendedRetentionPeriod(time.Minute)
+
+	debugOpts := DebugMemorySegmentsOptions{}
+	idx.EXPECT().DebugMemorySegments(debugOpts).Return(testErr)
+	assert.Equal(t, testErr, roIdx.DebugMemorySegments(debugOpts))
+}
diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go
index 7410ed8212..95a051468f 100644
--- a/src/dbnode/storage/shard.go
+++ b/src/dbnode/storage/shard.go
@@ -2646,9 +2646,9 @@ func (s *dbShard) AggregateTiles(
 		return 0, fmt.Errorf("blockReaders and sourceBlockVolumes length mismatch (%d != %d)", len(blockReaders), len(sourceBlockVolumes))
 	}
 
-	blockReadersToClose := make([]fs.DataFileSetReader, 0, len(blockReaders))
+	openBlockReaders := make([]fs.DataFileSetReader, 0, len(blockReaders))
 	defer func() {
-		for _, reader := range blockReadersToClose {
+		for _, reader := range openBlockReaders {
 			reader.Close()
 		}
 	}()
@@ -2668,17 +2668,24 @@ func (s *dbShard) AggregateTiles(
 		}
 
 		if err := blockReader.Open(openOpts); err != nil {
-			s.logger.Error("blockReader.Open", zap.Error(err))
+			if err == fs.ErrCheckpointFileNotFound {
+				// A very recent source block might not have been flushed yet.
+				continue
+			}
+			s.logger.Error("blockReader.Open",
+				zap.Error(err),
+				zap.Time("blockStart", sourceBlockVolume.blockStart),
+				zap.Int("volumeIndex", sourceBlockVolume.latestVolume))
 			return 0, err
 		}
 
 		if blockReader.Entries() > maxEntries {
 			maxEntries = blockReader.Entries()
 		}
 
-		blockReadersToClose = append(blockReadersToClose, blockReader)
+		openBlockReaders = append(openBlockReaders, blockReader)
 	}
 
-	crossBlockReader, err := fs.NewCrossBlockReader(blockReaders, s.opts.InstrumentOptions())
+	crossBlockReader, err := fs.NewCrossBlockReader(openBlockReaders, s.opts.InstrumentOptions())
 	if err != nil {
 		s.logger.Error("NewCrossBlockReader", zap.Error(err))
 		return 0, err
@@ -2844,7 +2851,7 @@ READER:
 
 	s.logger.Debug("finished aggregating tiles",
 		zap.Uint32("shard", s.ID()),
-		zap.Int64("processedBlocks", processedTileCount.Load()))
+		zap.Int64("processedTiles", processedTileCount.Load()))
 
 	return processedTileCount.Load(), nil
 }
diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go
index 76c44c3bf1..00c9507ec3 100644
--- a/src/dbnode/storage/shard_test.go
+++ b/src/dbnode/storage/shard_test.go
@@ -1782,9 +1782,6 @@ func TestShardAggregateTiles(t *testing.T) {
 	targetShard := testDatabaseShardWithIndexFn(t, DefaultTestOptions(), nil, true)
 	defer targetShard.Close()
 
-	reverseIndex := NewMockNamespaceIndex(ctrl)
-	targetShard.reverseIndex = reverseIndex
-
 	sourceNsID := sourceShard.namespace.ID()
 
 	dataBytes := func() []byte {
@@ -1799,22 +1796,26 @@ func TestShardAggregateTiles(t *testing.T) {
 		return encodedBytes
 	}
 
-	reader0, volume0 := getMockReader(ctrl, t, sourceShard, start)
+	reader0, volume0 := getMockReader(ctrl, t, sourceShard, start, true)
 	reader0.EXPECT().Entries().Return(2).AnyTimes()
 	reader0.EXPECT().StreamingRead().Return(ident.BytesID("id1"), nil, dataBytes(), uint32(11), nil)
 	reader0.EXPECT().StreamingRead().Return(ident.BytesID("id2"), nil, dataBytes(), uint32(22), nil)
 	reader0.EXPECT().StreamingRead().Return(nil, nil, nil, uint32(0), io.EOF)
 
 	secondSourceBlockStart := start.Add(sourceBlockSize)
-	reader1, volume1 := getMockReader(ctrl, t, sourceShard, secondSourceBlockStart)
+	reader1, volume1 := getMockReader(ctrl, t, sourceShard, secondSourceBlockStart, true)
 	reader1.EXPECT().Entries().Return(1).AnyTimes()
 	reader1.EXPECT().StreamingRead().Return(ident.BytesID("id3"), nil, dataBytes(), uint32(33), nil)
 	reader1.EXPECT().StreamingRead().Return(nil, nil, nil, uint32(0), io.EOF)
 
-	blockReaders := []fs.DataFileSetReader{reader0, reader1}
+	thirdSourceBlockStart := secondSourceBlockStart.Add(sourceBlockSize)
+	reader2, volume2 := getMockReader(ctrl, t, sourceShard, thirdSourceBlockStart, false)
+
+	blockReaders := []fs.DataFileSetReader{reader0, reader1, reader2}
 	sourceBlockVolumes := []shardBlockVolume{
 		{start, volume0},
 		{secondSourceBlockStart, volume1},
+		{thirdSourceBlockStart, volume2},
 	}
 
 	processedTileCount, err := targetShard.AggregateTiles(
@@ -1823,11 +1824,30 @@
 	assert.Equal(t, int64(3), processedTileCount)
 }
 
+func TestShardAggregateTilesVerifySliceLengths(t *testing.T) {
+	var (
+		ctx     = context.NewContext()
+		srcNsID = ident.StringID("src")
+		start   = time.Now()
+	)
+
+	targetShard := testDatabaseShardWithIndexFn(t, DefaultTestOptions(), nil, true)
+	defer targetShard.Close()
+
+	var blockReaders []fs.DataFileSetReader
+	sourceBlockVolumes := []shardBlockVolume{{start, 0}}
+
+	_, err := targetShard.AggregateTiles(
+		ctx, srcNsID, 1, blockReaders, sourceBlockVolumes, AggregateTilesOptions{}, nil)
+	require.EqualError(t, err, "blockReaders and sourceBlockVolumes length mismatch (0 != 1)")
+}
+
 func getMockReader(
 	ctrl *gomock.Controller,
 	t *testing.T,
 	shard *dbShard,
 	blockStart time.Time,
+	dataFilesetFlushed bool,
 ) (*fs.MockDataFileSetReader, int) {
 	latestSourceVolume, err := shard.LatestVolume(blockStart)
 	require.NoError(t, err)
@@ -1844,10 +1864,14 @@ func getMockReader(
 	}
 
 	reader := fs.NewMockDataFileSetReader(ctrl)
-	reader.EXPECT().Open(openOpts).Return(nil)
-	reader.EXPECT().StreamingEnabled().Return(true)
-	reader.EXPECT().Range().Return(xtime.Range{Start: blockStart})
-	reader.EXPECT().Close()
+	if dataFilesetFlushed {
+		reader.EXPECT().Open(openOpts).Return(nil)
+		reader.EXPECT().StreamingEnabled().Return(true)
+		reader.EXPECT().Range().Return(xtime.Range{Start: blockStart})
+		reader.EXPECT().Close()
+	} else {
+		reader.EXPECT().Open(openOpts).Return(fs.ErrCheckpointFileNotFound)
+	}
 
 	return reader, latestSourceVolume
 }
diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go
index a67b6243aa..0c5a146517 100644
--- a/src/dbnode/storage/storage_mock.go
+++ b/src/dbnode/storage/storage_mock.go
@@ -1049,6 +1049,20 @@ func (mr *MockNamespaceMockRecorder) Shards() *gomock.Call {
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Shards", reflect.TypeOf((*MockNamespace)(nil).Shards))
 }
 
+// SetIndex mocks base method
+func (m *MockNamespace) SetIndex(reverseIndex NamespaceIndex) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "SetIndex", reverseIndex)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// SetIndex indicates an expected call of SetIndex
+func (mr *MockNamespaceMockRecorder) SetIndex(reverseIndex interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetIndex", reflect.TypeOf((*MockNamespace)(nil).SetIndex), reverseIndex)
+}
+
 // Index mocks base method
 func (m *MockNamespace) Index() (NamespaceIndex, error) {
 	m.ctrl.T.Helper()
@@ -1185,6 +1199,20 @@ func (mr *MockdatabaseNamespaceMockRecorder) Shards() *gomock.Call {
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Shards", reflect.TypeOf((*MockdatabaseNamespace)(nil).Shards))
 }
 
+// SetIndex mocks base method
+func (m *MockdatabaseNamespace) SetIndex(reverseIndex NamespaceIndex) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "SetIndex", reverseIndex)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// SetIndex indicates an expected call of SetIndex
+func (mr *MockdatabaseNamespaceMockRecorder) SetIndex(reverseIndex interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetIndex", reflect.TypeOf((*MockdatabaseNamespace)(nil).SetIndex), reverseIndex)
+}
+
 // Index mocks base method
 func (m *MockdatabaseNamespace) Index() (NamespaceIndex, error) {
 	m.ctrl.T.Helper()
@@ -1504,10 +1532,10 @@ func (mr *MockdatabaseNamespaceMockRecorder) Repair(repairer, tr interface{}) *g
 }
 
 // BootstrapState mocks base method
-func (m *MockdatabaseNamespace) BootstrapState() ShardBootstrapStates {
+func (m *MockdatabaseNamespace) BootstrapState() BootstrapState {
 	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "BootstrapState")
-	ret0, _ := ret[0].(ShardBootstrapStates)
+	ret0, _ := ret[0].(BootstrapState)
 	return ret0
 }
 
@@ -1517,6 +1545,20 @@ func (mr *MockdatabaseNamespaceMockRecorder) BootstrapState() *gomock.Call {
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BootstrapState", reflect.TypeOf((*MockdatabaseNamespace)(nil).BootstrapState))
 }
 
+// ShardBootstrapState mocks base method
+func (m *MockdatabaseNamespace) ShardBootstrapState() ShardBootstrapStates {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "ShardBootstrapState")
+	ret0, _ := ret[0].(ShardBootstrapStates)
+	return ret0
+}
+
+// ShardBootstrapState indicates an expected call of ShardBootstrapState
+func (mr *MockdatabaseNamespaceMockRecorder) ShardBootstrapState() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ShardBootstrapState", reflect.TypeOf((*MockdatabaseNamespace)(nil).ShardBootstrapState))
+}
+
 // FlushState mocks base method
 func (m *MockdatabaseNamespace) FlushState(shardID uint32, blockStart time.Time) (fileOpState, error) {
 	m.ctrl.T.Helper()
@@ -2359,6 +2401,18 @@ func (mr *MockNamespaceIndexMockRecorder) ColdFlush(shards interface{}) *gomock.
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ColdFlush", reflect.TypeOf((*MockNamespaceIndex)(nil).ColdFlush), shards)
 }
 
+// SetExtendedRetentionPeriod mocks base method
+func (m *MockNamespaceIndex) SetExtendedRetentionPeriod(period time.Duration) {
+	m.ctrl.T.Helper()
+	m.ctrl.Call(m, "SetExtendedRetentionPeriod", period)
+}
+
+// SetExtendedRetentionPeriod indicates an expected call of SetExtendedRetentionPeriod
+func (mr *MockNamespaceIndexMockRecorder) SetExtendedRetentionPeriod(period interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetExtendedRetentionPeriod", reflect.TypeOf((*MockNamespaceIndex)(nil).SetExtendedRetentionPeriod), period)
+}
+
 // DebugMemorySegments mocks base method
 func (m *MockNamespaceIndex) DebugMemorySegments(opts DebugMemorySegmentsOptions) error {
 	m.ctrl.T.Helper()
@@ -4565,6 +4619,34 @@ func (mr *MockOptionsMockRecorder) MediatorTickInterval() *gomock.Call {
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MediatorTickInterval", reflect.TypeOf((*MockOptions)(nil).MediatorTickInterval))
 }
 
+// SetNamespaceHooks mocks base method
+func (m *MockOptions) SetNamespaceHooks(hooks NamespaceHooks) Options {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "SetNamespaceHooks", hooks)
+	ret0, _ := ret[0].(Options)
+	return ret0
+}
+
+// SetNamespaceHooks indicates an expected call of SetNamespaceHooks
+func (mr *MockOptionsMockRecorder) SetNamespaceHooks(hooks interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetNamespaceHooks", reflect.TypeOf((*MockOptions)(nil).SetNamespaceHooks), hooks)
+}
+
+// NamespaceHooks mocks base method
+func (m *MockOptions) NamespaceHooks() NamespaceHooks {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "NamespaceHooks")
+	ret0, _ := ret[0].(NamespaceHooks)
+	return ret0
+}
+
+// NamespaceHooks indicates an expected call of NamespaceHooks
+func (mr *MockOptionsMockRecorder) NamespaceHooks() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NamespaceHooks", reflect.TypeOf((*MockOptions)(nil).NamespaceHooks))
+}
+
 // MockMemoryTracker is a mock of MemoryTracker interface
 type MockMemoryTracker struct {
 	ctrl *gomock.Controller
@@ -4651,3 +4733,40 @@ func (mr *MockMemoryTrackerMockRecorder) WaitForDec() *gomock.Call {
 	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForDec", reflect.TypeOf((*MockMemoryTracker)(nil).WaitForDec))
 }
+
+// MockNamespaceHooks is a mock of NamespaceHooks interface
+type MockNamespaceHooks struct {
+	ctrl     *gomock.Controller
+	recorder *MockNamespaceHooksMockRecorder
+}
+
+// MockNamespaceHooksMockRecorder is the mock recorder for MockNamespaceHooks
+type MockNamespaceHooksMockRecorder struct {
+	mock *MockNamespaceHooks
+}
+
+// NewMockNamespaceHooks creates a new mock instance
+func NewMockNamespaceHooks(ctrl *gomock.Controller) *MockNamespaceHooks {
+	mock := &MockNamespaceHooks{ctrl: ctrl}
+	mock.recorder = &MockNamespaceHooksMockRecorder{mock}
+	return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use
+func (m *MockNamespaceHooks) EXPECT() *MockNamespaceHooksMockRecorder {
+	return m.recorder
+}
+
+// OnCreatedNamespace mocks base method
+func (m *MockNamespaceHooks) OnCreatedNamespace(arg0 Namespace, arg1 GetNamespaceFn) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "OnCreatedNamespace", arg0, arg1)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// OnCreatedNamespace indicates an expected call of OnCreatedNamespace
+func (mr *MockNamespaceHooksMockRecorder) OnCreatedNamespace(arg0, arg1 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnCreatedNamespace", reflect.TypeOf((*MockNamespaceHooks)(nil).OnCreatedNamespace), arg0, arg1)
+}
diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go
index 1a2df64bb7..044f133286 100644
--- a/src/dbnode/storage/types.go
+++ b/src/dbnode/storage/types.go
@@ -255,6 +255,9 @@ type Namespace interface {
 	// Shards returns the shard description.
 	Shards() []Shard
 
+	// SetIndex sets and enables the reverse index for this namespace.
+	SetIndex(reverseIndex NamespaceIndex) error
+
 	// Index returns the reverse index backing the namespace, if it exists.
 	Index() (NamespaceIndex, error)
 
@@ -390,9 +393,11 @@ type databaseNamespace interface {
 	// Repair repairs the namespace data for a given time range
 	Repair(repairer databaseShardRepairer, tr xtime.Range) error
 
-	// BootstrapState captures and returns a snapshot of the namespaces'
-	// bootstrap state.
-	BootstrapState() ShardBootstrapStates
+	// BootstrapState returns the namespace's bootstrap state.
+	BootstrapState() BootstrapState
+
+	// ShardBootstrapState captures and returns a snapshot of the bootstrap states of the namespace's shards.
+	ShardBootstrapState() ShardBootstrapStates
 
 	// FlushState returns the flush state for the specified shard and block start.
 	FlushState(shardID uint32, blockStart time.Time) (fileOpState, error)
@@ -698,6 +703,9 @@ type NamespaceIndex interface {
 	// cold flushing completes to perform houskeeping.
 	ColdFlush(shards []databaseShard) (OnColdFlushDone, error)
 
+	// SetExtendedRetentionPeriod allows extending the index retention period beyond the retention period of the namespace it belongs to.
+	SetExtendedRetentionPeriod(period time.Duration)
+
 	// DebugMemorySegments allows for debugging memory segments.
 	DebugMemorySegments(opts DebugMemorySegmentsOptions) error
 
@@ -1212,6 +1220,12 @@ type Options interface {
 	// MediatorTickInterval returns the ticking interval for the mediator.
 	MediatorTickInterval() time.Duration
+
+	// SetNamespaceHooks sets the NamespaceHooks.
+	SetNamespaceHooks(hooks NamespaceHooks) Options
+
+	// NamespaceHooks returns the NamespaceHooks.
+	NamespaceHooks() NamespaceHooks
 }
 
 // MemoryTracker tracks memory.
@@ -1281,3 +1295,11 @@ type AggregateTilesOptions struct {
 	// TODO: remove once we have metrics type stored in the metadata.
 	HandleCounterResets bool
 }
+
+// NamespaceHooks allows dynamic plugging into the namespace lifecycle.
+type NamespaceHooks interface {
+	// OnCreatedNamespace gets invoked after each namespace is created.
+	OnCreatedNamespace(Namespace, GetNamespaceFn) error
+}
+
+type GetNamespaceFn func(k ident.ID) (databaseNamespace, bool)
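Note: the new pieces in this diff are designed to compose. A NamespaceHooks implementation can use GetNamespaceFn to look up a source namespace once the whole batch has been created (addNamespacesWithLock invokes hooks only after every namespace in the batch is registered), extend the source index's retention so it covers the aggregated namespace's longer data retention, and install that index into the aggregated namespace as a read-only proxy. A hypothetical in-package sketch follows; it assumes databaseNamespace embeds Namespace (as it does in this package, since GetNamespaceFn returns the unexported databaseNamespace), and the type sharedIndexHooks and its fields are illustrative, not part of this change:

	// sharedIndexHooks wires the reverse index of a raw source namespace
	// into a downsampled target namespace, so queries against the target
	// resolve series through the source's index instead of maintaining a
	// second index.
	type sharedIndexHooks struct {
		sourceNsID, targetNsID  ident.ID
		extendedRetentionPeriod time.Duration
	}

	func (h *sharedIndexHooks) OnCreatedNamespace(ns Namespace, getNs GetNamespaceFn) error {
		if !ns.ID().Equal(h.targetNsID) {
			// Only the aggregated target namespace needs wiring.
			return nil
		}
		source, ok := getNs(h.sourceNsID)
		if !ok {
			// Source namespace not registered; nothing to share yet.
			return nil
		}
		sourceIndex, err := source.Index()
		if err != nil {
			return err
		}
		// Keep source index blocks around long enough to cover the target
		// namespace's longer data retention.
		sourceIndex.SetExtendedRetentionPeriod(h.extendedRetentionPeriod)
		// Writes through the proxy are rejected with
		// errNamespaceIndexReadOnly; queries are delegated.
		return ns.SetIndex(NewReadOnlyIndexProxy(sourceIndex))
	}

Such a hook would reach the database through the server's StorageOptions.NamespaceHooks field, which Run copies into the storage options via SetNamespaceHooks (see the server.go change above).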