diff --git a/src/dbnode/persist/fs/persist_manager.go b/src/dbnode/persist/fs/persist_manager.go index 1051301a69..9b824e7c79 100644 --- a/src/dbnode/persist/fs/persist_manager.go +++ b/src/dbnode/persist/fs/persist_manager.go @@ -115,22 +115,106 @@ type dataPersistManager struct { snapshotID uuid.UUID } -type indexPersistManager struct { - writer IndexFileSetWriter - segmentWriter m3ninxpersist.MutableSegmentFileSetWriter - +type singleUseIndexWriterState struct { // identifiers required to know which file to open // after persistence is over fileSetIdentifier FileSetFileIdentifier fileSetType persist.FileSetType - // track state of writers - writeErr error - initialized bool + // track state of writer + writeErr error +} + +// Support writing to multiple index blocks/filesets during index persist. +// This allows us to prepare an index fileset writer per block start. +type singleUseIndexWriter struct { + // back-ref to the index persist manager so we can share resources there + manager *indexPersistManager + writer IndexFileSetWriter + + state singleUseIndexWriterState +} + +func (s *singleUseIndexWriter) persistIndex(builder segment.Builder) error { + // Lock the index persist manager as we're sharing the segment builder as a resource. + s.manager.Lock() + defer s.manager.Unlock() + + markError := func(err error) { + s.state.writeErr = err + } + if err := s.state.writeErr; err != nil { + return fmt.Errorf("encountered error: %w, skipping further attempts to persist data", err) + } + + if err := s.manager.segmentWriter.Reset(builder); err != nil { + markError(err) + return err + } + + if err := s.writer.WriteSegmentFileSet(s.manager.segmentWriter); err != nil { + markError(err) + return err + } + + return nil +} + +func (s *singleUseIndexWriter) closeIndex() ([]segment.Segment, error) { + s.manager.Lock() + defer s.manager.Unlock() + + // This writer will be thrown away after we're done persisting. 
+ defer func() { + s.state = singleUseIndexWriterState{fileSetType: -1} + s.manager = nil + s.writer = nil + }() + + // i.e. we're done writing all segments for PreparedIndexPersist. + // so we can close the writer. + if err := s.writer.Close(); err != nil { + return nil, err + } + + // only attempt to retrieve data if we have not encountered errors during + // any writes. + if err := s.state.writeErr; err != nil { + return nil, err + } + + // and then we get persistent segments backed by mmap'd data so the index + // can safely evict the segments we have just persisted. + result, err := ReadIndexSegments(ReadIndexSegmentsOptions{ + ReaderOptions: IndexReaderOpenOptions{ + Identifier: s.state.fileSetIdentifier, + FileSetType: s.state.fileSetType, + }, + FilesystemOptions: s.manager.opts, + newReaderFn: s.manager.newReaderFn, + newPersistentSegmentFn: s.manager.newPersistentSegmentFn, + }) + if err != nil { + return nil, err + } + + return result.Segments, nil +} + +type indexPersistManager struct { + sync.Mutex + + // segmentWriter holds the bulk of the re-usable in-mem resources so + // we want to share this across writers. 
+ segmentWriter m3ninxpersist.MutableSegmentFileSetWriter // hooks used for testing newReaderFn newIndexReaderFn newPersistentSegmentFn newPersistentSegmentFn + newIndexWriterFn newIndexWriterFn + + // options used by index writers + opts Options } type newIndexReaderFn func(Options) (IndexFileSetReader, error) @@ -140,6 +224,8 @@ type newPersistentSegmentFn func( m3ninxfs.Options, ) (m3ninxfs.Segment, error) +type newIndexWriterFn func(Options) (IndexFileSetWriter, error) + type persistManagerMetrics struct { writeDurationMs tally.Gauge throttleDurationMs tally.Gauge @@ -163,11 +249,6 @@ func NewPersistManager(opts Options) (persist.Manager, error) { return nil, err } - idxWriter, err := NewIndexWriter(opts) - if err != nil { - return nil, err - } - segmentWriter, err := m3ninxpersist.NewMutableSegmentFileSetWriter( opts.FSTWriterOptions()) if err != nil { @@ -186,30 +267,31 @@ func NewPersistManager(opts Options) (persist.Manager, error) { snapshotMetadataWriter: NewSnapshotMetadataWriter(opts), }, indexPM: indexPersistManager{ - writer: idxWriter, segmentWriter: segmentWriter, + // fs opts are used by underlying index writers + opts: opts, }, status: persistManagerIdle, metrics: newPersistManagerMetrics(scope), } pm.indexPM.newReaderFn = NewIndexReader pm.indexPM.newPersistentSegmentFn = m3ninxpersist.NewSegment + pm.indexPM.newIndexWriterFn = NewIndexWriter pm.runtimeOptsListener = opts.RuntimeOptionsManager().RegisterListener(pm) return pm, nil } -func (pm *persistManager) reset() { +func (pm *persistManager) resetWithLock() error { pm.status = persistManagerIdle pm.start = timeZero pm.count = 0 pm.bytesWritten = 0 pm.worked = 0 pm.slept = 0 - pm.indexPM.segmentWriter.Reset(nil) - pm.indexPM.writeErr = nil - pm.indexPM.initialized = false pm.dataPM.snapshotID = nil + + return pm.indexPM.segmentWriter.Reset(nil) } // StartIndexPersist is called by the databaseFlushManager to begin the persist process for @@ -271,83 +353,32 @@ func (pm *persistManager) 
PrepareIndex(opts persist.IndexPrepareOptions) (persis IndexVolumeType: opts.IndexVolumeType, } + writer, err := pm.indexPM.newIndexWriterFn(pm.opts) + if err != nil { + return prepared, err + } + idxWriter := &singleUseIndexWriter{ + manager: &pm.indexPM, + writer: writer, + state: singleUseIndexWriterState{ + // track which file we are writing in the persist manager, so we + // know which file to read back on `closeIndex` being called. + fileSetIdentifier: fileSetID, + fileSetType: opts.FileSetType, + }, + } // create writer for required fileset file. - if err := pm.indexPM.writer.Open(idxWriterOpts); err != nil { + if err := idxWriter.writer.Open(idxWriterOpts); err != nil { return prepared, err } - // track which file we are writing in the persist manager, so we - // know which file to read back on `closeIndex` being called. - pm.indexPM.fileSetIdentifier = fileSetID - pm.indexPM.fileSetType = opts.FileSetType - pm.indexPM.initialized = true - // provide persistManager hooks into PreparedIndexPersist object - prepared.Persist = pm.persistIndex - prepared.Close = pm.closeIndex + prepared.Persist = idxWriter.persistIndex + prepared.Close = idxWriter.closeIndex return prepared, nil } -func (pm *persistManager) persistIndex(builder segment.Builder) error { - // FOLLOWUP(prateek): need to use-rate limiting runtime options in this code path - markError := func(err error) { - pm.indexPM.writeErr = err - } - if err := pm.indexPM.writeErr; err != nil { - return fmt.Errorf("encountered error: %v, skipping further attempts to persist data", err) - } - - if err := pm.indexPM.segmentWriter.Reset(builder); err != nil { - markError(err) - return err - } - - if err := pm.indexPM.writer.WriteSegmentFileSet(pm.indexPM.segmentWriter); err != nil { - markError(err) - return err - } - - return nil -} - -func (pm *persistManager) closeIndex() ([]segment.Segment, error) { - // ensure StartIndexPersist was called - if !pm.indexPM.initialized { - return nil, 
errPersistManagerNotPersisting - } - pm.indexPM.initialized = false - - // i.e. we're done writing all segments for PreparedIndexPersist. - // so we can close the writer. - if err := pm.indexPM.writer.Close(); err != nil { - return nil, err - } - - // only attempt to retrieve data if we have not encountered errors during - // any writes. - if err := pm.indexPM.writeErr; err != nil { - return nil, err - } - - // and then we get persistent segments backed by mmap'd data so the index - // can safely evict the segment's we have just persisted. - result, err := ReadIndexSegments(ReadIndexSegmentsOptions{ - ReaderOptions: IndexReaderOpenOptions{ - Identifier: pm.indexPM.fileSetIdentifier, - FileSetType: pm.indexPM.fileSetType, - }, - FilesystemOptions: pm.opts, - newReaderFn: pm.indexPM.newReaderFn, - newPersistentSegmentFn: pm.indexPM.newPersistentSegmentFn, - }) - if err != nil { - return nil, err - } - - return result.Segments, nil -} - // DoneIndex is called by the databaseFlushManager to finish the index persist process. func (pm *persistManager) DoneIndex() error { pm.Lock() @@ -362,9 +393,7 @@ func (pm *persistManager) DoneIndex() error { pm.metrics.throttleDurationMs.Update(float64(pm.slept / time.Millisecond)) // Reset state - pm.reset() - - return nil + return pm.resetWithLock() } // StartFlushPersist is called by the databaseFlushManager to begin the persist process. @@ -558,7 +587,7 @@ func (pm *persistManager) DoneFlush() error { return errPersistManagerCannotDoneFlushNotFlush } - return pm.doneShared() + return pm.doneSharedWithLock() } // DoneSnapshot is called by the databaseFlushManager to finish the snapshot persist process. @@ -594,7 +623,7 @@ func (pm *persistManager) DoneSnapshot( return fmt.Errorf("error writing out snapshot metadata file: %v", err) } - return pm.doneShared() + return pm.doneSharedWithLock() } // Close all resources. 
@@ -602,15 +631,13 @@ func (pm *persistManager) Close() { pm.runtimeOptsListener.Close() } -func (pm *persistManager) doneShared() error { +func (pm *persistManager) doneSharedWithLock() error { // Emit timing metrics pm.metrics.writeDurationMs.Update(float64(pm.worked / time.Millisecond)) pm.metrics.throttleDurationMs.Update(float64(pm.slept / time.Millisecond)) // Reset state - pm.reset() - - return nil + return pm.resetWithLock() } func (pm *persistManager) dataFilesetExists(prepareOpts persist.DataPrepareOptions) (bool, error) { diff --git a/src/dbnode/persist/fs/persist_manager_test.go b/src/dbnode/persist/fs/persist_manager_test.go index cd0f39386d..362db4b5eb 100644 --- a/src/dbnode/persist/fs/persist_manager_test.go +++ b/src/dbnode/persist/fs/persist_manager_test.go @@ -313,15 +313,6 @@ func TestPersistenceManagerCloseData(t *testing.T) { pm.closeData() } -func TestPersistenceManagerCloseIndex(t *testing.T) { - ctrl := gomock.NewController(xtest.Reporter{T: t}) - defer ctrl.Finish() - - pm, _, _, _ := testIndexPersistManager(t, ctrl) - defer os.RemoveAll(pm.filePathPrefix) - pm.closeIndex() -} - func TestPersistenceManagerPrepareIndexFileExists(t *testing.T) { ctrl := gomock.NewController(xtest.Reporter{T: t}) defer ctrl.Finish() @@ -421,17 +412,6 @@ func TestPersistenceManagerPrepareIndexSuccess(t *testing.T) { pm, writer, segWriter, _ := testIndexPersistManager(t, ctrl) defer os.RemoveAll(pm.filePathPrefix) - blockStart := time.Unix(1000, 0) - writerOpts := IndexWriterOpenOptions{ - Identifier: FileSetFileIdentifier{ - FileSetContentType: persist.FileSetIndexContentType, - Namespace: testNs1ID, - BlockStart: blockStart, - }, - BlockSize: testBlockSize, - } - writer.EXPECT().Open(xtest.CmpMatcher(writerOpts, m3test.IdentTransformer)).Return(nil) - flush, err := pm.StartIndexPersist() require.NoError(t, err) @@ -440,46 +420,62 @@ func TestPersistenceManagerPrepareIndexSuccess(t *testing.T) { assert.NoError(t, flush.DoneIndex()) }() - prepareOpts := 
persist.IndexPrepareOptions{ - NamespaceMetadata: testNs1Metadata(t), - BlockStart: blockStart, - } - prepared, err := flush.PrepareIndex(prepareOpts) - require.NoError(t, err) + // We support preparing multiple index block writers for an index persist. + numBlocks := 10 + blockStart := time.Unix(1000, 0) + for i := 1; i < numBlocks; i++ { + blockStart = blockStart.Add(time.Duration(i) * testBlockSize) + writerOpts := IndexWriterOpenOptions{ + Identifier: FileSetFileIdentifier{ + FileSetContentType: persist.FileSetIndexContentType, + Namespace: testNs1ID, + BlockStart: blockStart, + }, + BlockSize: testBlockSize, + } + writer.EXPECT().Open(xtest.CmpMatcher(writerOpts, m3test.IdentTransformer)).Return(nil) - seg := segment.NewMockMutableSegment(ctrl) - segWriter.EXPECT().Reset(seg).Return(nil) - writer.EXPECT().WriteSegmentFileSet(segWriter).Return(nil) - require.NoError(t, prepared.Persist(seg)) + prepareOpts := persist.IndexPrepareOptions{ + NamespaceMetadata: testNs1Metadata(t), + BlockStart: blockStart, + } + prepared, err := flush.PrepareIndex(prepareOpts) + require.NoError(t, err) - reader := NewMockIndexFileSetReader(ctrl) - pm.indexPM.newReaderFn = func(Options) (IndexFileSetReader, error) { - return reader, nil - } + seg := segment.NewMockMutableSegment(ctrl) + segWriter.EXPECT().Reset(seg).Return(nil) + writer.EXPECT().WriteSegmentFileSet(segWriter).Return(nil) + require.NoError(t, prepared.Persist(seg)) - reader.EXPECT().Open(xtest.CmpMatcher(IndexReaderOpenOptions{ - Identifier: writerOpts.Identifier, - }, m3test.IdentTransformer)).Return(IndexReaderOpenResult{}, nil) + reader := NewMockIndexFileSetReader(ctrl) + pm.indexPM.newReaderFn = func(Options) (IndexFileSetReader, error) { + return reader, nil + } - file := NewMockIndexSegmentFile(ctrl) - gomock.InOrder( - reader.EXPECT().SegmentFileSets().Return(1), - reader.EXPECT().ReadSegmentFileSet().Return(file, nil), - reader.EXPECT().ReadSegmentFileSet().Return(nil, io.EOF), - ) - fsSeg := 
m3ninxfs.NewMockSegment(ctrl) - pm.indexPM.newPersistentSegmentFn = func( - fset m3ninxpersist.IndexSegmentFileSet, opts m3ninxfs.Options, - ) (m3ninxfs.Segment, error) { - require.Equal(t, file, fset) - return fsSeg, nil - } + reader.EXPECT().Open(xtest.CmpMatcher(IndexReaderOpenOptions{ + Identifier: writerOpts.Identifier, + }, m3test.IdentTransformer)).Return(IndexReaderOpenResult{}, nil) + + file := NewMockIndexSegmentFile(ctrl) + gomock.InOrder( + reader.EXPECT().SegmentFileSets().Return(1), + reader.EXPECT().ReadSegmentFileSet().Return(file, nil), + reader.EXPECT().ReadSegmentFileSet().Return(nil, io.EOF), + ) + fsSeg := m3ninxfs.NewMockSegment(ctrl) + pm.indexPM.newPersistentSegmentFn = func( + fset m3ninxpersist.IndexSegmentFileSet, opts m3ninxfs.Options, + ) (m3ninxfs.Segment, error) { + require.Equal(t, file, fset) + return fsSeg, nil + } - writer.EXPECT().Close().Return(nil) - segs, err := prepared.Close() - require.NoError(t, err) - require.Len(t, segs, 1) - require.Equal(t, fsSeg, segs[0]) + writer.EXPECT().Close().Return(nil) + segs, err := prepared.Close() + require.NoError(t, err) + require.Len(t, segs, 1) + require.Equal(t, fsSeg, segs[0]) + } } func TestPersistenceManagerNoRateLimit(t *testing.T) { @@ -766,7 +762,9 @@ func testIndexPersistManager(t *testing.T, ctrl *gomock.Controller, require.NoError(t, err) manager := mgr.(*persistManager) - manager.indexPM.writer = writer + manager.indexPM.newIndexWriterFn = func(opts Options) (IndexFileSetWriter, error) { + return writer, nil + } manager.indexPM.segmentWriter = segmentWriter return manager, writer, segmentWriter, opts } diff --git a/src/dbnode/persist/persist_mock.go b/src/dbnode/persist/persist_mock.go index 1ec52dcd02..351250cf06 100644 --- a/src/dbnode/persist/persist_mock.go +++ b/src/dbnode/persist/persist_mock.go @@ -341,3 +341,17 @@ func (mr *MockOnFlushSeriesMockRecorder) OnFlushNewSeries(arg0 interface{}) *gom mr.mock.ctrl.T.Helper() return 
mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnFlushNewSeries", reflect.TypeOf((*MockOnFlushSeries)(nil).OnFlushNewSeries), arg0) } + +// CheckpointAndMaybeCompact mocks base method +func (m *MockOnFlushSeries) CheckpointAndMaybeCompact() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CheckpointAndMaybeCompact") + ret0, _ := ret[0].(error) + return ret0 +} + +// CheckpointAndMaybeCompact indicates an expected call of CheckpointAndMaybeCompact +func (mr *MockOnFlushSeriesMockRecorder) CheckpointAndMaybeCompact() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckpointAndMaybeCompact", reflect.TypeOf((*MockOnFlushSeries)(nil).CheckpointAndMaybeCompact)) +} diff --git a/src/dbnode/persist/types.go b/src/dbnode/persist/types.go index 4b4b195b53..25a8458f2d 100644 --- a/src/dbnode/persist/types.go +++ b/src/dbnode/persist/types.go @@ -366,13 +366,21 @@ type OnFlushNewSeriesEvent struct { } // OnFlushSeries performs work on a per series level. +// Also exposes a checkpoint fn for maybe compacting multiple index segments based on size. type OnFlushSeries interface { OnFlushNewSeries(OnFlushNewSeriesEvent) error + + // CheckpointAndMaybeCompact checks to see if we're at maximum cardinality + // for any index segments we're currently building and compact if we are. + CheckpointAndMaybeCompact() error } // NoOpColdFlushNamespace is a no-op impl of OnFlushSeries. type NoOpColdFlushNamespace struct{} +// CheckpointAndMaybeCompact is a no-op. +func (n *NoOpColdFlushNamespace) CheckpointAndMaybeCompact() error { return nil } + // OnFlushNewSeries is a no-op. 
func (n *NoOpColdFlushNamespace) OnFlushNewSeries(event OnFlushNewSeriesEvent) error { return nil diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 1cc099d797..d8b0fe440b 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -3697,6 +3697,20 @@ func (mr *MockOnColdFlushNamespaceMockRecorder) OnFlushNewSeries(arg0 interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnFlushNewSeries", reflect.TypeOf((*MockOnColdFlushNamespace)(nil).OnFlushNewSeries), arg0) } +// CheckpointAndMaybeCompact mocks base method +func (m *MockOnColdFlushNamespace) CheckpointAndMaybeCompact() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CheckpointAndMaybeCompact") + ret0, _ := ret[0].(error) + return ret0 +} + +// CheckpointAndMaybeCompact indicates an expected call of CheckpointAndMaybeCompact +func (mr *MockOnColdFlushNamespaceMockRecorder) CheckpointAndMaybeCompact() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckpointAndMaybeCompact", reflect.TypeOf((*MockOnColdFlushNamespace)(nil).CheckpointAndMaybeCompact)) +} + // Done mocks base method func (m *MockOnColdFlushNamespace) Done() error { m.ctrl.T.Helper()