Skip to content

Commit

Permalink
[dbnode] Support checkpointing in cold flusher and preparing index wr…
Browse files Browse the repository at this point in the history
…iter per block (#3040)
  • Loading branch information
notbdu authored Dec 23, 2020
1 parent 31a21e3 commit d5122ee
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 150 deletions.
215 changes: 121 additions & 94 deletions src/dbnode/persist/fs/persist_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,22 +115,106 @@ type dataPersistManager struct {
snapshotID uuid.UUID
}

type indexPersistManager struct {
writer IndexFileSetWriter
segmentWriter m3ninxpersist.MutableSegmentFileSetWriter

type singleUseIndexWriterState struct {
// identifiers required to know which file to open
// after persistence is over
fileSetIdentifier FileSetFileIdentifier
fileSetType persist.FileSetType

// track state of writers
writeErr error
initialized bool
// track state of writer
writeErr error
}

// Support writing to multiple index blocks/filesets during index persist.
// This allows us to prepare an index fileset writer per block start.
type singleUseIndexWriter struct {
// back-ref to the index persist manager so we can share resources there
manager *indexPersistManager
writer IndexFileSetWriter

state singleUseIndexWriterState
}

func (s *singleUseIndexWriter) persistIndex(builder segment.Builder) error {
// Lock the index persist manager as we're sharing the segment builder as a resource.
s.manager.Lock()
defer s.manager.Unlock()

markError := func(err error) {
s.state.writeErr = err
}
if err := s.state.writeErr; err != nil {
return fmt.Errorf("encountered error: %w, skipping further attempts to persist data", err)
}

if err := s.manager.segmentWriter.Reset(builder); err != nil {
markError(err)
return err
}

if err := s.writer.WriteSegmentFileSet(s.manager.segmentWriter); err != nil {
markError(err)
return err
}

return nil
}

func (s *singleUseIndexWriter) closeIndex() ([]segment.Segment, error) {
s.manager.Lock()
defer s.manager.Unlock()

// This writer will be thrown away after we're done persisting.
defer func() {
s.state = singleUseIndexWriterState{fileSetType: -1}
s.manager = nil
s.writer = nil
}()

// s.e. we're done writing all segments for PreparedIndexPersist.
// so we can close the writer.
if err := s.writer.Close(); err != nil {
return nil, err
}

// only attempt to retrieve data if we have not encountered errors during
// any writes.
if err := s.state.writeErr; err != nil {
return nil, err
}

// and then we get persistent segments backed by mmap'd data so the index
// can safely evict the segment's we have just persisted.
result, err := ReadIndexSegments(ReadIndexSegmentsOptions{
ReaderOptions: IndexReaderOpenOptions{
Identifier: s.state.fileSetIdentifier,
FileSetType: s.state.fileSetType,
},
FilesystemOptions: s.manager.opts,
newReaderFn: s.manager.newReaderFn,
newPersistentSegmentFn: s.manager.newPersistentSegmentFn,
})
if err != nil {
return nil, err
}

return result.Segments, nil
}

type indexPersistManager struct {
sync.Mutex

// segmentWriter holds the bulk of the re-usable in-mem resources so
// we want to share this across writers.
segmentWriter m3ninxpersist.MutableSegmentFileSetWriter

// hooks used for testing
newReaderFn newIndexReaderFn
newPersistentSegmentFn newPersistentSegmentFn
newIndexWriterFn newIndexWriterFn

// options used by index writers
opts Options
}

type newIndexReaderFn func(Options) (IndexFileSetReader, error)
Expand All @@ -140,6 +224,8 @@ type newPersistentSegmentFn func(
m3ninxfs.Options,
) (m3ninxfs.Segment, error)

type newIndexWriterFn func(Options) (IndexFileSetWriter, error)

type persistManagerMetrics struct {
writeDurationMs tally.Gauge
throttleDurationMs tally.Gauge
Expand All @@ -163,11 +249,6 @@ func NewPersistManager(opts Options) (persist.Manager, error) {
return nil, err
}

idxWriter, err := NewIndexWriter(opts)
if err != nil {
return nil, err
}

segmentWriter, err := m3ninxpersist.NewMutableSegmentFileSetWriter(
opts.FSTWriterOptions())
if err != nil {
Expand All @@ -186,30 +267,31 @@ func NewPersistManager(opts Options) (persist.Manager, error) {
snapshotMetadataWriter: NewSnapshotMetadataWriter(opts),
},
indexPM: indexPersistManager{
writer: idxWriter,
segmentWriter: segmentWriter,
// fs opts are used by underlying index writers
opts: opts,
},
status: persistManagerIdle,
metrics: newPersistManagerMetrics(scope),
}
pm.indexPM.newReaderFn = NewIndexReader
pm.indexPM.newPersistentSegmentFn = m3ninxpersist.NewSegment
pm.indexPM.newIndexWriterFn = NewIndexWriter
pm.runtimeOptsListener = opts.RuntimeOptionsManager().RegisterListener(pm)

return pm, nil
}

func (pm *persistManager) reset() {
func (pm *persistManager) resetWithLock() error {
pm.status = persistManagerIdle
pm.start = timeZero
pm.count = 0
pm.bytesWritten = 0
pm.worked = 0
pm.slept = 0
pm.indexPM.segmentWriter.Reset(nil)
pm.indexPM.writeErr = nil
pm.indexPM.initialized = false
pm.dataPM.snapshotID = nil

return pm.indexPM.segmentWriter.Reset(nil)
}

// StartIndexPersist is called by the databaseFlushManager to begin the persist process for
Expand Down Expand Up @@ -271,83 +353,32 @@ func (pm *persistManager) PrepareIndex(opts persist.IndexPrepareOptions) (persis
IndexVolumeType: opts.IndexVolumeType,
}

writer, err := pm.indexPM.newIndexWriterFn(pm.opts)
if err != nil {
return prepared, err
}
idxWriter := &singleUseIndexWriter{
manager: &pm.indexPM,
writer: writer,
state: singleUseIndexWriterState{
// track which file we are writing in the persist manager, so we
// know which file to read back on `closeIndex` being called.
fileSetIdentifier: fileSetID,
fileSetType: opts.FileSetType,
},
}
// create writer for required fileset file.
if err := pm.indexPM.writer.Open(idxWriterOpts); err != nil {
if err := idxWriter.writer.Open(idxWriterOpts); err != nil {
return prepared, err
}

// track which file we are writing in the persist manager, so we
// know which file to read back on `closeIndex` being called.
pm.indexPM.fileSetIdentifier = fileSetID
pm.indexPM.fileSetType = opts.FileSetType
pm.indexPM.initialized = true

// provide persistManager hooks into PreparedIndexPersist object
prepared.Persist = pm.persistIndex
prepared.Close = pm.closeIndex
prepared.Persist = idxWriter.persistIndex
prepared.Close = idxWriter.closeIndex

return prepared, nil
}

func (pm *persistManager) persistIndex(builder segment.Builder) error {
// FOLLOWUP(prateek): need to use-rate limiting runtime options in this code path
markError := func(err error) {
pm.indexPM.writeErr = err
}
if err := pm.indexPM.writeErr; err != nil {
return fmt.Errorf("encountered error: %v, skipping further attempts to persist data", err)
}

if err := pm.indexPM.segmentWriter.Reset(builder); err != nil {
markError(err)
return err
}

if err := pm.indexPM.writer.WriteSegmentFileSet(pm.indexPM.segmentWriter); err != nil {
markError(err)
return err
}

return nil
}

func (pm *persistManager) closeIndex() ([]segment.Segment, error) {
// ensure StartIndexPersist was called
if !pm.indexPM.initialized {
return nil, errPersistManagerNotPersisting
}
pm.indexPM.initialized = false

// i.e. we're done writing all segments for PreparedIndexPersist.
// so we can close the writer.
if err := pm.indexPM.writer.Close(); err != nil {
return nil, err
}

// only attempt to retrieve data if we have not encountered errors during
// any writes.
if err := pm.indexPM.writeErr; err != nil {
return nil, err
}

// and then we get persistent segments backed by mmap'd data so the index
// can safely evict the segment's we have just persisted.
result, err := ReadIndexSegments(ReadIndexSegmentsOptions{
ReaderOptions: IndexReaderOpenOptions{
Identifier: pm.indexPM.fileSetIdentifier,
FileSetType: pm.indexPM.fileSetType,
},
FilesystemOptions: pm.opts,
newReaderFn: pm.indexPM.newReaderFn,
newPersistentSegmentFn: pm.indexPM.newPersistentSegmentFn,
})
if err != nil {
return nil, err
}

return result.Segments, nil
}

// DoneIndex is called by the databaseFlushManager to finish the index persist process.
func (pm *persistManager) DoneIndex() error {
pm.Lock()
Expand All @@ -362,9 +393,7 @@ func (pm *persistManager) DoneIndex() error {
pm.metrics.throttleDurationMs.Update(float64(pm.slept / time.Millisecond))

// Reset state
pm.reset()

return nil
return pm.resetWithLock()
}

// StartFlushPersist is called by the databaseFlushManager to begin the persist process.
Expand Down Expand Up @@ -558,7 +587,7 @@ func (pm *persistManager) DoneFlush() error {
return errPersistManagerCannotDoneFlushNotFlush
}

return pm.doneShared()
return pm.doneSharedWithLock()
}

// DoneSnapshot is called by the databaseFlushManager to finish the snapshot persist process.
Expand Down Expand Up @@ -594,23 +623,21 @@ func (pm *persistManager) DoneSnapshot(
return fmt.Errorf("error writing out snapshot metadata file: %v", err)
}

return pm.doneShared()
return pm.doneSharedWithLock()
}

// Close all resources.
func (pm *persistManager) Close() {
pm.runtimeOptsListener.Close()
}

func (pm *persistManager) doneShared() error {
func (pm *persistManager) doneSharedWithLock() error {
// Emit timing metrics
pm.metrics.writeDurationMs.Update(float64(pm.worked / time.Millisecond))
pm.metrics.throttleDurationMs.Update(float64(pm.slept / time.Millisecond))

// Reset state
pm.reset()

return nil
return pm.resetWithLock()
}

func (pm *persistManager) dataFilesetExists(prepareOpts persist.DataPrepareOptions) (bool, error) {
Expand Down
Loading

0 comments on commit d5122ee

Please sign in to comment.