From 6d8b1e821ecc77e87a67f0ec041bb7fa98b00b23 Mon Sep 17 00:00:00 2001 From: Bo Du Date: Tue, 22 Dec 2020 16:29:45 -0500 Subject: [PATCH 1/6] Add checkpointing method to cold flusher. --- src/dbnode/persist/types.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/dbnode/persist/types.go b/src/dbnode/persist/types.go index 4b4b195b53..25a8458f2d 100644 --- a/src/dbnode/persist/types.go +++ b/src/dbnode/persist/types.go @@ -366,13 +366,21 @@ type OnFlushNewSeriesEvent struct { } // OnFlushSeries performs work on a per series level. +// Also exposes a checkpoint fn for maybe compacting multiple index segments based on size. type OnFlushSeries interface { OnFlushNewSeries(OnFlushNewSeriesEvent) error + + // CheckpointAndMaybeCompact checks to see if we're at maximum cardinality + // for any index segments we're currently building and compact if we are. + CheckpointAndMaybeCompact() error } // NoOpColdFlushNamespace is a no-op impl of OnFlushSeries. type NoOpColdFlushNamespace struct{} +// CheckpointAndMaybeCompact is a no-op. +func (n *NoOpColdFlushNamespace) CheckpointAndMaybeCompact() error { return nil } + // OnFlushNewSeries is a no-op. func (n *NoOpColdFlushNamespace) OnFlushNewSeries(event OnFlushNewSeriesEvent) error { return nil From c2b1a92e3eb83c25c9f53c79a83eb5d05917e0ee Mon Sep 17 00:00:00 2001 From: Bo Du Date: Tue, 22 Dec 2020 16:30:15 -0500 Subject: [PATCH 2/6] Update mocks. --- src/dbnode/persist/persist_mock.go | 14 ++++++++++++++ src/dbnode/storage/storage_mock.go | 14 ++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/src/dbnode/persist/persist_mock.go b/src/dbnode/persist/persist_mock.go index 1ec52dcd02..351250cf06 100644 --- a/src/dbnode/persist/persist_mock.go +++ b/src/dbnode/persist/persist_mock.go @@ -341,3 +341,17 @@ func (mr *MockOnFlushSeriesMockRecorder) OnFlushNewSeries(arg0 interface{}) *gom mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnFlushNewSeries", reflect.TypeOf((*MockOnFlushSeries)(nil).OnFlushNewSeries), arg0) } + +// CheckpointAndMaybeCompact mocks base method +func (m *MockOnFlushSeries) CheckpointAndMaybeCompact() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CheckpointAndMaybeCompact") + ret0, _ := ret[0].(error) + return ret0 +} + +// CheckpointAndMaybeCompact indicates an expected call of CheckpointAndMaybeCompact +func (mr *MockOnFlushSeriesMockRecorder) CheckpointAndMaybeCompact() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckpointAndMaybeCompact", reflect.TypeOf((*MockOnFlushSeries)(nil).CheckpointAndMaybeCompact)) +} diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 1cc099d797..d8b0fe440b 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -3697,6 +3697,20 @@ func (mr *MockOnColdFlushNamespaceMockRecorder) OnFlushNewSeries(arg0 interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnFlushNewSeries", reflect.TypeOf((*MockOnColdFlushNamespace)(nil).OnFlushNewSeries), arg0) } +// CheckpointAndMaybeCompact mocks base method +func (m *MockOnColdFlushNamespace) CheckpointAndMaybeCompact() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CheckpointAndMaybeCompact") + ret0, _ := ret[0].(error) + return ret0 +} + +// CheckpointAndMaybeCompact indicates an expected call of CheckpointAndMaybeCompact +func (mr *MockOnColdFlushNamespaceMockRecorder) CheckpointAndMaybeCompact() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckpointAndMaybeCompact", reflect.TypeOf((*MockOnColdFlushNamespace)(nil).CheckpointAndMaybeCompact)) +} + // Done mocks base method func (m *MockOnColdFlushNamespace) Done() error { m.ctrl.T.Helper() From ea6977d49477011ef7e895c3fc893a12c817c5cf Mon Sep 17 00:00:00 2001 From: Bo Du Date: Tue, 22 Dec 2020 16:30:42 -0500 Subject: [PATCH 3/6] Support preparing an index writer per block in persist manager. --- src/dbnode/persist/fs/persist_manager.go | 225 +++++++++++------- src/dbnode/persist/fs/persist_manager_test.go | 110 +++++---- 2 files changed, 192 insertions(+), 143 deletions(-) diff --git a/src/dbnode/persist/fs/persist_manager.go b/src/dbnode/persist/fs/persist_manager.go index 1051301a69..efb13dbf27 100644 --- a/src/dbnode/persist/fs/persist_manager.go +++ b/src/dbnode/persist/fs/persist_manager.go @@ -37,6 +37,7 @@ import ( "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/instrument" xresource "github.com/m3db/m3/src/x/resource" + xtime "github.com/m3db/m3/src/x/time" "github.com/pborman/uuid" "github.com/uber-go/tally" @@ -63,6 +64,7 @@ var ( errPersistManagerFileSetAlreadyExists = errors.New("persist manager cannot prepare, fileset already exists") errPersistManagerCannotDoneSnapshotNotSnapshot = errors.New("persist manager cannot done snapshot, file set type is not snapshot") errPersistManagerCannotDoneFlushNotFlush = errors.New("persist manager cannot done flush, file set type is not flush") + errPersistManagerIndexWriterAlreadyExists = errors.New("persist manager cannot create index writer, already exists") ) type sleepFn func(time.Duration) @@ -115,22 +117,100 @@ type dataPersistManager struct { snapshotID uuid.UUID } -type indexPersistManager struct { - writer IndexFileSetWriter - segmentWriter m3ninxpersist.MutableSegmentFileSetWriter +// Support writing to multiple index blocks/filesets during index persist. +// This allows us to prepare an index fileset writer per block start. +type singleUseIndexWriter struct { + // back-ref to the index persist manager so we can share resources there + manager *indexPersistManager + writer IndexFileSetWriter // identifiers required to know which file to open // after persistence is over fileSetIdentifier FileSetFileIdentifier fileSetType persist.FileSetType - // track state of writers - writeErr error - initialized bool + // track state of writer + writeErr error +} + +func (s *singleUseIndexWriter) persistIndex(builder segment.Builder) error { + // Lock the index persist manager as we're sharing the segment builder as a resource. + s.manager.Lock() + defer s.manager.Unlock() + + markError := func(err error) { + s.writeErr = err + } + if err := s.writeErr; err != nil { + return fmt.Errorf("encountered error: %v, skipping further attempts to persist data", err) + } + + if err := s.manager.segmentWriter.Reset(builder); err != nil { + markError(err) + return err + } + + if err := s.writer.WriteSegmentFileSet(s.manager.segmentWriter); err != nil { + markError(err) + return err + } + + return nil +} + +func (s *singleUseIndexWriter) closeIndex() ([]segment.Segment, error) { + // This writer will be thrown away after we're done persisting. + defer func() { + s.writeErr = nil + s.fileSetType = -1 + s.fileSetIdentifier = FileSetFileIdentifier{} + }() + + // s.e. we're done writing all segments for PreparedIndexPersist. + // so we can close the writer. + if err := s.writer.Close(); err != nil { + return nil, err + } + + // only attempt to retrieve data if we have not encountered errors during + // any writes. + if err := s.writeErr; err != nil { + return nil, err + } + + // and then we get persistent segments backed by mmap'd data so the index + // can safely evict the segment's we have just persisted. + result, err := ReadIndexSegments(ReadIndexSegmentsOptions{ + ReaderOptions: IndexReaderOpenOptions{ + Identifier: s.fileSetIdentifier, + FileSetType: s.fileSetType, + }, + FilesystemOptions: s.manager.opts, + newReaderFn: s.manager.newReaderFn, + newPersistentSegmentFn: s.manager.newPersistentSegmentFn, + }) + if err != nil { + return nil, err + } + + return result.Segments, nil +} + +type indexPersistManager struct { + sync.Mutex + + writersByBlockStart map[xtime.UnixNano]*singleUseIndexWriter + // segmentWriter holds the bulk of the re-usable in-mem resources so + // we want to share this across writers. + segmentWriter m3ninxpersist.MutableSegmentFileSetWriter // hooks used for testing newReaderFn newIndexReaderFn newPersistentSegmentFn newPersistentSegmentFn + newIndexWriterFn newIndexWriterFn + + // options used by index writers + opts Options } type newIndexReaderFn func(Options) (IndexFileSetReader, error) @@ -140,6 +220,8 @@ type newPersistentSegmentFn func( m3ninxfs.Options, ) (m3ninxfs.Segment, error) +type newIndexWriterFn func(Options) (IndexFileSetWriter, error) + type persistManagerMetrics struct { writeDurationMs tally.Gauge throttleDurationMs tally.Gauge @@ -163,11 +245,6 @@ func NewPersistManager(opts Options) (persist.Manager, error) { return nil, err } - idxWriter, err := NewIndexWriter(opts) - if err != nil { - return nil, err - } - segmentWriter, err := m3ninxpersist.NewMutableSegmentFileSetWriter( opts.FSTWriterOptions()) if err != nil { @@ -186,30 +263,35 @@ func NewPersistManager(opts Options) (persist.Manager, error) { snapshotMetadataWriter: NewSnapshotMetadataWriter(opts), }, indexPM: indexPersistManager{ - writer: idxWriter, - segmentWriter: segmentWriter, + writersByBlockStart: make(map[xtime.UnixNano]*singleUseIndexWriter), + segmentWriter: segmentWriter, + // fs opts are used by underlying index writers + opts: opts, }, status: persistManagerIdle, metrics: newPersistManagerMetrics(scope), } pm.indexPM.newReaderFn = NewIndexReader pm.indexPM.newPersistentSegmentFn = m3ninxpersist.NewSegment + pm.indexPM.newIndexWriterFn = NewIndexWriter pm.runtimeOptsListener = opts.RuntimeOptionsManager().RegisterListener(pm) return pm, nil } -func (pm *persistManager) reset() { +func (pm *persistManager) resetWithLock() { pm.status = persistManagerIdle pm.start = timeZero pm.count = 0 pm.bytesWritten = 0 pm.worked = 0 pm.slept = 0 - pm.indexPM.segmentWriter.Reset(nil) - pm.indexPM.writeErr = nil - pm.indexPM.initialized = false pm.dataPM.snapshotID = nil + + pm.indexPM.segmentWriter.Reset(nil) + for blockStart := range pm.indexPM.writersByBlockStart { + delete(pm.indexPM.writersByBlockStart, blockStart) + } } // StartIndexPersist is called by the databaseFlushManager to begin the persist process for @@ -271,83 +353,27 @@ func (pm *persistManager) PrepareIndex(opts persist.IndexPrepareOptions) (persis IndexVolumeType: opts.IndexVolumeType, } + idxWriter, err := pm.ensureSingleIndexWriterForBlock(blockStart) + if err != nil { + return prepared, err + } // create writer for required fileset file. - if err := pm.indexPM.writer.Open(idxWriterOpts); err != nil { + if err := idxWriter.writer.Open(idxWriterOpts); err != nil { return prepared, err } // track which file we are writing in the persist manager, so we // know which file to read back on `closeIndex` being called. - pm.indexPM.fileSetIdentifier = fileSetID - pm.indexPM.fileSetType = opts.FileSetType - pm.indexPM.initialized = true + idxWriter.fileSetIdentifier = fileSetID + idxWriter.fileSetType = opts.FileSetType // provide persistManager hooks into PreparedIndexPersist object - prepared.Persist = pm.persistIndex - prepared.Close = pm.closeIndex + prepared.Persist = idxWriter.persistIndex + prepared.Close = idxWriter.closeIndex return prepared, nil } -func (pm *persistManager) persistIndex(builder segment.Builder) error { - // FOLLOWUP(prateek): need to use-rate limiting runtime options in this code path - markError := func(err error) { - pm.indexPM.writeErr = err - } - if err := pm.indexPM.writeErr; err != nil { - return fmt.Errorf("encountered error: %v, skipping further attempts to persist data", err) - } - - if err := pm.indexPM.segmentWriter.Reset(builder); err != nil { - markError(err) - return err - } - - if err := pm.indexPM.writer.WriteSegmentFileSet(pm.indexPM.segmentWriter); err != nil { - markError(err) - return err - } - - return nil -} - -func (pm *persistManager) closeIndex() ([]segment.Segment, error) { - // ensure StartIndexPersist was called - if !pm.indexPM.initialized { - return nil, errPersistManagerNotPersisting - } - pm.indexPM.initialized = false - - // i.e. we're done writing all segments for PreparedIndexPersist. - // so we can close the writer. - if err := pm.indexPM.writer.Close(); err != nil { - return nil, err - } - - // only attempt to retrieve data if we have not encountered errors during - // any writes. - if err := pm.indexPM.writeErr; err != nil { - return nil, err - } - - // and then we get persistent segments backed by mmap'd data so the index - // can safely evict the segment's we have just persisted. - result, err := ReadIndexSegments(ReadIndexSegmentsOptions{ - ReaderOptions: IndexReaderOpenOptions{ - Identifier: pm.indexPM.fileSetIdentifier, - FileSetType: pm.indexPM.fileSetType, - }, - FilesystemOptions: pm.opts, - newReaderFn: pm.indexPM.newReaderFn, - newPersistentSegmentFn: pm.indexPM.newPersistentSegmentFn, - }) - if err != nil { - return nil, err - } - - return result.Segments, nil -} - // DoneIndex is called by the databaseFlushManager to finish the index persist process. func (pm *persistManager) DoneIndex() error { pm.Lock() @@ -362,7 +388,7 @@ func (pm *persistManager) DoneIndex() error { pm.metrics.throttleDurationMs.Update(float64(pm.slept / time.Millisecond)) // Reset state - pm.reset() + pm.resetWithLock() return nil } @@ -558,7 +584,7 @@ func (pm *persistManager) DoneFlush() error { return errPersistManagerCannotDoneFlushNotFlush } - return pm.doneShared() + return pm.doneSharedWithLock() } // DoneSnapshot is called by the databaseFlushManager to finish the snapshot persist process. @@ -594,7 +620,7 @@ func (pm *persistManager) DoneSnapshot( return fmt.Errorf("error writing out snapshot metadata file: %v", err) } - return pm.doneShared() + return pm.doneSharedWithLock() } // Close all resources. @@ -602,13 +628,13 @@ func (pm *persistManager) Close() { pm.runtimeOptsListener.Close() } -func (pm *persistManager) doneShared() error { +func (pm *persistManager) doneSharedWithLock() error { // Emit timing metrics pm.metrics.writeDurationMs.Update(float64(pm.worked / time.Millisecond)) pm.metrics.throttleDurationMs.Update(float64(pm.slept / time.Millisecond)) // Reset state - pm.reset() + pm.resetWithLock() return nil } @@ -644,3 +670,28 @@ func (pm *persistManager) SetRuntimeOptions(value runtime.Options) { pm.currRateLimitOpts = value.PersistRateLimitOptions() pm.Unlock() } + +// Ensure only a single index writer is ever created for a block, returns an error if an index writer +// already exists. +func (pm *persistManager) ensureSingleIndexWriterForBlock(blockStart time.Time) (*singleUseIndexWriter, error) { + pm.Lock() + defer pm.Unlock() + + idxWriter, ok := pm.indexPM.writersByBlockStart[xtime.ToUnixNano(blockStart)] + // Ensure that we have not already created an index writer for this block. + if ok { + return nil, errPersistManagerIndexWriterAlreadyExists + } + + writer, err := pm.indexPM.newIndexWriterFn(pm.opts) + if err != nil { + return nil, err + } + idxWriter = &singleUseIndexWriter{ + manager: &pm.indexPM, + writer: writer, + } + pm.indexPM.writersByBlockStart[xtime.ToUnixNano(blockStart)] = idxWriter + + return idxWriter, nil +} diff --git a/src/dbnode/persist/fs/persist_manager_test.go b/src/dbnode/persist/fs/persist_manager_test.go index cd0f39386d..362db4b5eb 100644 --- a/src/dbnode/persist/fs/persist_manager_test.go +++ b/src/dbnode/persist/fs/persist_manager_test.go @@ -313,15 +313,6 @@ func TestPersistenceManagerCloseData(t *testing.T) { pm.closeData() } -func TestPersistenceManagerCloseIndex(t *testing.T) { - ctrl := gomock.NewController(xtest.Reporter{T: t}) - defer ctrl.Finish() - - pm, _, _, _ := testIndexPersistManager(t, ctrl) - defer os.RemoveAll(pm.filePathPrefix) - pm.closeIndex() -} - func TestPersistenceManagerPrepareIndexFileExists(t *testing.T) { ctrl := gomock.NewController(xtest.Reporter{T: t}) defer ctrl.Finish() @@ -421,17 +412,6 @@ func TestPersistenceManagerPrepareIndexSuccess(t *testing.T) { pm, writer, segWriter, _ := testIndexPersistManager(t, ctrl) defer os.RemoveAll(pm.filePathPrefix) - blockStart := time.Unix(1000, 0) - writerOpts := IndexWriterOpenOptions{ - Identifier: FileSetFileIdentifier{ - FileSetContentType: persist.FileSetIndexContentType, - Namespace: testNs1ID, - BlockStart: blockStart, - }, - BlockSize: testBlockSize, - } - writer.EXPECT().Open(xtest.CmpMatcher(writerOpts, m3test.IdentTransformer)).Return(nil) - flush, err := pm.StartIndexPersist() require.NoError(t, err) @@ -440,46 +420,62 @@ func TestPersistenceManagerPrepareIndexSuccess(t *testing.T) { assert.NoError(t, flush.DoneIndex()) }() - prepareOpts := persist.IndexPrepareOptions{ - NamespaceMetadata: testNs1Metadata(t), - BlockStart: blockStart, - } - prepared, err := flush.PrepareIndex(prepareOpts) - require.NoError(t, err) + // We support preparing multiple index block writers for an index persist. + numBlocks := 10 + blockStart := time.Unix(1000, 0) + for i := 1; i < numBlocks; i++ { + blockStart = blockStart.Add(time.Duration(i) * testBlockSize) + writerOpts := IndexWriterOpenOptions{ + Identifier: FileSetFileIdentifier{ + FileSetContentType: persist.FileSetIndexContentType, + Namespace: testNs1ID, + BlockStart: blockStart, + }, + BlockSize: testBlockSize, + } + writer.EXPECT().Open(xtest.CmpMatcher(writerOpts, m3test.IdentTransformer)).Return(nil) - seg := segment.NewMockMutableSegment(ctrl) - segWriter.EXPECT().Reset(seg).Return(nil) - writer.EXPECT().WriteSegmentFileSet(segWriter).Return(nil) - require.NoError(t, prepared.Persist(seg)) + prepareOpts := persist.IndexPrepareOptions{ + NamespaceMetadata: testNs1Metadata(t), + BlockStart: blockStart, + } + prepared, err := flush.PrepareIndex(prepareOpts) + require.NoError(t, err) - reader := NewMockIndexFileSetReader(ctrl) - pm.indexPM.newReaderFn = func(Options) (IndexFileSetReader, error) { - return reader, nil - } + seg := segment.NewMockMutableSegment(ctrl) + segWriter.EXPECT().Reset(seg).Return(nil) + writer.EXPECT().WriteSegmentFileSet(segWriter).Return(nil) + require.NoError(t, prepared.Persist(seg)) - reader.EXPECT().Open(xtest.CmpMatcher(IndexReaderOpenOptions{ - Identifier: writerOpts.Identifier, - }, m3test.IdentTransformer)).Return(IndexReaderOpenResult{}, nil) + reader := NewMockIndexFileSetReader(ctrl) + pm.indexPM.newReaderFn = func(Options) (IndexFileSetReader, error) { + return reader, nil + } - file := NewMockIndexSegmentFile(ctrl) - gomock.InOrder( - reader.EXPECT().SegmentFileSets().Return(1), - reader.EXPECT().ReadSegmentFileSet().Return(file, nil), - reader.EXPECT().ReadSegmentFileSet().Return(nil, io.EOF), - ) - fsSeg := m3ninxfs.NewMockSegment(ctrl) - pm.indexPM.newPersistentSegmentFn = func( - fset m3ninxpersist.IndexSegmentFileSet, opts m3ninxfs.Options, - ) (m3ninxfs.Segment, error) { - require.Equal(t, file, fset) - return fsSeg, nil - } + reader.EXPECT().Open(xtest.CmpMatcher(IndexReaderOpenOptions{ + Identifier: writerOpts.Identifier, + }, m3test.IdentTransformer)).Return(IndexReaderOpenResult{}, nil) + + file := NewMockIndexSegmentFile(ctrl) + gomock.InOrder( + reader.EXPECT().SegmentFileSets().Return(1), + reader.EXPECT().ReadSegmentFileSet().Return(file, nil), + reader.EXPECT().ReadSegmentFileSet().Return(nil, io.EOF), + ) + fsSeg := m3ninxfs.NewMockSegment(ctrl) + pm.indexPM.newPersistentSegmentFn = func( + fset m3ninxpersist.IndexSegmentFileSet, opts m3ninxfs.Options, + ) (m3ninxfs.Segment, error) { + require.Equal(t, file, fset) + return fsSeg, nil + } - writer.EXPECT().Close().Return(nil) - segs, err := prepared.Close() - require.NoError(t, err) - require.Len(t, segs, 1) - require.Equal(t, fsSeg, segs[0]) + writer.EXPECT().Close().Return(nil) + segs, err := prepared.Close() + require.NoError(t, err) + require.Len(t, segs, 1) + require.Equal(t, fsSeg, segs[0]) + } } func TestPersistenceManagerNoRateLimit(t *testing.T) { @@ -766,7 +762,9 @@ func testIndexPersistManager(t *testing.T, ctrl *gomock.Controller, require.NoError(t, err) manager := mgr.(*persistManager) - manager.indexPM.writer = writer + manager.indexPM.newIndexWriterFn = func(opts Options) (IndexFileSetWriter, error) { + return writer, nil + } manager.indexPM.segmentWriter = segmentWriter return manager, writer, segmentWriter, opts } From 9740105f6a1f65517d3d4f7a5d751b0c0e319f5f Mon Sep 17 00:00:00 2001 From: Bo Du Date: Wed, 23 Dec 2020 13:07:30 -0500 Subject: [PATCH 4/6] Address PR feedback. --- src/dbnode/persist/fs/persist_manager.go | 64 +++++++++--------------- 1 file changed, 24 insertions(+), 40 deletions(-) diff --git a/src/dbnode/persist/fs/persist_manager.go b/src/dbnode/persist/fs/persist_manager.go index efb13dbf27..7a163da9a2 100644 --- a/src/dbnode/persist/fs/persist_manager.go +++ b/src/dbnode/persist/fs/persist_manager.go @@ -64,7 +64,6 @@ var ( errPersistManagerFileSetAlreadyExists = errors.New("persist manager cannot prepare, fileset already exists") errPersistManagerCannotDoneSnapshotNotSnapshot = errors.New("persist manager cannot done snapshot, file set type is not snapshot") errPersistManagerCannotDoneFlushNotFlush = errors.New("persist manager cannot done flush, file set type is not flush") - errPersistManagerIndexWriterAlreadyExists = errors.New("persist manager cannot create index writer, already exists") ) type sleepFn func(time.Duration) @@ -124,6 +123,10 @@ type singleUseIndexWriter struct { manager *indexPersistManager writer IndexFileSetWriter + state singleUseIndexWriterState +} + +type singleUseIndexWriterState struct { // identifiers required to know which file to open // after persistence is over fileSetIdentifier FileSetFileIdentifier @@ -139,9 +142,9 @@ func (s *singleUseIndexWriter) persistIndex(builder segment.Builder) error { defer s.manager.Unlock() markError := func(err error) { - s.writeErr = err + s.state.writeErr = err } - if err := s.writeErr; err != nil { + if err := s.state.writeErr; err != nil { return fmt.Errorf("encountered error: %v, skipping further attempts to persist data", err) } @@ -159,11 +162,12 @@ func (s *singleUseIndexWriter) persistIndex(builder segment.Builder) error { } func (s *singleUseIndexWriter) closeIndex() ([]segment.Segment, error) { + s.manager.Lock() + defer s.manager.Unlock() + // This writer will be thrown away after we're done persisting. defer func() { - s.writeErr = nil - s.fileSetType = -1 - s.fileSetIdentifier = FileSetFileIdentifier{} + s.state = singleUseIndexWriterState{fileSetType: -1} }() // s.e. we're done writing all segments for PreparedIndexPersist. @@ -174,7 +178,7 @@ func (s *singleUseIndexWriter) closeIndex() ([]segment.Segment, error) { // only attempt to retrieve data if we have not encountered errors during // any writes. - if err := s.writeErr; err != nil { + if err := s.state.writeErr; err != nil { return nil, err } @@ -182,8 +186,8 @@ func (s *singleUseIndexWriter) closeIndex() ([]segment.Segment, error) { // can safely evict the segment's we have just persisted. result, err := ReadIndexSegments(ReadIndexSegmentsOptions{ ReaderOptions: IndexReaderOpenOptions{ - Identifier: s.fileSetIdentifier, - FileSetType: s.fileSetType, + Identifier: s.state.fileSetIdentifier, + FileSetType: s.state.fileSetType, }, FilesystemOptions: s.manager.opts, newReaderFn: s.manager.newReaderFn, @@ -353,20 +357,25 @@ func (pm *persistManager) PrepareIndex(opts persist.IndexPrepareOptions) (persis IndexVolumeType: opts.IndexVolumeType, } - idxWriter, err := pm.ensureSingleIndexWriterForBlock(blockStart) + writer, err := pm.indexPM.newIndexWriterFn(pm.opts) if err != nil { return prepared, err } + idxWriter := &singleUseIndexWriter{ + manager: &pm.indexPM, + writer: writer, + state: singleUseIndexWriterState{ + // track which file we are writing in the persist manager, so we + // know which file to read back on `closeIndex` being called. + fileSetIdentifier: fileSetID, + fileSetType: opts.FileSetType, + }, + } // create writer for required fileset file. if err := idxWriter.writer.Open(idxWriterOpts); err != nil { return prepared, err } - // track which file we are writing in the persist manager, so we - // know which file to read back on `closeIndex` being called. - idxWriter.fileSetIdentifier = fileSetID - idxWriter.fileSetType = opts.FileSetType - // provide persistManager hooks into PreparedIndexPersist object prepared.Persist = idxWriter.persistIndex prepared.Close = idxWriter.closeIndex @@ -670,28 +679,3 @@ func (pm *persistManager) SetRuntimeOptions(value runtime.Options) { pm.currRateLimitOpts = value.PersistRateLimitOptions() pm.Unlock() } - -// Ensure only a single index writer is ever created for a block, returns an error if an index writer -// already exists. -func (pm *persistManager) ensureSingleIndexWriterForBlock(blockStart time.Time) (*singleUseIndexWriter, error) { - pm.Lock() - defer pm.Unlock() - - idxWriter, ok := pm.indexPM.writersByBlockStart[xtime.ToUnixNano(blockStart)] - // Ensure that we have not already created an index writer for this block. - if ok { - return nil, errPersistManagerIndexWriterAlreadyExists - } - - writer, err := pm.indexPM.newIndexWriterFn(pm.opts) - if err != nil { - return nil, err - } - idxWriter = &singleUseIndexWriter{ - manager: &pm.indexPM, - writer: writer, - } - pm.indexPM.writersByBlockStart[xtime.ToUnixNano(blockStart)] = idxWriter - - return idxWriter, nil -} From 72e229771ed347e9799c7f3be07f71c71be2e575 Mon Sep 17 00:00:00 2001 From: Bo Du Date: Wed, 23 Dec 2020 13:10:05 -0500 Subject: [PATCH 5/6] Fix lint. --- src/dbnode/persist/fs/persist_manager.go | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/src/dbnode/persist/fs/persist_manager.go b/src/dbnode/persist/fs/persist_manager.go index 7a163da9a2..883ae02f63 100644 --- a/src/dbnode/persist/fs/persist_manager.go +++ b/src/dbnode/persist/fs/persist_manager.go @@ -37,7 +37,6 @@ import ( "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/instrument" xresource "github.com/m3db/m3/src/x/resource" - xtime "github.com/m3db/m3/src/x/time" "github.com/pborman/uuid" "github.com/uber-go/tally" @@ -145,7 +144,7 @@ func (s *singleUseIndexWriter) persistIndex(builder segment.Builder) error { s.state.writeErr = err } if err := s.state.writeErr; err != nil { - return fmt.Errorf("encountered error: %v, skipping further attempts to persist data", err) + return fmt.Errorf("encountered error: %w, skipping further attempts to persist data", err) } if err := s.manager.segmentWriter.Reset(builder); err != nil { @@ -203,7 +202,6 @@ func (s *singleUseIndexWriter) closeIndex() ([]segment.Segment, error) { type indexPersistManager struct { sync.Mutex - writersByBlockStart map[xtime.UnixNano]*singleUseIndexWriter // segmentWriter holds the bulk of the re-usable in-mem resources so // we want to share this across writers. segmentWriter m3ninxpersist.MutableSegmentFileSetWriter @@ -267,8 +265,7 @@ func NewPersistManager(opts Options) (persist.Manager, error) { snapshotMetadataWriter: NewSnapshotMetadataWriter(opts), }, indexPM: indexPersistManager{ - writersByBlockStart: make(map[xtime.UnixNano]*singleUseIndexWriter), - segmentWriter: segmentWriter, + segmentWriter: segmentWriter, // fs opts are used by underlying index writers opts: opts, }, @@ -283,7 +280,7 @@ func NewPersistManager(opts Options) (persist.Manager, error) { return pm, nil } -func (pm *persistManager) resetWithLock() { +func (pm *persistManager) resetWithLock() error { pm.status = persistManagerIdle pm.start = timeZero pm.count = 0 @@ -292,10 +289,7 @@ func (pm *persistManager) resetWithLock() { pm.slept = 0 pm.dataPM.snapshotID = nil - pm.indexPM.segmentWriter.Reset(nil) - for blockStart := range pm.indexPM.writersByBlockStart { - delete(pm.indexPM.writersByBlockStart, blockStart) - } + return pm.indexPM.segmentWriter.Reset(nil) } // StartIndexPersist is called by the databaseFlushManager to begin the persist process for @@ -397,9 +391,7 @@ func (pm *persistManager) DoneIndex() error { pm.metrics.throttleDurationMs.Update(float64(pm.slept / time.Millisecond)) // Reset state - pm.resetWithLock() - - return nil + return pm.resetWithLock() } // StartFlushPersist is called by the databaseFlushManager to begin the persist process. @@ -643,9 +635,7 @@ func (pm *persistManager) doneSharedWithLock() error { pm.metrics.throttleDurationMs.Update(float64(pm.slept / time.Millisecond)) // Reset state - pm.resetWithLock() - - return nil + return pm.resetWithLock() } func (pm *persistManager) dataFilesetExists(prepareOpts persist.DataPrepareOptions) (bool, error) { From 750e7deebb1c84cdd88032d26a700810bb963715 Mon Sep 17 00:00:00 2001 From: Bo Du Date: Wed, 23 Dec 2020 15:42:18 -0500 Subject: [PATCH 6/6] Remove refs after closing index writer. --- src/dbnode/persist/fs/persist_manager.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/dbnode/persist/fs/persist_manager.go b/src/dbnode/persist/fs/persist_manager.go index 883ae02f63..9b824e7c79 100644 --- a/src/dbnode/persist/fs/persist_manager.go +++ b/src/dbnode/persist/fs/persist_manager.go @@ -115,16 +115,6 @@ type dataPersistManager struct { snapshotID uuid.UUID } -// Support writing to multiple index blocks/filesets during index persist. -// This allows us to prepare an index fileset writer per block start. -type singleUseIndexWriter struct { - // back-ref to the index persist manager so we can share resources there - manager *indexPersistManager - writer IndexFileSetWriter - - state singleUseIndexWriterState -} - type singleUseIndexWriterState struct { // identifiers required to know which file to open // after persistence is over @@ -135,6 +125,16 @@ type singleUseIndexWriterState struct { writeErr error } +// Support writing to multiple index blocks/filesets during index persist. +// This allows us to prepare an index fileset writer per block start. +type singleUseIndexWriter struct { + // back-ref to the index persist manager so we can share resources there + manager *indexPersistManager + writer IndexFileSetWriter + + state singleUseIndexWriterState +} + func (s *singleUseIndexWriter) persistIndex(builder segment.Builder) error { // Lock the index persist manager as we're sharing the segment builder as a resource. s.manager.Lock() @@ -167,6 +167,8 @@ func (s *singleUseIndexWriter) closeIndex() ([]segment.Segment, error) { // This writer will be thrown away after we're done persisting. defer func() { s.state = singleUseIndexWriterState{fileSetType: -1} + s.manager = nil + s.writer = nil }() // s.e. we're done writing all segments for PreparedIndexPersist.