Skip to content

Commit

Permalink
vfs: add write category during file creation
Browse files Browse the repository at this point in the history
This commit modifies the vfs file creation interface to
support grouping disk write metrics by the write source.

Informs cockroachdb/cockroach#115434
  • Loading branch information
CheranMahalingam committed Apr 8, 2024
1 parent 45b7a80 commit 278b6a6
Show file tree
Hide file tree
Showing 66 changed files with 316 additions and 228 deletions.
2 changes: 1 addition & 1 deletion checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ func (d *DB) writeCheckpointManifest(
}
defer src.Close()

dst, err := fs.Create(destPath)
dst, err := fs.Create(destPath, vfs.WriteCategoryUnspecified)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestCleaner(t *testing.T) {
if len(td.CmdArgs) != 1 {
return "create-bogus-file <db/file>"
}
dst, err := fs.Create(td.CmdArgs[0].String())
dst, err := fs.Create(td.CmdArgs[0].String(), vfs.WriteCategoryUnspecified)
require.NoError(t, err)
_, err = dst.Write([]byte("bogus data"))
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/pebble/fsbench.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ type fsBench struct {
// createFile can be used to create an empty file.
// Invariant: File shouldn't already exist.
func createFile(filepath string) vfs.File {
fh, err := fsConfig.fs.Create(filepath)
fh, err := fsConfig.fs.Create(filepath, vfs.WriteCategoryUnspecified)
if err != nil {
log.Fatalln(err)
}
Expand Down
2 changes: 1 addition & 1 deletion commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func TestCommitPipelineWALClose(t *testing.T) {
// rotate and close the log.

mem := vfs.NewMem()
f, err := mem.Create("test-wal")
f, err := mem.Create("test-wal", vfs.WriteCategoryUnspecified)
require.NoError(t, err)

// syncDelayFile will block on the done channel befor returning from Sync
Expand Down
16 changes: 16 additions & 0 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2897,9 +2897,25 @@ func (d *DB) runCompaction(
ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction)
}
}
var writeCategory vfs.DiskWriteCategory
switch c.kind {
case compactionKindFlush:
if d.opts.EnableSQLRowSpillMetrics {
// In the scenario that the Pebble engine is used for SQL row spills the data written to
// the memtable will correspond to spills to disk and should be categorized as such.
writeCategory = "sql-row-spill"
} else {
writeCategory = "pebble-memtable-flush"
}
case compactionKindIngestedFlushable:
writeCategory = "pebble-ingestion"
default:
writeCategory = "pebble-compaction"
}
// Prefer shared storage if present.
createOpts := objstorage.CreateOptions{
PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
WriteCategory: writeCategory,
}
diskFileNum := base.PhysicalTableDiskFileNum(fileNum)
writable, objMeta, err := d.objProvider.Create(ctx, fileTypeTable, diskFileNum, createOpts)
Expand Down
6 changes: 3 additions & 3 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2752,7 +2752,7 @@ func TestCompactionErrorCleanup(t *testing.T) {

ingest := func(keys ...string) {
t.Helper()
f, err := mem.Create("ext")
f, err := mem.Create("ext", vfs.WriteCategoryUnspecified)
require.NoError(t, err)

w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
Expand Down Expand Up @@ -3477,7 +3477,7 @@ func TestCompaction_LogAndApplyFails(t *testing.T) {
ingestKeys := func(db *DB) error {
// Create an SST for ingestion.
const fName = "ext"
f, err := db.opts.FS.Create(fName)
f, err := db.opts.FS.Create(fName, vfs.WriteCategoryUnspecified)
require.NoError(t, err)
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{})
require.NoError(t, w.Set(key, nil))
Expand Down Expand Up @@ -3682,7 +3682,7 @@ func TestCompactionErrorStats(t *testing.T) {

ingest := func(keys ...string) {
t.Helper()
f, err := mem.Create("ext")
f, err := mem.Create("ext", vfs.WriteCategoryUnspecified)
require.NoError(t, err)

w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
Expand Down
2 changes: 1 addition & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ func runBuildCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {

writeOpts := d.opts.MakeWriterOptions(0 /* level */, tableFormat)

f, err := fs.Create(path)
f, err := fs.Create(path, vfs.WriteCategoryUnspecified)
if err != nil {
return err
}
Expand Down
20 changes: 11 additions & 9 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1169,7 +1169,7 @@ func TestDBConcurrentCompactClose(t *testing.T) {
// causing compactions to be running concurrently with the close below.
for j := 0; j < 10; j++ {
path := fmt.Sprintf("ext%d", j)
f, err := mem.Create(path)
f, err := mem.Create(path, vfs.WriteCategoryUnspecified)
require.NoError(t, err)
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
TableFormat: d.FormatMajorVersion().MaxTableFormat(),
Expand Down Expand Up @@ -1617,7 +1617,7 @@ func TestMemtableIngestInversion(t *testing.T) {
// cc
{
path := "ingest1.sst"
f, err := memFS.Create(path)
f, err := memFS.Create(path, vfs.WriteCategoryUnspecified)
require.NoError(t, err)
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
TableFormat: d.FormatMajorVersion().MaxTableFormat(),
Expand All @@ -1628,7 +1628,7 @@ func TestMemtableIngestInversion(t *testing.T) {
}
{
path := "ingest2.sst"
f, err := memFS.Create(path)
f, err := memFS.Create(path, vfs.WriteCategoryUnspecified)
require.NoError(t, err)
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
TableFormat: d.FormatMajorVersion().MaxTableFormat(),
Expand All @@ -1640,7 +1640,7 @@ func TestMemtableIngestInversion(t *testing.T) {
}
{
path := "ingest3.sst"
f, err := memFS.Create(path)
f, err := memFS.Create(path, vfs.WriteCategoryUnspecified)
require.NoError(t, err)
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
TableFormat: d.FormatMajorVersion().MaxTableFormat(),
Expand All @@ -1651,7 +1651,7 @@ func TestMemtableIngestInversion(t *testing.T) {
}
{
path := "ingest4.sst"
f, err := memFS.Create(path)
f, err := memFS.Create(path, vfs.WriteCategoryUnspecified)
require.NoError(t, err)
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
TableFormat: d.FormatMajorVersion().MaxTableFormat(),
Expand Down Expand Up @@ -1730,7 +1730,7 @@ func TestMemtableIngestInversion(t *testing.T) {
// cc
{
path := "ingest5.sst"
f, err := memFS.Create(path)
f, err := memFS.Create(path, vfs.WriteCategoryUnspecified)
require.NoError(t, err)
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
TableFormat: d.FormatMajorVersion().MaxTableFormat(),
Expand Down Expand Up @@ -1764,7 +1764,7 @@ func TestMemtableIngestInversion(t *testing.T) {
// cc
{
path := "ingest6.sst"
f, err := memFS.Create(path)
f, err := memFS.Create(path, vfs.WriteCategoryUnspecified)
require.NoError(t, err)
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
TableFormat: d.FormatMajorVersion().MaxTableFormat(),
Expand Down Expand Up @@ -2035,11 +2035,13 @@ type sstAndLogFileBlockingFS struct {

var _ vfs.FS = &sstAndLogFileBlockingFS{}

func (fs *sstAndLogFileBlockingFS) Create(name string) (vfs.File, error) {
func (fs *sstAndLogFileBlockingFS) Create(
name string, category vfs.DiskWriteCategory,
) (vfs.File, error) {
if strings.HasSuffix(name, ".log") || strings.HasSuffix(name, ".sst") {
fs.unblocker.Wait()
}
return fs.FS.Create(name)
return fs.FS.Create(name, category)
}

func (fs *sstAndLogFileBlockingFS) unblock() {
Expand Down
4 changes: 2 additions & 2 deletions event_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestEventListener(t *testing.T) {

case "ingest":
memLog.Reset()
f, err := mem.Create("ext/0")
f, err := mem.Create("ext/0", vfs.WriteCategoryUnspecified)
if err != nil {
return err.Error()
}
Expand Down Expand Up @@ -168,7 +168,7 @@ func TestEventListener(t *testing.T) {
return err.Error()
}
writeTable := func(name string, key byte) error {
f, err := mem.Create(name)
f, err := mem.Create(name, vfs.WriteCategoryUnspecified)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion external_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func BenchmarkExternalIter_NonOverlapping_Scan(b *testing.B) {
var keys [][]byte
for i := 0; i < fileCount; i++ {
filename := fmt.Sprintf("%03d.sst", i)
wf, err := fs.Create(filename)
wf, err := fs.Create(filename, vfs.WriteCategoryUnspecified)
require.NoError(b, err)
w := sstable.NewWriter(objstorageprovider.NewFileWritable(wf), writeOpts)
for j := 0; j < keyCount/fileCount; j++ {
Expand Down
Loading

0 comments on commit 278b6a6

Please sign in to comment.