Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Ensure index data consistency #2399

Merged
merged 14 commits into from
Jun 19, 2020
54 changes: 35 additions & 19 deletions src/dbnode/persist/fs/fs_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 16 additions & 15 deletions src/dbnode/persist/fs/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (m *merger) Merge(
flushPreparer persist.FlushPreparer,
nsCtx namespace.Context,
onFlush persist.OnFlushSeries,
) (err error) {
) (persist.DataCloser, error) {
var (
reader = m.reader
blockAllocSize = m.blockAllocSize
Expand All @@ -114,10 +114,12 @@ func (m *merger) Merge(
},
FileSetType: persist.FileSetFlushType,
}
closer persist.DataCloser
err error
)

if err := reader.Open(openOpts); err != nil {
return err
return closer, err
}
defer func() {
// Only set the error here if not set by the end of the function, since
Expand All @@ -129,7 +131,7 @@ func (m *merger) Merge(

nsMd, err := namespace.NewMetadata(nsID, nsOpts)
if err != nil {
return err
return closer, err
}
prepareOpts := persist.DataPrepareOptions{
NamespaceMetadata: nsMd,
Expand All @@ -141,7 +143,7 @@ func (m *merger) Merge(
}
prepared, err := flushPreparer.PrepareData(prepareOpts)
if err != nil {
return err
return closer, err
}

var (
Expand Down Expand Up @@ -191,15 +193,15 @@ func (m *merger) Merge(

// The merge is performed in two stages. The first stage is to loop through
// series on disk and merge it with what's in the merge target. Looping
// through disk in the first stage is done intentionally to read disk
// through disk in the first stage is prepared intentionally to read disk
// sequentially to optimize for spinning disk access. The second stage is to
// persist the rest of the series in the merge target that were not
// persisted in the first stage.

// First stage: loop through series on disk.
for id, tagsIter, data, checksum, err := reader.Read(); err != io.EOF; id, tagsIter, data, checksum, err = reader.Read() {
if err != nil {
return err
return closer, err
}
idsToFinalize = append(idsToFinalize, id)

Expand All @@ -211,7 +213,7 @@ func (m *merger) Merge(
ctx.Reset()
mergeWithData, hasInMemoryData, err := mergeWith.Read(ctx, id, blockStart, nsCtx)
if err != nil {
return err
return closer, err
}
if hasInMemoryData {
segmentReaders = appendBlockReadersToSegmentReaders(segmentReaders, mergeWithData)
Expand All @@ -222,7 +224,7 @@ func (m *merger) Merge(
tags, err := convert.TagsFromTagsIter(id, tagsIter, identPool)
tagsIter.Close()
if err != nil {
return err
return closer, err
}
tagsToFinalize = append(tagsToFinalize, tags)

Expand All @@ -232,15 +234,15 @@ func (m *merger) Merge(
if len(segmentReaders) == 1 && hasInMemoryData == false {
segment, err := segmentReaders[0].Segment()
if err != nil {
return err
return closer, err
}

if err := persistSegmentWithChecksum(id, tags, segment, checksum, prepared.Persist); err != nil {
return err
return closer, err
}
} else {
if err := persistSegmentReaders(id, tags, segmentReaders, iterResources, prepared.Persist); err != nil {
return err
return closer, err
}
}
// Closing the context will finalize the data returned from
Expand Down Expand Up @@ -278,12 +280,11 @@ func (m *merger) Merge(
return err
}, nsCtx)
if err != nil {
return err
return closer, err
}

// Close the flush preparer, which writes the rest of the files in the
// fileset.
return prepared.Close()
// NB(bodu): Return a deferred closer so that we can guarantee that cold index writes are persisted first.
return prepared.DeferClose()
}

func appendBlockReadersToSegmentReaders(segReaders []xio.SegmentReader, brs []xio.BlockReader) []xio.SegmentReader {
Expand Down
14 changes: 12 additions & 2 deletions src/dbnode/persist/fs/merger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ func testMergeWith(
reader := mockReaderFromData(ctrl, diskData)

var persisted []persistedData
var deferClosed bool
preparer := persist.NewMockFlushPreparer(ctrl)
preparer.EXPECT().PrepareData(gomock.Any()).Return(
persist.PreparedDataPersist{
Expand All @@ -450,7 +451,13 @@ func testMergeWith(
})
return nil
},
Close: func() error { return nil },
DeferClose: func() (persist.DataCloser, error) {
return func() error {
require.False(t, deferClosed)
deferClosed = true
robskillington marked this conversation as resolved.
Show resolved Hide resolved
return nil
}, nil
},
}, nil)
nsCtx := namespace.Context{}

Expand All @@ -463,8 +470,11 @@ func testMergeWith(
BlockStart: startTime,
}
mergeWith := mockMergeWithFromData(t, ctrl, diskData, mergeTargetData)
err := merger.Merge(fsID, mergeWith, 1, preparer, nsCtx, &persist.NoOpColdFlushNamespace{})
close, err := merger.Merge(fsID, mergeWith, 1, preparer, nsCtx, &persist.NoOpColdFlushNamespace{})
require.NoError(t, err)
require.False(t, deferClosed)
require.NoError(t, close())
robskillington marked this conversation as resolved.
Show resolved Hide resolved
require.True(t, deferClosed)

assertPersistedAsExpected(t, persisted, expectedData)
}
Expand Down
7 changes: 6 additions & 1 deletion src/dbnode/persist/fs/persist_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func NewPersistManager(opts Options) (persist.Manager, error) {

func (pm *persistManager) reset() {
pm.status = persistManagerIdle
pm.start = timeZero
pm.start = timeZero
pm.count = 0
pm.bytesWritten = 0
pm.worked = 0
Expand Down Expand Up @@ -490,6 +490,7 @@ func (pm *persistManager) PrepareData(opts persist.DataPrepareOptions) (persist.

prepared.Persist = pm.persist
prepared.Close = pm.closeData
prepared.DeferClose = pm.deferCloseData

return prepared, nil
}
Expand Down Expand Up @@ -544,6 +545,10 @@ func (pm *persistManager) closeData() error {
return pm.dataPM.writer.Close()
}

func (pm *persistManager) deferCloseData() (persist.DataCloser, error) {
return pm.dataPM.writer.DeferClose()
}

// DoneFlush is called by the databaseFlushManager to finish the data persist process.
func (pm *persistManager) DoneFlush() error {
pm.Lock()
Expand Down
5 changes: 4 additions & 1 deletion src/dbnode/persist/fs/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ type DataFileSetWriter interface {
// WriteAll will write the id and all byte slices and returns an error on a write error.
// Callers must not call this method with a given ID more than once.
WriteAll(id ident.ID, tags ident.Tags, data []checked.Bytes, checksum uint32) error

// DeferClose returns a DataCloser that defers writing of a checkpoint file.
DeferClose() (persist.DataCloser, error)
}

// SnapshotMetadataFileWriter writes out snapshot metadata files.
Expand Down Expand Up @@ -557,7 +560,7 @@ type Merger interface {
flushPreparer persist.FlushPreparer,
nsCtx namespace.Context,
onFlush persist.OnFlushSeries,
) error
) (persist.DataCloser, error)
}

// NewMergerFn is the function to call to get a new Merger.
Expand Down
Loading