Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: flush to in-memory sstables #3288

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 74 additions & 7 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ const (
compactionKindRead
compactionKindRewrite
compactionKindIngestedFlushable
compactionKindBufferedFlush
)

func (k compactionKind) String() string {
Expand Down Expand Up @@ -659,6 +660,30 @@ type compaction struct {
pickerMetrics compactionPickerMetrics
}

// objectCreator provides the subset of the objstorage.Provider interface
// necessary for compactions and flushes. It's typically satisfied by
// d.objProvider but may be satisfied by bufferedSSTables during flushes.
type objectCreator interface {
// Create creates a new object and opens it for writing.
//
// The object is not guaranteed to be durable (accessible in case of crashes)
// until Sync is called.
Create(
ctx context.Context,
fileType base.FileType,
FileNum base.DiskFileNum,
opts objstorage.CreateOptions,
) (w objstorage.Writable, meta objstorage.ObjectMetadata, err error)
// Remove removes an object.
//
// The object is not guaranteed to be durably removed until Sync is called.
Remove(fileType base.FileType, FileNum base.DiskFileNum) error
// Sync flushes the metadata from creation or removal of objects since the
// last Sync. This includes objects that have been Created but for which
// Writable.Finish() has not yet been called.
Sync() error
}

func (c *compaction) makeInfo(jobID int) CompactionInfo {
info := CompactionInfo{
JobID: jobID,
Expand Down Expand Up @@ -1847,6 +1872,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
var n, inputs int
var inputBytes uint64
var ingest bool
var bufferedFlush = true // TODO(aaditya): loop this into a config setting
for ; n < len(d.mu.mem.queue)-1; n++ {
if f, ok := d.mu.mem.queue[n].flushable.(*ingestedFlushable); ok {
if n == 0 {
Expand Down Expand Up @@ -1905,6 +1931,10 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
if err != nil {
return 0, err
}

if bufferedFlush && !ingest {
c.kind = compactionKindBufferedFlush
}
d.addInProgressCompaction(c)

jobID := d.mu.nextJobID
Expand All @@ -1917,6 +1947,18 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
})
startTime := d.timeNow()

// Compactions always write directly to the database's object provider.
// Flushes may write to an in-memory object provider first.
var objCreator objectCreator
if c.kind == compactionKindBufferedFlush {
bufferedSSTs := &bufferedSSTables{}
// TODO(aaditya): pick a better size
bufferedSSTs.init(10)
objCreator = bufferedSSTs
} else {
objCreator = d.objProvider
}

var ve *manifest.VersionEdit
var pendingOutputs []physicalMeta
var stats compactStats
Expand All @@ -1927,7 +1969,32 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
// runCompaction. For all other flush cases, we construct the VersionEdit
// inside runCompaction.
if c.kind != compactionKindIngestedFlushable {
ve, pendingOutputs, stats, err = d.runCompaction(jobID, c)
ve, pendingOutputs, stats, err = d.runCompaction(jobID, c, objCreator)
}

// TODO(aadityas,jackson): If the buffered output sstables are too small,
// avoid linking them into the version and just update the flushable queue
// appropriately.
if c.kind == compactionKindBufferedFlush {
var metas []*fileMetadata
var fileNums []base.DiskFileNum
for _, file := range ve.NewFiles {
metas = append(metas, file.Meta)
fileNums = append(fileNums, file.BackingFileNum)
}

bufferedSST := objCreator.(*bufferedSSTables)
if bufferedSST.size < d.opts.MemTableSize /* TODO(aaditya): does this make sense? */ {
var f flushable
f, err = newFlushableBufferedSSTables(d.opts.Comparer, metas, sstable.ReaderOptions{}, bufferedSST)
fe := d.newFlushableEntry(f, fileNums[0], 0 /* TODO(aaditya): figure out what to put here */)
remaining := d.mu.mem.queue[n : len(d.mu.mem.queue)-2]
mutable := d.mu.mem.queue[len(d.mu.mem.queue)-1]
d.mu.mem.queue = append(remaining, fe, mutable)
return 0, err
}

// else convert to objProvider and write to disk
}

// Acquire logLock. This will be released either on an error, by way of
Expand Down Expand Up @@ -2634,7 +2701,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
d.opts.EventListener.CompactionBegin(info)
startTime := d.timeNow()

ve, pendingOutputs, stats, err := d.runCompaction(jobID, c)
ve, pendingOutputs, stats, err := d.runCompaction(jobID, c, d.objProvider)

info.Duration = d.timeNow().Sub(startTime)
if err == nil {
Expand Down Expand Up @@ -2800,7 +2867,7 @@ func (d *DB) runCopyCompaction(
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) runCompaction(
jobID int, c *compaction,
jobID int, c *compaction, objCreator objectCreator,
) (ve *versionEdit, pendingOutputs []physicalMeta, stats compactStats, retErr error) {
// As a sanity check, confirm that the smallest / largest keys for new and
// deleted files in the new versionEdit pass a validation function before
Expand Down Expand Up @@ -2968,7 +3035,7 @@ func (d *DB) runCompaction(
}
if retErr != nil {
for _, fileNum := range createdFiles {
_ = d.objProvider.Remove(fileTypeTable, fileNum)
_ = objCreator.Remove(fileTypeTable, fileNum)
}
}
for _, closer := range c.closers {
Expand Down Expand Up @@ -3063,7 +3130,8 @@ func (d *DB) runCompaction(
PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
}
diskFileNum := base.PhysicalTableDiskFileNum(fileNum)
writable, objMeta, err := d.objProvider.Create(ctx, fileTypeTable, diskFileNum, createOpts)

writable, _, err := objCreator.Create(ctx, fileTypeTable, diskFileNum, createOpts)
if err != nil {
return err
}
Expand All @@ -3075,7 +3143,6 @@ func (d *DB) runCompaction(
d.opts.EventListener.TableCreated(TableCreateInfo{
JobID: jobID,
Reason: reason,
Path: d.objProvider.Path(objMeta),
FileNum: diskFileNum,
})
if c.kind != compactionKindFlush {
Expand Down Expand Up @@ -3516,7 +3583,7 @@ func (d *DB) runCompaction(
// compactStats.
stats.countMissizedDels = iter.stats.countMissizedDels

if err := d.objProvider.Sync(); err != nil {
if err := objCreator.Sync(); err != nil {
return nil, pendingOutputs, stats, err
}

Expand Down
2 changes: 1 addition & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
// to the user-defined boundaries.
c.maxOutputFileSize = math.MaxUint64

newVE, _, _, err := d.runCompaction(0, c)
newVE, _, _, err := d.runCompaction(0, c, d.objProvider)
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion event.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,6 @@ type TableCreateInfo struct {
// Reason is the reason for the table creation: "compacting", "flushing", or
// "ingesting".
Reason string
Path string
FileNum base.DiskFileNum
}

Expand Down
Loading
Loading