diff --git a/checkpoint.go b/checkpoint.go index 56a059ab6b..f0c7f2d72a 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -403,7 +403,7 @@ func (d *DB) writeCheckpointManifest( } defer src.Close() - dst, err := fs.Create(destPath) + dst, err := fs.Create(destPath, vfs.WriteCategoryUnspecified) if err != nil { return err } diff --git a/cleaner_test.go b/cleaner_test.go index 340c5db762..1e7ea2f195 100644 --- a/cleaner_test.go +++ b/cleaner_test.go @@ -123,7 +123,7 @@ func TestCleaner(t *testing.T) { if len(td.CmdArgs) != 1 { return "create-bogus-file " } - dst, err := fs.Create(td.CmdArgs[0].String()) + dst, err := fs.Create(td.CmdArgs[0].String(), vfs.WriteCategoryUnspecified) require.NoError(t, err) _, err = dst.Write([]byte("bogus data")) require.NoError(t, err) diff --git a/cmd/pebble/fsbench.go b/cmd/pebble/fsbench.go index 94d437db2a..39c4605526 100644 --- a/cmd/pebble/fsbench.go +++ b/cmd/pebble/fsbench.go @@ -113,7 +113,7 @@ type fsBench struct { // createFile can be used to create an empty file. // Invariant: File shouldn't already exist. func createFile(filepath string) vfs.File { - fh, err := fsConfig.fs.Create(filepath) + fh, err := fsConfig.fs.Create(filepath, vfs.WriteCategoryUnspecified) if err != nil { log.Fatalln(err) } diff --git a/commit_test.go b/commit_test.go index affe1499c7..30d3d7b89a 100644 --- a/commit_test.go +++ b/commit_test.go @@ -224,7 +224,7 @@ func TestCommitPipelineWALClose(t *testing.T) { // rotate and close the log. mem := vfs.NewMem() - f, err := mem.Create("test-wal") + f, err := mem.Create("test-wal", vfs.WriteCategoryUnspecified) require.NoError(t, err) // syncDelayFile will block on the done channel befor returning from Sync diff --git a/compaction.go b/compaction.go index 0a24574333..082c564da6 100644 --- a/compaction.go +++ b/compaction.go @@ -2897,9 +2897,25 @@ func (d *DB) runCompaction( ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction) } } + var writeCategory vfs.DiskWriteCategory + switch c.kind { + case compactionKindFlush: + if d.opts.EnableSQLRowSpillMetrics { + // In the scenario that the Pebble engine is used for SQL row spills the data written to + // the memtable will correspond to spills to disk and should be categorized as such. + writeCategory = "sql-row-spill" + } else { + writeCategory = "pebble-memtable-flush" + } + case compactionKindIngestedFlushable: + writeCategory = "pebble-ingestion" + default: + writeCategory = "pebble-compaction" + } // Prefer shared storage if present. createOpts := objstorage.CreateOptions{ PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level), + WriteCategory: writeCategory, } diskFileNum := base.PhysicalTableDiskFileNum(fileNum) writable, objMeta, err := d.objProvider.Create(ctx, fileTypeTable, diskFileNum, createOpts) diff --git a/compaction_test.go b/compaction_test.go index 953a54d7ed..456dca911c 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -2752,7 +2752,7 @@ func TestCompactionErrorCleanup(t *testing.T) { ingest := func(keys ...string) { t.Helper() - f, err := mem.Create("ext") + f, err := mem.Create("ext", vfs.WriteCategoryUnspecified) require.NoError(t, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{ @@ -3477,7 +3477,7 @@ func TestCompaction_LogAndApplyFails(t *testing.T) { ingestKeys := func(db *DB) error { // Create an SST for ingestion. 
const fName = "ext" - f, err := db.opts.FS.Create(fName) + f, err := db.opts.FS.Create(fName, vfs.WriteCategoryUnspecified) require.NoError(t, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{}) require.NoError(t, w.Set(key, nil)) @@ -3682,7 +3682,7 @@ func TestCompactionErrorStats(t *testing.T) { ingest := func(keys ...string) { t.Helper() - f, err := mem.Create("ext") + f, err := mem.Create("ext", vfs.WriteCategoryUnspecified) require.NoError(t, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{ diff --git a/data_test.go b/data_test.go index 5fc92fee35..83405b689c 100644 --- a/data_test.go +++ b/data_test.go @@ -620,7 +620,7 @@ func runBuildCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { writeOpts := d.opts.MakeWriterOptions(0 /* level */, tableFormat) - f, err := fs.Create(path) + f, err := fs.Create(path, vfs.WriteCategoryUnspecified) if err != nil { return err } diff --git a/db_test.go b/db_test.go index a5d4bc29e8..7d42c06d67 100644 --- a/db_test.go +++ b/db_test.go @@ -1169,7 +1169,7 @@ func TestDBConcurrentCompactClose(t *testing.T) { // causing compactions to be running concurrently with the close below. for j := 0; j < 10; j++ { path := fmt.Sprintf("ext%d", j) - f, err := mem.Create(path) + f, err := mem.Create(path, vfs.WriteCategoryUnspecified) require.NoError(t, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{ TableFormat: d.FormatMajorVersion().MaxTableFormat(), @@ -1617,7 +1617,7 @@ func TestMemtableIngestInversion(t *testing.T) { // cc { path := "ingest1.sst" - f, err := memFS.Create(path) + f, err := memFS.Create(path, vfs.WriteCategoryUnspecified) require.NoError(t, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{ TableFormat: d.FormatMajorVersion().MaxTableFormat(), @@ -1628,7 +1628,7 @@ func TestMemtableIngestInversion(t *testing.T) { } { path := "ingest2.sst" - f, err := memFS.Create(path) + f, err := memFS.Create(path, vfs.WriteCategoryUnspecified) require.NoError(t, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{ TableFormat: d.FormatMajorVersion().MaxTableFormat(), @@ -1640,7 +1640,7 @@ func TestMemtableIngestInversion(t *testing.T) { } { path := "ingest3.sst" - f, err := memFS.Create(path) + f, err := memFS.Create(path, vfs.WriteCategoryUnspecified) require.NoError(t, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{ TableFormat: d.FormatMajorVersion().MaxTableFormat(), @@ -1651,7 +1651,7 @@ func TestMemtableIngestInversion(t *testing.T) { } { path := "ingest4.sst" - f, err := memFS.Create(path) + f, err := memFS.Create(path, vfs.WriteCategoryUnspecified) require.NoError(t, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{ TableFormat: d.FormatMajorVersion().MaxTableFormat(), @@ -1730,7 +1730,7 @@ func TestMemtableIngestInversion(t *testing.T) { // cc { path := "ingest5.sst" - f, err := memFS.Create(path) + f, err := memFS.Create(path, vfs.WriteCategoryUnspecified) require.NoError(t, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{ TableFormat: d.FormatMajorVersion().MaxTableFormat(), @@ -1764,7 +1764,7 @@ func TestMemtableIngestInversion(t *testing.T) { // cc { path := "ingest6.sst" - f, err := memFS.Create(path) + f, err := memFS.Create(path, vfs.WriteCategoryUnspecified) require.NoError(t, err) w := 
sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{ TableFormat: d.FormatMajorVersion().MaxTableFormat(), @@ -2035,11 +2035,13 @@ type sstAndLogFileBlockingFS struct { var _ vfs.FS = &sstAndLogFileBlockingFS{} -func (fs *sstAndLogFileBlockingFS) Create(name string) (vfs.File, error) { +func (fs *sstAndLogFileBlockingFS) Create( + name string, category vfs.DiskWriteCategory, +) (vfs.File, error) { if strings.HasSuffix(name, ".log") || strings.HasSuffix(name, ".sst") { fs.unblocker.Wait() } - return fs.FS.Create(name) + return fs.FS.Create(name, category) } func (fs *sstAndLogFileBlockingFS) unblock() { diff --git a/event_listener_test.go b/event_listener_test.go index 88e48d7941..4942f82323 100644 --- a/event_listener_test.go +++ b/event_listener_test.go @@ -134,7 +134,7 @@ func TestEventListener(t *testing.T) { case "ingest": memLog.Reset() - f, err := mem.Create("ext/0") + f, err := mem.Create("ext/0", vfs.WriteCategoryUnspecified) if err != nil { return err.Error() } @@ -168,7 +168,7 @@ func TestEventListener(t *testing.T) { return err.Error() } writeTable := func(name string, key byte) error { - f, err := mem.Create(name) + f, err := mem.Create(name, vfs.WriteCategoryUnspecified) if err != nil { return err } diff --git a/external_iterator_test.go b/external_iterator_test.go index fa83eeb609..7cdb502917 100644 --- a/external_iterator_test.go +++ b/external_iterator_test.go @@ -153,7 +153,7 @@ func BenchmarkExternalIter_NonOverlapping_Scan(b *testing.B) { var keys [][]byte for i := 0; i < fileCount; i++ { filename := fmt.Sprintf("%03d.sst", i) - wf, err := fs.Create(filename) + wf, err := fs.Create(filename, vfs.WriteCategoryUnspecified) require.NoError(b, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(wf), writeOpts) for j := 0; j < keyCount/fileCount; j++ { diff --git a/ingest_test.go b/ingest_test.go index 98b2d16daa..fb4678bab4 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -94,7 +94,7 @@ func TestIngestLoad(t *testing.T) { return fmt.Sprintf("unknown cmd %s\n", k) } } - f, err := mem.Create("ext") + f, err := mem.Create("ext", vfs.WriteCategoryUnspecified) if err != nil { return err.Error() } @@ -175,7 +175,7 @@ func TestIngestLoadRand(t *testing.T) { expected[i].StatsMarkValid() func() { - f, err := mem.Create(paths[i]) + f, err := mem.Create(paths[i], vfs.WriteCategoryUnspecified) require.NoError(t, err) keys := make([]InternalKey, 1+rng.Intn(100)) @@ -230,7 +230,7 @@ func TestIngestLoadRand(t *testing.T) { func TestIngestLoadInvalid(t *testing.T) { mem := vfs.NewMem() - f, err := mem.Create("invalid") + f, err := mem.Create("invalid", vfs.WriteCategoryUnspecified) require.NoError(t, err) require.NoError(t, f.Close()) @@ -319,7 +319,7 @@ func TestIngestLink(t *testing.T) { meta[j].fileMetadata = &fileMetadata{} meta[j].FileNum = FileNum(j) meta[j].InitPhysicalBacking() - f, err := opts.FS.Create(meta[j].path) + f, err := opts.FS.Create(meta[j].path, vfs.WriteCategoryUnspecified) require.NoError(t, err) contents[j] = []byte(fmt.Sprintf("data%d", j)) @@ -387,7 +387,7 @@ func TestIngestLinkFallback(t *testing.T) { // Verify that ingestLink succeeds if linking fails by falling back to // copying. 
mem := vfs.NewMem() - src, err := mem.Create("source") + src, err := mem.Create("source", vfs.WriteCategoryUnspecified) require.NoError(t, err) opts := &Options{FS: errorfs.Wrap(mem, errorfs.ErrInjected.If(errorfs.OnIndex(1)))} @@ -1058,7 +1058,7 @@ func testIngestSharedImpl( writeOpts := d.opts.MakeWriterOptions(0 /* level */, to.opts.FormatMajorVersion.MaxTableFormat()) sstPath := fmt.Sprintf("ext/replicate%d.sst", replicateCounter) - f, err := to.opts.FS.Create(sstPath) + f, err := to.opts.FS.Create(sstPath, vfs.WriteCategoryUnspecified) require.NoError(t, err) replicateCounter++ w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writeOpts) @@ -1556,7 +1556,7 @@ func TestConcurrentExcise(t *testing.T) { writeOpts := d.opts.MakeWriterOptions(0 /* level */, to.opts.FormatMajorVersion.MaxTableFormat()) sstPath := fmt.Sprintf("ext/replicate%d.sst", replicateCounter) - f, err := to.opts.FS.Create(sstPath) + f, err := to.opts.FS.Create(sstPath, vfs.WriteCategoryUnspecified) require.NoError(t, err) replicateCounter++ w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writeOpts) @@ -1991,7 +1991,7 @@ func TestIngestExternal(t *testing.T) { writeOpts := d.opts.MakeWriterOptions(0 /* level */, to.opts.FormatMajorVersion.MaxTableFormat()) sstPath := fmt.Sprintf("ext/replicate%d.sst", replicateCounter) - f, err := to.opts.FS.Create(sstPath) + f, err := to.opts.FS.Create(sstPath, vfs.WriteCategoryUnspecified) require.NoError(t, err) replicateCounter++ w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writeOpts) @@ -2203,7 +2203,7 @@ func BenchmarkIngestOverlappingMemtable(b *testing.B) { } // Create the overlapping sstable that will force a flush when ingested. - f, err := mem.Create("ext") + f, err := mem.Create("ext", vfs.WriteCategoryUnspecified) assertNoError(err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{}) assertNoError(w.Set([]byte("a"), nil)) @@ -2457,12 +2457,12 @@ func TestIngestError(t *testing.T) { for i := int32(0); ; i++ { mem := vfs.NewMem() - f0, err := mem.Create("ext0") + f0, err := mem.Create("ext0", vfs.WriteCategoryUnspecified) require.NoError(t, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f0), sstable.WriterOptions{}) require.NoError(t, w.Set([]byte("d"), nil)) require.NoError(t, w.Close()) - f1, err := mem.Create("ext1") + f1, err := mem.Create("ext1", vfs.WriteCategoryUnspecified) require.NoError(t, err) w = sstable.NewWriter(objstorageprovider.NewFileWritable(f1), sstable.WriterOptions{}) require.NoError(t, w.Set([]byte("d"), nil)) @@ -2526,7 +2526,7 @@ func TestIngestIdempotence(t *testing.T) { fs := vfs.Default path := fs.PathJoin(dir, "ext") - f, err := fs.Create(fs.PathJoin(dir, "ext")) + f, err := fs.Create(fs.PathJoin(dir, "ext"), vfs.WriteCategoryUnspecified) require.NoError(t, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{}) require.NoError(t, w.Set([]byte("d"), nil)) @@ -2559,7 +2559,7 @@ func TestIngestCompact(t *testing.T) { src := func(i int) string { return fmt.Sprintf("ext%d", i) } - f, err := mem.Create(src(0)) + f, err := mem.Create(src(0), vfs.WriteCategoryUnspecified) require.NoError(t, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{}) @@ -2602,7 +2602,7 @@ func TestConcurrentIngest(t *testing.T) { src := func(i int) string { return fmt.Sprintf("ext%d", i) } - f, err := mem.Create(src(0)) + f, err := mem.Create(src(0), vfs.WriteCategoryUnspecified) require.NoError(t, err) w := 
sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{}) @@ -2656,7 +2656,7 @@ func TestConcurrentIngestCompact(t *testing.T) { ingest := func(keys ...string) { t.Helper() - f, err := mem.Create("ext") + f, err := mem.Create("ext", vfs.WriteCategoryUnspecified) require.NoError(t, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{}) @@ -2775,7 +2775,7 @@ func TestIngestFlushQueuedMemTable(t *testing.T) { } ingest := func(keys ...string) { - f, err := mem.Create("ext") + f, err := mem.Create("ext", vfs.WriteCategoryUnspecified) require.NoError(t, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{ @@ -2806,7 +2806,7 @@ func TestIngestStats(t *testing.T) { ingest := func(expectedLevel int, keys ...string) { t.Helper() - f, err := mem.Create("ext") + f, err := mem.Create("ext", vfs.WriteCategoryUnspecified) require.NoError(t, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{}) @@ -2854,7 +2854,7 @@ func TestIngestFlushQueuedLargeBatch(t *testing.T) { ingest := func(keys ...string) { t.Helper() - f, err := mem.Create("ext") + f, err := mem.Create("ext", vfs.WriteCategoryUnspecified) require.NoError(t, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{}) @@ -2892,7 +2892,7 @@ func TestIngestMemtablePendingOverlap(t *testing.T) { ingest := func(keys ...string) { t.Helper() - f, err := mem.Create("ext") + f, err := mem.Create("ext", vfs.WriteCategoryUnspecified) require.NoError(t, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{}) @@ -3002,7 +3002,7 @@ func TestIngestMemtableOverlapRace(t *testing.T) { require.NoError(t, err) // Prepare a sstable `ext` deleting foo. - f, err := mem.Create("ext") + f, err := mem.Create("ext", vfs.WriteCategoryUnspecified) require.NoError(t, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{}) require.NoError(t, w.Delete([]byte("foo"))) @@ -3141,7 +3141,7 @@ func TestIngestFileNumReuseCrash(t *testing.T) { var fileBytes [][]byte for i := 0; i < count; i++ { name := fmt.Sprintf("ext%d", i) - f, err := fs.Create(fs.PathJoin(dir, name)) + f, err := fs.Create(fs.PathJoin(dir, name), vfs.WriteCategoryUnspecified) require.NoError(t, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{}) require.NoError(t, w.Set([]byte(fmt.Sprintf("foo%d", i)), nil)) @@ -3201,7 +3201,7 @@ func TestIngest_UpdateSequenceNumber(t *testing.T) { mem := vfs.NewMem() cmp := base.DefaultComparer.Compare parse := func(input string) (*sstable.Writer, error) { - f, err := mem.Create("ext") + f, err := mem.Create("ext", vfs.WriteCategoryUnspecified) if err != nil { return nil, err } @@ -3606,7 +3606,7 @@ func TestIngestValidation(t *testing.T) { err error } runIngest := func(keyVals []keyVal) (et errT) { - f, err := fs.Create(ingestTableName) + f, err := fs.Create(ingestTableName, vfs.WriteCategoryUnspecified) require.NoError(t, err) defer func() { _ = fs.Remove(ingestTableName) }() @@ -3621,7 +3621,7 @@ func TestIngestValidation(t *testing.T) { // Possibly corrupt the file. 
if tc.cLoc != corruptionLocationNone { - f, err = fs.OpenReadWrite(ingestTableName) + f, err = fs.OpenReadWrite(ingestTableName, vfs.WriteCategoryUnspecified) require.NoError(t, err) corrupt(f) } @@ -3710,7 +3710,7 @@ func BenchmarkManySSTables(b *testing.B) { var paths []string for i := 0; i < count; i++ { n := fmt.Sprintf("%07d", i) - f, err := mem.Create(n) + f, err := mem.Create(n, vfs.WriteCategoryUnspecified) require.NoError(b, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{}) require.NoError(b, w.Set([]byte(n), nil)) @@ -3721,7 +3721,7 @@ func BenchmarkManySSTables(b *testing.B) { { const broadIngest = "broad.sst" - f, err := mem.Create(broadIngest) + f, err := mem.Create(broadIngest, vfs.WriteCategoryUnspecified) require.NoError(b, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{}) require.NoError(b, w.Set([]byte("0"), nil)) @@ -3747,7 +3747,7 @@ func runBenchmarkManySSTablesIngest(b *testing.B, d *DB, fs vfs.FS, count int) { b.ResetTimer() for i := 0; i < b.N; i++ { n := fmt.Sprintf("%07d", count+i) - f, err := fs.Create(n) + f, err := fs.Create(n, vfs.WriteCategoryUnspecified) require.NoError(b, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{}) require.NoError(b, w.Set([]byte(n), nil)) diff --git a/iterator_test.go b/iterator_test.go index 026e959e6f..d77c615e81 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -2925,7 +2925,7 @@ func BenchmarkSeekPrefixTombstones(b *testing.B) { for i := int64(0); i < ks.Count()-1; i++ { func() { filename := fmt.Sprintf("ext%2d", i) - f, err := o.FS.Create(filename) + f, err := o.FS.Create(filename, vfs.WriteCategoryUnspecified) require.NoError(b, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), wOpts) require.NoError(b, w.DeleteRange(testkeys.Key(ks, i), testkeys.Key(ks, i+1))) diff --git a/level_checker_test.go b/level_checker_test.go index 52b0b99fb3..901b22a2de 100644 --- a/level_checker_test.go +++ b/level_checker_test.go @@ -152,7 +152,7 @@ func TestCheckLevelsCornerCases(t *testing.T) { line = strings.TrimSpace(line) name := fmt.Sprint(fileNum) fileNum++ - f, err := memFS.Create(name) + f, err := memFS.Create(name, vfs.WriteCategoryUnspecified) if err != nil { return err.Error() } diff --git a/level_iter_test.go b/level_iter_test.go index 01737f7ef7..c5738d5666 100644 --- a/level_iter_test.go +++ b/level_iter_test.go @@ -194,7 +194,7 @@ func (lt *levelIterTest) runClear(d *datadriven.TestData) string { func (lt *levelIterTest) runBuild(d *datadriven.TestData) string { fileNum := FileNum(len(lt.readers)) name := fmt.Sprint(fileNum) - f0, err := lt.mem.Create(name) + f0, err := lt.mem.Create(name, vfs.WriteCategoryUnspecified) if err != nil { return err.Error() } @@ -459,7 +459,7 @@ func buildLevelIterTables( mem := vfs.NewMem() files := make([]vfs.File, count) for i := range files { - f, err := mem.Create(fmt.Sprintf("bench%d", i)) + f, err := mem.Create(fmt.Sprintf("bench%d", i), vfs.WriteCategoryUnspecified) if err != nil { b.Fatal(err) } diff --git a/merging_iter_test.go b/merging_iter_test.go index c4c4656aa1..6c65dfb887 100644 --- a/merging_iter_test.go +++ b/merging_iter_test.go @@ -212,7 +212,7 @@ func TestMergingIterCornerCases(t *testing.T) { line = strings.TrimSpace(line) name := fmt.Sprint(fileNum) fileNum++ - f, err := memFS.Create(name) + f, err := memFS.Create(name, vfs.WriteCategoryUnspecified) if err != nil { return err.Error() } @@ -328,7 +328,7 @@ func buildMergingIterTables( 
mem := vfs.NewMem() files := make([]vfs.File, count) for i := range files { - f, err := mem.Create(fmt.Sprintf("bench%d", i)) + f, err := mem.Create(fmt.Sprintf("bench%d", i), vfs.WriteCategoryUnspecified) if err != nil { b.Fatal(err) } @@ -525,7 +525,7 @@ func buildLevelsForMergingIterSeqSeek( files := make([][]vfs.File, levelCount) for i := range files { for j := 0; j < 2; j++ { - f, err := mem.Create(fmt.Sprintf("bench%d_%d", i, j)) + f, err := mem.Create(fmt.Sprintf("bench%d_%d", i, j), vfs.WriteCategoryUnspecified) if err != nil { b.Fatal(err) } diff --git a/metamorphic/build.go b/metamorphic/build.go index ce678daa15..9ae2b66511 100644 --- a/metamorphic/build.go +++ b/metamorphic/build.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/objstorage/objstorageprovider" "github.com/cockroachdb/pebble/sstable" + "github.com/cockroachdb/pebble/vfs" ) // writeSSTForIngestion writes an SST that is to be ingested, either directly or @@ -165,7 +166,7 @@ func buildForIngest( t *Test, dbID objID, b *pebble.Batch, i int, ) (path string, _ *sstable.WriterMetadata, _ error) { path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i)) - f, err := t.opts.FS.Create(path) + f, err := t.opts.FS.Create(path, vfs.WriteCategoryUnspecified) if err != nil { return "", nil, err } @@ -199,7 +200,7 @@ func buildForIngestExternalEmulation( i int, ) (path string, _ *sstable.WriterMetadata) { path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i)) - f, err := t.opts.FS.Create(path) + f, err := t.opts.FS.Create(path, vfs.WriteCategoryUnspecified) panicIfErr(err) reader, pointIter, rangeDelIter, rangeKeyIter := openExternalObj(t, externalObjID, bounds, syntheticPrefix) diff --git a/metamorphic/ops.go b/metamorphic/ops.go index eae83df670..63857d81d4 100644 --- a/metamorphic/ops.go +++ b/metamorphic/ops.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/pebble/internal/testkeys" "github.com/cockroachdb/pebble/objstorage/objstorageprovider" "github.com/cockroachdb/pebble/sstable" + "github.com/cockroachdb/pebble/vfs" "github.com/cockroachdb/pebble/vfs/errorfs" ) @@ -2073,7 +2074,7 @@ func (r *replicateOp) run(t *Test, h historyRecorder) { source := t.getDB(r.source) dest := t.getDB(r.dest) sstPath := path.Join(t.tmpDir, fmt.Sprintf("ext-replicate%d.sst", t.idx)) - f, err := t.opts.FS.Create(sstPath) + f, err := t.opts.FS.Create(sstPath, vfs.WriteCategoryUnspecified) if err != nil { h.Recordf("%s // %v", r, err) return diff --git a/metamorphic/test.go b/metamorphic/test.go index 93a54090df..e978751c33 100644 --- a/metamorphic/test.go +++ b/metamorphic/test.go @@ -358,7 +358,7 @@ func copyRemoteStorage(fs remote.Storage, outputDir string) error { return err } outputPath := vfs.Default.PathJoin(outputDir, objs[i]) - outputFile, err := vfs.Default.Create(outputPath) + outputFile, err := vfs.Default.Create(outputPath, vfs.WriteCategoryUnspecified) if err != nil { return err } diff --git a/objstorage/objstorage.go b/objstorage/objstorage.go index fdec6c6676..f0f33ff081 100644 --- a/objstorage/objstorage.go +++ b/objstorage/objstorage.go @@ -202,6 +202,10 @@ type CreateOptions struct { // SharedCleanupMethod is used for the object when it is created on shared storage. // The default (zero) value is SharedRefTracking. SharedCleanupMethod SharedCleanupMethod + + // WriteCategory is used for the object when it is created on local storage + // to collect aggregated write metrics for each write source. 
+ WriteCategory vfs.DiskWriteCategory } // Provider is a singleton object used to access and manage objects. diff --git a/objstorage/objstorageprovider/objiotracing/obj_io_tracing_on.go b/objstorage/objstorageprovider/objiotracing/obj_io_tracing_on.go index c4cf8d6136..d12bf8e000 100644 --- a/objstorage/objstorageprovider/objiotracing/obj_io_tracing_on.go +++ b/objstorage/objstorageprovider/objiotracing/obj_io_tracing_on.go @@ -381,7 +381,7 @@ func (t *Tracer) workerWriteTraces(state *workerState, data eventBuf) { func (t *Tracer) workerNewFile(state *workerState) { filename := fmt.Sprintf("IOTRACES-%s", time.Now().UTC().Format(time.RFC3339Nano)) - file, err := t.fs.Create(t.fs.PathJoin(t.fsDir, filename)) + file, err := t.fs.Create(t.fs.PathJoin(t.fsDir, filename), vfs.WriteCategoryUnspecified) if err != nil { panic(err) } diff --git a/objstorage/objstorageprovider/provider.go b/objstorage/objstorageprovider/provider.go index 499fb4ba9e..e24f5431fd 100644 --- a/objstorage/objstorageprovider/provider.go +++ b/objstorage/objstorageprovider/provider.go @@ -247,7 +247,13 @@ func (p *provider) Create( if opts.PreferSharedStorage && p.st.Remote.CreateOnShared != remote.CreateOnSharedNone { w, meta, err = p.sharedCreate(ctx, fileType, fileNum, p.st.Remote.CreateOnSharedLocator, opts) } else { - w, meta, err = p.vfsCreate(ctx, fileType, fileNum) + var category vfs.DiskWriteCategory + if opts.WriteCategory != "" { + category = opts.WriteCategory + } else { + category = vfs.WriteCategoryUnspecified + } + w, meta, err = p.vfsCreate(ctx, fileType, fileNum, category) } if err != nil { err = errors.Wrapf(err, "creating object %s", fileNum) diff --git a/objstorage/objstorageprovider/provider_test.go b/objstorage/objstorageprovider/provider_test.go index 141905cbed..1a791d4fc4 100644 --- a/objstorage/objstorageprovider/provider_test.go +++ b/objstorage/objstorageprovider/provider_test.go @@ -154,7 +154,7 @@ func TestProvider(t *testing.T) { tmpFileCounter++ tmpFilename := fmt.Sprintf("temp-file-%d", tmpFileCounter) - f, err := fs.Create(tmpFilename) + f, err := fs.Create(tmpFilename, vfs.WriteCategoryUnspecified) require.NoError(t, err) data := make([]byte, size) genData(byte(salt), 0, data) diff --git a/objstorage/objstorageprovider/remoteobjcat/catalog.go b/objstorage/objstorageprovider/remoteobjcat/catalog.go index 319579f976..b975dd4572 100644 --- a/objstorage/objstorageprovider/remoteobjcat/catalog.go +++ b/objstorage/objstorageprovider/remoteobjcat/catalog.go @@ -335,7 +335,7 @@ func (c *Catalog) createNewCatalogFileLocked() (outErr error) { } filename := makeCatalogFilename(c.mu.marker.NextIter()) filepath := c.fs.PathJoin(c.dirname, filename) - file, err := c.fs.Create(filepath) + file, err := c.fs.Create(filepath, "pebble-manifest") if err != nil { return err } diff --git a/objstorage/objstorageprovider/sharedcache/shared_cache.go b/objstorage/objstorageprovider/sharedcache/shared_cache.go index 28614d5215..f558803679 100644 --- a/objstorage/objstorageprovider/sharedcache/shared_cache.go +++ b/objstorage/objstorageprovider/sharedcache/shared_cache.go @@ -446,7 +446,7 @@ func (s *shard) init( } s.bm = makeBlockMath(blockSize) s.shardingBlockSize = shardingBlockSize - file, err := fs.OpenReadWrite(fs.PathJoin(fsDir, fmt.Sprintf("SHARED-CACHE-%03d", shardIdx))) + file, err := fs.OpenReadWrite(fs.PathJoin(fsDir, fmt.Sprintf("SHARED-CACHE-%03d", shardIdx)), vfs.WriteCategoryUnspecified) if err != nil { return err } diff --git a/objstorage/objstorageprovider/vfs.go 
b/objstorage/objstorageprovider/vfs.go index 5b2025179a..3350c354a4 100644 --- a/objstorage/objstorageprovider/vfs.go +++ b/objstorage/objstorageprovider/vfs.go @@ -35,10 +35,13 @@ func (p *provider) vfsOpenForReading( } func (p *provider) vfsCreate( - _ context.Context, fileType base.FileType, fileNum base.DiskFileNum, + _ context.Context, + fileType base.FileType, + fileNum base.DiskFileNum, + category vfs.DiskWriteCategory, ) (objstorage.Writable, objstorage.ObjectMetadata, error) { filename := p.vfsPath(fileType, fileNum) - file, err := p.st.FS.Create(filename) + file, err := p.st.FS.Create(filename, category) if err != nil { return nil, objstorage.ObjectMetadata{}, err } diff --git a/objstorage/remote/localfs.go b/objstorage/remote/localfs.go index 539ecb132c..a421d40714 100644 --- a/objstorage/remote/localfs.go +++ b/objstorage/remote/localfs.go @@ -80,7 +80,7 @@ func (r *localFSReader) Close() error { // CreateObject is part of the remote.Storage interface. func (s *localFSStore) CreateObject(objName string) (io.WriteCloser, error) { - file, err := s.vfs.Create(path.Join(s.dirname, objName)) + file, err := s.vfs.Create(path.Join(s.dirname, objName), vfs.WriteCategoryUnspecified) return file, err } diff --git a/open.go b/open.go index bdc6f1eda5..7598049e6d 100644 --- a/open.go +++ b/open.go @@ -549,7 +549,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) { // Write them to a temporary file first, in case we crash before // we're done. A corrupt options file prevents opening the // database. - optionsFile, err := opts.FS.Create(tmpPath) + optionsFile, err := opts.FS.Create(tmpPath, vfs.WriteCategoryUnspecified) if err != nil { return nil, err } diff --git a/open_test.go b/open_test.go index cee89b9454..43d1237fa3 100644 --- a/open_test.go +++ b/open_test.go @@ -521,8 +521,8 @@ type optionsTornWriteFS struct { vfs.FS } -func (fs optionsTornWriteFS) Create(name string) (vfs.File, error) { - file, err := fs.FS.Create(name) +func (fs optionsTornWriteFS) Create(name string, category vfs.DiskWriteCategory) (vfs.File, error) { + file, err := fs.FS.Create(name, category) if file != nil { file = optionsTornWriteFile{File: file} } @@ -968,7 +968,7 @@ func TestCrashOpenCrashAfterWALCreation(t *testing.T) { b, err := io.ReadAll(f) require.NoError(t, err) require.NoError(t, f.Close()) - f, err = fs.Create(logs[0]) + f, err = fs.Create(logs[0], vfs.WriteCategoryUnspecified) require.NoError(t, err) _, err = f.Write(b) require.NoError(t, err) @@ -1179,7 +1179,7 @@ func TestGetVersion(t *testing.T) { } } } - f, _ := mem.Create(fmt.Sprintf("OPTIONS-%d", highestOptionsNum+1)) + f, _ := mem.Create(fmt.Sprintf("OPTIONS-%d", highestOptionsNum+1), vfs.WriteCategoryUnspecified) _, err = f.Write([]byte("[Version]\n pebble_version=0.2\n")) require.NoError(t, err) err = f.Close() @@ -1189,7 +1189,7 @@ func TestGetVersion(t *testing.T) { require.Equal(t, "0.2", version) // Case 4: Manually created OPTIONS file with a RocksDB number. 
- f, _ = mem.Create(fmt.Sprintf("OPTIONS-%d", highestOptionsNum+2)) + f, _ = mem.Create(fmt.Sprintf("OPTIONS-%d", highestOptionsNum+2), vfs.WriteCategoryUnspecified) _, err = f.Write([]byte("[Version]\n rocksdb_version=6.2.1\n")) require.NoError(t, err) err = f.Close() @@ -1204,7 +1204,7 @@ func TestGetVersion(t *testing.T) { func TestOpenNeverFlushed(t *testing.T) { mem := vfs.NewMem() - sstFile, err := mem.Create("to-ingest.sst") + sstFile, err := mem.Create("to-ingest.sst", vfs.WriteCategoryUnspecified) require.NoError(t, err) writerOpts := sstable.WriterOptions{} @@ -1298,8 +1298,8 @@ func (fs *closeTrackingFS) wrap(file vfs.File, err error) (vfs.File, error) { return f, err } -func (fs *closeTrackingFS) Create(name string) (vfs.File, error) { - return fs.wrap(fs.FS.Create(name)) +func (fs *closeTrackingFS) Create(name string, category vfs.DiskWriteCategory) (vfs.File, error) { + return fs.wrap(fs.FS.Create(name, category)) } func (fs *closeTrackingFS) Open(name string, opts ...vfs.OpenOption) (vfs.File, error) { @@ -1310,8 +1310,10 @@ func (fs *closeTrackingFS) OpenDir(name string) (vfs.File, error) { return fs.wrap(fs.FS.OpenDir(name)) } -func (fs *closeTrackingFS) ReuseForWrite(oldname, newname string) (vfs.File, error) { - return fs.wrap(fs.FS.ReuseForWrite(oldname, newname)) +func (fs *closeTrackingFS) ReuseForWrite( + oldname, newname string, category vfs.DiskWriteCategory, +) (vfs.File, error) { + return fs.wrap(fs.FS.ReuseForWrite(oldname, newname, category)) } type closeTrackingFile struct { @@ -1414,7 +1416,7 @@ func TestCheckConsistency(t *testing.T) { } path := base.MakeFilepath(mem, dir, base.FileTypeTable, m.FileBacking.DiskFileNum) _ = mem.Remove(path) - f, err := mem.Create(path) + f, err := mem.Create(path, vfs.WriteCategoryUnspecified) if err != nil { return err.Error() } diff --git a/options.go b/options.go index 98404860b0..d798313259 100644 --- a/options.go +++ b/options.go @@ -1047,6 +1047,10 @@ type Options struct { // Setting this to 0 disables deletion pacing, which is also the default. TargetByteDeletionRate int + // EnableSQLRowSpillMetrics specifies whether the Pebble instance will only be used + // to temporarily persist data spilled to disk for row-oriented SQL query execution. + EnableSQLRowSpillMetrics bool + // private options are only used by internal tests or are used internally // for facilitating upgrade paths of unconfigurable functionality. private struct { diff --git a/range_del_test.go b/range_del_test.go index 0df14e39d6..1d75509ab8 100644 --- a/range_del_test.go +++ b/range_del_test.go @@ -585,7 +585,7 @@ func benchmarkRangeDelIterate(b *testing.B, entries, deleted int, snapshotCompac // Create an sstable with N entries and ingest it. This is a fast way // to get a lot of entries into pebble. 
- f, err := mem.Create("ext") + f, err := mem.Create("ext", vfs.WriteCategoryUnspecified) if err != nil { b.Fatal(err) } diff --git a/record/log_writer_test.go b/record/log_writer_test.go index 0f304a67d3..1b7bdba8ab 100644 --- a/record/log_writer_test.go +++ b/record/log_writer_test.go @@ -142,7 +142,7 @@ func TestFlusherCond(t *testing.T) { func TestSyncError(t *testing.T) { mem := vfs.NewMem() - f, err := mem.Create("log") + f, err := mem.Create("log", vfs.WriteCategoryUnspecified) require.NoError(t, err) injectedErr := errors.New("injected error") diff --git a/replay/replay_test.go b/replay/replay_test.go index 00cbbce406..2c3b7eb568 100644 --- a/replay/replay_test.go +++ b/replay/replay_test.go @@ -347,7 +347,7 @@ func collectCorpus(t *testing.T, fs *vfs.MemFS, name string) { filePath = base.MakeFilepath(fs, dir, base.FileTypeManifest, fileNum) } } - f, err := fs.Create(filePath) + f, err := fs.Create(filePath, vfs.WriteCategoryUnspecified) require.NoError(t, err) b, err := hex.DecodeString(strings.ReplaceAll(td.Input, "\n", "")) require.NoError(t, err) diff --git a/replay/workload_capture.go b/replay/workload_capture.go index 2d480a587f..fded3d363f 100644 --- a/replay/workload_capture.go +++ b/replay/workload_capture.go @@ -272,7 +272,7 @@ func (w *WorkloadCollector) copyManifests(startAtIndex int, manifests []*manifes // goroutine that accesses the fields of the `manifestDetails` // struct. var err error - manifest.destFile, err = destFS.Create(w.destFilepath(destFS.PathBase(manifest.sourceFilepath))) + manifest.destFile, err = destFS.Create(w.destFilepath(destFS.PathBase(manifest.sourceFilepath)), vfs.WriteCategoryUnspecified) if err != nil { panic(err) } diff --git a/replay/workload_capture_test.go b/replay/workload_capture_test.go index 62f312f692..30ce93927c 100644 --- a/replay/workload_capture_test.go +++ b/replay/workload_capture_test.go @@ -67,7 +67,7 @@ func TestWorkloadCollector(t *testing.T) { var err error td.ScanArgs(t, "filenum", &fileNum) path := base.MakeFilepath(fs, srcDir, base.FileTypeManifest, base.DiskFileNum(fileNum)) - currentManifest, err = fs.Create(path) + currentManifest, err = fs.Create(path, vfs.WriteCategoryUnspecified) require.NoError(t, err) _, err = currentManifest.Write(randData(100)) require.NoError(t, err) @@ -182,7 +182,7 @@ func writeFile( t *testing.T, fs vfs.FS, dir string, typ base.FileType, fileNum base.DiskFileNum, data []byte, ) string { path := base.MakeFilepath(fs, dir, typ, fileNum) - f, err := fs.Create(path) + f, err := fs.Create(path, vfs.WriteCategoryUnspecified) require.NoError(t, err) _, err = f.Write(data) require.NoError(t, err) diff --git a/scan_internal_test.go b/scan_internal_test.go index 6ef4c56d39..19d95f0a23 100644 --- a/scan_internal_test.go +++ b/scan_internal_test.go @@ -457,7 +457,7 @@ func TestScanInternal(t *testing.T) { }() } else if ingest { points, rangeDels, rangeKeys := batchSort(b) - file, err := d.opts.FS.Create("temp0.sst") + file, err := d.opts.FS.Create("temp0.sst", vfs.WriteCategoryUnspecified) require.NoError(t, err) writeSST(points, rangeDels, rangeKeys, objstorageprovider.NewFileWritable(file)) require.NoError(t, d.Ingest([]string{"temp0.sst"})) diff --git a/sstable/random_test.go b/sstable/random_test.go index 1ad6d4f430..13bb890df5 100644 --- a/sstable/random_test.go +++ b/sstable/random_test.go @@ -43,7 +43,7 @@ func TestIterator_RandomErrors(t *testing.T) { func runErrorInjectionTest(t *testing.T, seed int64) { t.Logf("seed %d", seed) fs := vfs.NewMem() - f, err := fs.Create("random.sst") + f, err 
:= fs.Create("random.sst", vfs.WriteCategoryUnspecified) require.NoError(t, err) rng := rand.New(rand.NewSource(seed)) cfg := randomTableConfig{ diff --git a/sstable/reader_test.go b/sstable/reader_test.go index cb0f1de26b..6f0ec957ac 100644 --- a/sstable/reader_test.go +++ b/sstable/reader_test.go @@ -857,7 +857,7 @@ func TestReaderCheckComparerMerger(t *testing.T) { } mem := vfs.NewMem() - f0, err := mem.Create(testTable) + f0, err := mem.Create(testTable, vfs.WriteCategoryUnspecified) require.NoError(t, err) w := NewWriter(objstorageprovider.NewFileWritable(f0), writerOpts) @@ -1372,7 +1372,7 @@ func TestRandomizedPrefixSuffixRewriter(t *testing.T) { name = "fixedTS" } name = name + testCaseName - f, err := mem.Create(name) + f, err := mem.Create(name, vfs.WriteCategoryUnspecified) require.NoError(t, err) w := NewWriter(objstorageprovider.NewFileWritable(f), WriterOptions{ @@ -1455,7 +1455,7 @@ func TestReaderChecksumErrors(t *testing.T) { { // Create an sstable with 3 data blocks. - f, err := mem.Create("test") + f, err := mem.Create("test", vfs.WriteCategoryUnspecified) require.NoError(t, err) const blockSize = 32 @@ -1501,7 +1501,7 @@ func TestReaderChecksumErrors(t *testing.T) { // Corrupt the first byte in the block. data[bh.Offset] ^= 0xff - corrupted, err := mem.Create("corrupted") + corrupted, err := mem.Create("corrupted", vfs.WriteCategoryUnspecified) require.NoError(t, err) _, err = corrupted.Write(data) require.NoError(t, err) @@ -1732,7 +1732,7 @@ func TestValidateBlockChecksums(t *testing.T) { func TestReader_TableFormat(t *testing.T) { test := func(t *testing.T, want TableFormat) { fs := vfs.NewMem() - f, err := fs.Create("test") + f, err := fs.Create("test", vfs.WriteCategoryUnspecified) require.NoError(t, err) opts := WriterOptions{TableFormat: want} @@ -1818,7 +1818,7 @@ func buildBenchmarkTable( b *testing.B, options WriterOptions, confirmTwoLevelIndex bool, offset int, ) (*Reader, [][]byte) { mem := vfs.NewMem() - f0, err := mem.Create("bench") + f0, err := mem.Create("bench", vfs.WriteCategoryUnspecified) if err != nil { b.Fatal(err) } @@ -2106,7 +2106,7 @@ func BenchmarkIteratorScanManyVersions(b *testing.B) { // 99,049,269 bytes in value blocks. 
setupBench := func(b *testing.B, tableFormat TableFormat, cacheSize int64) *Reader { mem := vfs.NewMem() - f0, err := mem.Create("bench") + f0, err := mem.Create("bench", vfs.WriteCategoryUnspecified) require.NoError(b, err) options.TableFormat = tableFormat w := NewWriter(objstorageprovider.NewFileWritable(f0), options) @@ -2207,7 +2207,7 @@ func BenchmarkIteratorScanNextPrefix(b *testing.B) { } setupBench := func(b *testing.B, versCount int) (r *Reader, succKeys [][]byte) { mem := vfs.NewMem() - f0, err := mem.Create("bench") + f0, err := mem.Create("bench", vfs.WriteCategoryUnspecified) require.NoError(b, err) w := NewWriter(objstorageprovider.NewFileWritable(f0), options) for i := int64(0); i < keys.Count(); i++ { @@ -2367,7 +2367,7 @@ func BenchmarkIteratorScanObsolete(b *testing.B) { keyBuf := make([]byte, keyLen) setupBench := func(b *testing.B, tableFormat TableFormat, cacheSize int64) *Reader { mem := vfs.NewMem() - f0, err := mem.Create("bench") + f0, err := mem.Create("bench", vfs.WriteCategoryUnspecified) require.NoError(b, err) options.TableFormat = tableFormat w := NewWriter(objstorageprovider.NewFileWritable(f0), options) diff --git a/sstable/table_test.go b/sstable/table_test.go index f618ee5e8e..506a38be64 100644 --- a/sstable/table_test.go +++ b/sstable/table_test.go @@ -440,7 +440,7 @@ func TestFinalBlockIsWritten(t *testing.T) { for _, vLen := range valueLengths { got, memFS := 0, vfs.NewMem() - wf, err := memFS.Create("foo") + wf, err := memFS.Create("foo", vfs.WriteCategoryUnspecified) if err != nil { t.Errorf("nk=%d, vLen=%d: memFS create: %v", nk, vLen, err) continue @@ -566,7 +566,7 @@ func TestFooterRoundTrip(t *testing.T) { for _, offset := range []int64{0, 1, 100} { t.Run(fmt.Sprintf("offset=%d", offset), func(t *testing.T) { mem := vfs.NewMem() - f, err := mem.Create("test") + f, err := mem.Create("test", vfs.WriteCategoryUnspecified) require.NoError(t, err) _, err = f.Write(buf[:offset]) @@ -626,7 +626,7 @@ func TestReadFooter(t *testing.T) { for _, c := range testCases { t.Run("", func(t *testing.T) { mem := vfs.NewMem() - f, err := mem.Create("test") + f, err := mem.Create("test", vfs.WriteCategoryUnspecified) require.NoError(t, err) _, err = f.Write([]byte(c.encoded)) diff --git a/sstable/test_fixtures.go b/sstable/test_fixtures.go index d31cc721ea..74404895c6 100644 --- a/sstable/test_fixtures.go +++ b/sstable/test_fixtures.go @@ -130,7 +130,7 @@ func buildHamletTestSST( keys := wordCount.SortedKeys() // Write the key/value pairs to a new table, in increasing key order. 
- f0, err := fs.Create(filename) + f0, err := fs.Create(filename, vfs.WriteCategoryUnspecified) if err != nil { return err } diff --git a/sstable/writer_rangekey_test.go b/sstable/writer_rangekey_test.go index 794a693577..8e4e943e80 100644 --- a/sstable/writer_rangekey_test.go +++ b/sstable/writer_rangekey_test.go @@ -25,7 +25,7 @@ func TestWriter_RangeKeys(t *testing.T) { buildFn := func(td *datadriven.TestData) (*Reader, error) { mem := vfs.NewMem() - f, err := mem.Create("test") + f, err := mem.Create("test", vfs.WriteCategoryUnspecified) if err != nil { return nil, err } diff --git a/sstable/writer_test.go b/sstable/writer_test.go index 32ff87c74d..daa07a43e3 100644 --- a/sstable/writer_test.go +++ b/sstable/writer_test.go @@ -505,7 +505,7 @@ func TestDoubleClose(t *testing.T) { func TestParallelWriterErrorProp(t *testing.T) { fs := vfs.NewMem() - f, err := fs.Create("test") + f, err := fs.Create("test", vfs.WriteCategoryUnspecified) require.NoError(t, err) opts := WriterOptions{ TableFormat: TableFormatPebblev1, BlockSize: 1, Parallelism: true, @@ -600,7 +600,7 @@ func TestWriterClearCache(t *testing.T) { } build := func(name string) { - f, err := mem.Create(name) + f, err := mem.Create(name, vfs.WriteCategoryUnspecified) require.NoError(t, err) w := NewWriter(objstorageprovider.NewFileWritable(f), writerOpts, cacheOpts) @@ -751,7 +751,7 @@ func TestWriterBlockPropertiesErrors(t *testing.T) { for _, tc := range testCases { t.Run("", func(t *testing.T) { fs := vfs.NewMem() - f, err := fs.Create("test") + f, err := fs.Create("test", vfs.WriteCategoryUnspecified) require.NoError(t, err) w := NewWriter(objstorageprovider.NewFileWritable(f), WriterOptions{ @@ -838,7 +838,7 @@ func TestWriter_TableFormatCompatibility(t *testing.T) { for tf := TableFormatLevelDB; tf <= TableFormatMax; tf++ { t.Run(tf.String(), func(t *testing.T) { fs := vfs.NewMem() - f, err := fs.Create("sst") + f, err := fs.Create("sst", vfs.WriteCategoryUnspecified) require.NoError(t, err) opts := WriterOptions{TableFormat: tf} diff --git a/table_stats_test.go b/table_stats_test.go index bcbcaa8bcf..76d74d9b3c 100644 --- a/table_stats_test.go +++ b/table_stats_test.go @@ -197,7 +197,7 @@ func TestTableRangeDeletionIter(t *testing.T) { datadriven.RunTest(t, "testdata/table_stats_deletion_iter", func(t *testing.T, td *datadriven.TestData) string { switch cmd := td.Cmd; cmd { case "build": - f, err := fs.Create("tmp.sst") + f, err := fs.Create("tmp.sst", vfs.WriteCategoryUnspecified) if err != nil { return err.Error() } diff --git a/version_set.go b/version_set.go index 4b4cfc231c..4f00421d45 100644 --- a/version_set.go +++ b/version_set.go @@ -878,7 +878,7 @@ func (vs *versionSet) createManifest( vs.fs.Remove(filename) } }() - manifestFile, err = vs.fs.Create(filename) + manifestFile, err = vs.fs.Create(filename, "pebble-manifest") if err != nil { return err } diff --git a/version_set_test.go b/version_set_test.go index f38f967828..211930b8c8 100644 --- a/version_set_test.go +++ b/version_set_test.go @@ -29,7 +29,7 @@ import ( func writeAndIngest(t *testing.T, mem vfs.FS, d *DB, k InternalKey, v []byte, filename string) { path := mem.PathJoin("ext", filename) - f, err := mem.Create(path) + f, err := mem.Create(path, vfs.WriteCategoryUnspecified) require.NoError(t, err) w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{}) require.NoError(t, w.Add(k, v)) diff --git a/vfs/atomicfs/marker.go b/vfs/atomicfs/marker.go index c640754b9d..9624f14ff9 100644 --- a/vfs/atomicfs/marker.go +++ 
b/vfs/atomicfs/marker.go @@ -193,7 +193,7 @@ func (a *Marker) Move(newValue string) error { oldFilename := a.filename // Create the new marker. - f, err := a.fs.Create(dstPath) + f, err := a.fs.Create(dstPath, vfs.WriteCategoryUnspecified) if err != nil { // On a distributed filesystem, an error doesn't guarantee that // the file wasn't created. A retry of the same Move call will diff --git a/vfs/atomicfs/marker_test.go b/vfs/atomicfs/marker_test.go index f43c90dd6a..2ffba22cf2 100644 --- a/vfs/atomicfs/marker_test.go +++ b/vfs/atomicfs/marker_test.go @@ -148,7 +148,7 @@ func TestMarker(t *testing.T) { case "touch": for _, filename := range strings.Split(td.Input, "\n") { - f, err := memFS.Create(filename) + f, err := memFS.Create(filename, vfs.WriteCategoryUnspecified) if err != nil { return err.Error() } diff --git a/vfs/clone.go b/vfs/clone.go index 5d6edf2d6a..5562a37f68 100644 --- a/vfs/clone.go +++ b/vfs/clone.go @@ -118,7 +118,7 @@ func Clone(srcFS, dstFS FS, srcPath, dstPath string, opts ...CloneOption) (bool, if err != nil { return false, err } - dstFile, err := dstFS.Create(dstPath) + dstFile, err := dstFS.Create(dstPath, WriteCategoryUnspecified) if err != nil { return false, err } diff --git a/vfs/disk_full.go b/vfs/disk_full.go index 16f6a5df96..3560551706 100644 --- a/vfs/disk_full.go +++ b/vfs/disk_full.go @@ -187,14 +187,14 @@ func (fs *enospcFS) handleENOSPC(gen uint32) { } } -func (fs *enospcFS) Create(name string) (File, error) { +func (fs *enospcFS) Create(name string, category DiskWriteCategory) (File, error) { gen := fs.waitUntilReady() - f, err := fs.inner.Create(name) + f, err := fs.inner.Create(name, category) if err != nil && isENOSPC(err) { fs.handleENOSPC(gen) - f, err = fs.inner.Create(name) + f, err = fs.inner.Create(name, category) } if f != nil { f = &enospcFile{ @@ -228,8 +228,10 @@ func (fs *enospcFS) Open(name string, opts ...OpenOption) (File, error) { return f, err } -func (fs *enospcFS) OpenReadWrite(name string, opts ...OpenOption) (File, error) { - f, err := fs.inner.OpenReadWrite(name, opts...) +func (fs *enospcFS) OpenReadWrite( + name string, category DiskWriteCategory, opts ...OpenOption, +) (File, error) { + f, err := fs.inner.OpenReadWrite(name, category, opts...) 
if f != nil { f = &enospcFile{ fs: fs, @@ -286,14 +288,16 @@ func (fs *enospcFS) Rename(oldname, newname string) error { return err } -func (fs *enospcFS) ReuseForWrite(oldname, newname string) (File, error) { +func (fs *enospcFS) ReuseForWrite( + oldname, newname string, category DiskWriteCategory, +) (File, error) { gen := fs.waitUntilReady() - f, err := fs.inner.ReuseForWrite(oldname, newname) + f, err := fs.inner.ReuseForWrite(oldname, newname, category) if err != nil && isENOSPC(err) { fs.handleENOSPC(gen) - f, err = fs.inner.ReuseForWrite(oldname, newname) + f, err = fs.inner.ReuseForWrite(oldname, newname, category) } if f != nil { diff --git a/vfs/disk_full_test.go b/vfs/disk_full_test.go index d56fc55726..bd621237b0 100644 --- a/vfs/disk_full_test.go +++ b/vfs/disk_full_test.go @@ -19,7 +19,7 @@ import ( var filesystemWriteOps = map[string]func(FS) error{ "Create": func(fs FS) error { - _, err := fs.Create("foo") + _, err := fs.Create("foo", WriteCategoryUnspecified) return err }, "Lock": func(fs FS) error { @@ -27,7 +27,7 @@ var filesystemWriteOps = map[string]func(FS) error{ return err }, "ReuseForWrite": func(fs FS) error { - _, err := fs.ReuseForWrite("foo", "bar") + _, err := fs.ReuseForWrite("foo", "bar", WriteCategoryUnspecified) return err }, "Link": func(fs FS) error { return fs.Link("foo", "bar") }, @@ -68,7 +68,7 @@ func TestOnDiskFull_File(t *testing.T) { callbackInvocations++ }) - f, err := fs.Create("foo") + f, err := fs.Create("foo", WriteCategoryUnspecified) require.NoError(t, err) // The next Write should ENOSPC. @@ -93,7 +93,7 @@ func TestOnDiskFull_File(t *testing.T) { callbackInvocations++ }) - f, err := fs.Create("foo") + f, err := fs.Create("foo", WriteCategoryUnspecified) require.NoError(t, err) // The next Sync should ENOSPC. The callback should be invoked, but a @@ -124,7 +124,7 @@ func TestOnDiskFull_Concurrent(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - _, err := fs.Create("foo") + _, err := fs.Create("foo", WriteCategoryUnspecified) // They all should succeed on retry. require.NoError(t, err) }() @@ -160,7 +160,7 @@ func (fs *enospcMockFS) maybeENOSPC() error { return nil } -func (fs *enospcMockFS) Create(name string) (File, error) { +func (fs *enospcMockFS) Create(name string, category DiskWriteCategory) (File, error) { if err := fs.maybeENOSPC(); err != nil { return nil, err } @@ -195,7 +195,9 @@ func (fs *enospcMockFS) Rename(oldname, newname string) error { return nil } -func (fs *enospcMockFS) ReuseForWrite(oldname, newname string) (File, error) { +func (fs *enospcMockFS) ReuseForWrite( + oldname, newname string, category DiskWriteCategory, +) (File, error) { if err := fs.maybeENOSPC(); err != nil { return nil, err } @@ -242,7 +244,7 @@ func (f *enospcMockFile) Sync() error { func BenchmarkOnDiskFull(b *testing.B) { fs := OnDiskFull(NewMem(), func() {}) - f, err := fs.Create("foo") + f, err := fs.Create("foo", WriteCategoryUnspecified) require.NoError(b, err) defer func() { require.NoError(b, f.Close()) }() diff --git a/vfs/disk_health.go b/vfs/disk_health.go index 6ef4b9ebb1..2daec9cd76 100644 --- a/vfs/disk_health.go +++ b/vfs/disk_health.go @@ -106,11 +106,11 @@ func (o OpType) String() string { // stats for disk writes. The prefix "pebble-" is reserved for internal Pebble categories. // // Some examples include, pebble-wal, pebble-memtable-flush, pebble-manifest and in the -// Cockroach context includes, sql-spill, range-snapshot, node-log. +// Cockroach context includes, sql-row-spill, range-snapshot, crdb-log. 
type DiskWriteCategory string // WriteCategoryUnspecified denotes a disk write without a significant category. -const WriteCategoryUnspecified = "unspecified" +const WriteCategoryUnspecified DiskWriteCategory = "unspecified" // DiskWriteStatsAggregate is an aggregate of the bytes written to disk for a given category. type DiskWriteStatsAggregate struct { @@ -131,6 +131,21 @@ func NewDiskWriteStatsCollector() *DiskWriteStatsCollector { } } +// CreateStat inserts a new category to the statsMap if it doesn't already exist, otherwise +// it returns a pointer to the current stats. +func (d *DiskWriteStatsCollector) CreateStat(category DiskWriteCategory) *atomic.Uint64 { + var bytesWritten *atomic.Uint64 + d.mu.Lock() + defer d.mu.Unlock() + if aggStats, ok := d.statsMap[category]; !ok { + bytesWritten = new(atomic.Uint64) + d.statsMap[category] = bytesWritten + } else { + bytesWritten = aggStats + } + return bytesWritten +} + // GetStats returns the aggregated metrics for all categories. func (d *DiskWriteStatsCollector) GetStats() []DiskWriteStatsAggregate { var stats []DiskWriteStatsAggregate @@ -184,10 +199,11 @@ type diskHealthCheckingFile struct { lastWritePacked atomic.Uint64 createTimeNanos int64 - // aggBytesWritten points to an atomic that aggregates the bytes written - // for files that belong to a specific DiskWriteCategory. This pointer is also stored in the + // aggBytesWritten points to an atomic that aggregates the bytes written for files + // that belong to a specific DiskWriteCategory. This pointer is also stored in the // DiskWriteStatsCollector for metric collection. aggBytesWritten *atomic.Uint64 + category DiskWriteCategory } // diskHealthCheckingFile implements File. @@ -204,14 +220,7 @@ func newDiskHealthCheckingFile( ) *diskHealthCheckingFile { var bytesWritten *atomic.Uint64 if statsCollector != nil { - statsCollector.mu.Lock() - if aggStats, ok := statsCollector.statsMap[category]; !ok { - bytesWritten = new(atomic.Uint64) - statsCollector.statsMap[category] = bytesWritten - } else { - bytesWritten = aggStats - } - statsCollector.mu.Unlock() + bytesWritten = statsCollector.CreateStat(category) } else { bytesWritten = new(atomic.Uint64) } @@ -225,6 +234,7 @@ func newDiskHealthCheckingFile( createTimeNanos: time.Now().UnixNano(), aggBytesWritten: bytesWritten, + category: category, } } @@ -724,11 +734,11 @@ func (d *diskHealthCheckingFS) Close() error { } // Create implements the FS interface. -func (d *diskHealthCheckingFS) Create(name string) (File, error) { +func (d *diskHealthCheckingFS) Create(name string, category DiskWriteCategory) (File, error) { var f File var err error d.timeFilesystemOp(name, OpTypeCreate, func() { - f, err = d.fs.Create(name) + f, err = d.fs.Create(name, category) }, time.Now().UnixNano()) if err != nil { return f, err @@ -736,8 +746,7 @@ func (d *diskHealthCheckingFS) Create(name string) (File, error) { if d.diskSlowThreshold == 0 { return f, nil } - // TODO(cheranm): add plumbing to pass down valid category. - checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, WriteCategoryUnspecified, d.statsCollector, + checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, category, d.statsCollector, func(opType OpType, writeSizeInBytes int, duration time.Duration) { d.onSlowDisk( DiskSlowInfo{ @@ -790,13 +799,14 @@ func (d *diskHealthCheckingFS) Open(name string, opts ...OpenOption) (File, erro } // OpenReadWrite implements the FS interface. 
-func (d *diskHealthCheckingFS) OpenReadWrite(name string, opts ...OpenOption) (File, error) { - f, err := d.fs.OpenReadWrite(name, opts...) +func (d *diskHealthCheckingFS) OpenReadWrite( + name string, category DiskWriteCategory, opts ...OpenOption, +) (File, error) { + f, err := d.fs.OpenReadWrite(name, category, opts...) if err != nil { return nil, err } - // TODO(cheranm): add plumbing to pass down valid category. - return newDiskHealthCheckingFile(f, 0, WriteCategoryUnspecified, d.statsCollector, func(opType OpType, writeSizeInBytes int, duration time.Duration) {}), nil + return newDiskHealthCheckingFile(f, 0, category, d.statsCollector, func(opType OpType, writeSizeInBytes int, duration time.Duration) {}), nil } // OpenDir implements the FS interface. @@ -857,11 +867,13 @@ func (d *diskHealthCheckingFS) Rename(oldname, newname string) error { } // ReuseForWrite implements the FS interface. -func (d *diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, error) { +func (d *diskHealthCheckingFS) ReuseForWrite( + oldname, newname string, category DiskWriteCategory, +) (File, error) { var f File var err error d.timeFilesystemOp(newname, OpTypeReuseForWrite, func() { - f, err = d.fs.ReuseForWrite(oldname, newname) + f, err = d.fs.ReuseForWrite(oldname, newname, category) }, time.Now().UnixNano()) if err != nil { return f, err @@ -869,8 +881,7 @@ func (d *diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, err if d.diskSlowThreshold == 0 { return f, nil } - // TODO(cheranm): add plumbing to pass down valid category. - checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, WriteCategoryUnspecified, d.statsCollector, + checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, category, d.statsCollector, func(opType OpType, writeSizeInBytes int, duration time.Duration) { d.onSlowDisk( DiskSlowInfo{ diff --git a/vfs/disk_health_test.go b/vfs/disk_health_test.go index bd8775bf9a..d452fb0603 100644 --- a/vfs/disk_health_test.go +++ b/vfs/disk_health_test.go @@ -79,7 +79,7 @@ func (m mockFile) SyncTo(int64) (fullSync bool, err error) { var _ File = &mockFile{} type mockFS struct { - create func(string) (File, error) + create func(string, DiskWriteCategory) (File, error) link func(string, string) error list func(string) ([]string, error) lock func(string) (io.Closer, error) @@ -92,16 +92,16 @@ type mockFS struct { remove func(string) error removeAll func(string) error rename func(string, string) error - reuseForWrite func(string, string) (File, error) + reuseForWrite func(string, string, DiskWriteCategory) (File, error) stat func(string) (os.FileInfo, error) getDiskUsage func(string) (DiskUsage, error) } -func (m mockFS) Create(name string) (File, error) { +func (m mockFS) Create(name string, category DiskWriteCategory) (File, error) { if m.create == nil { panic("unimplemented") } - return m.create(name) + return m.create(name, category) } func (m mockFS) Link(oldname, newname string) error { @@ -118,7 +118,9 @@ func (m mockFS) Open(name string, opts ...OpenOption) (File, error) { return m.open(name, opts...) 
diff --git a/vfs/disk_health_test.go b/vfs/disk_health_test.go
index bd8775bf9a..d452fb0603 100644
--- a/vfs/disk_health_test.go
+++ b/vfs/disk_health_test.go
@@ -79,7 +79,7 @@ func (m mockFile) SyncTo(int64) (fullSync bool, err error) {
 var _ File = &mockFile{}

 type mockFS struct {
-	create        func(string) (File, error)
+	create        func(string, DiskWriteCategory) (File, error)
 	link          func(string, string) error
 	list          func(string) ([]string, error)
 	lock          func(string) (io.Closer, error)
@@ -92,16 +92,16 @@ type mockFS struct {
 	remove        func(string) error
 	removeAll     func(string) error
 	rename        func(string, string) error
-	reuseForWrite func(string, string) (File, error)
+	reuseForWrite func(string, string, DiskWriteCategory) (File, error)
 	stat          func(string) (os.FileInfo, error)
 	getDiskUsage  func(string) (DiskUsage, error)
 }

-func (m mockFS) Create(name string) (File, error) {
+func (m mockFS) Create(name string, category DiskWriteCategory) (File, error) {
 	if m.create == nil {
 		panic("unimplemented")
 	}
-	return m.create(name)
+	return m.create(name, category)
 }

 func (m mockFS) Link(oldname, newname string) error {
@@ -118,7 +118,9 @@ func (m mockFS) Open(name string, opts ...OpenOption) (File, error) {
 	return m.open(name, opts...)
 }

-func (m mockFS) OpenReadWrite(name string, opts ...OpenOption) (File, error) {
+func (m mockFS) OpenReadWrite(
+	name string, category DiskWriteCategory, opts ...OpenOption,
+) (File, error) {
 	panic("unimplemented")
 }

@@ -150,11 +152,11 @@ func (m mockFS) Rename(oldname, newname string) error {
 	return m.rename(oldname, newname)
 }

-func (m mockFS) ReuseForWrite(oldname, newname string) (File, error) {
+func (m mockFS) ReuseForWrite(oldname, newname string, category DiskWriteCategory) (File, error) {
 	if m.reuseForWrite == nil {
 		panic("unimplemented")
 	}
-	return m.reuseForWrite(oldname, newname)
+	return m.reuseForWrite(oldname, newname, category)
 }

 func (m mockFS) MkdirAll(dir string, perm os.FileMode) error {
@@ -252,14 +254,22 @@ func TestDiskHealthChecking_WriteStatsCollector(t *testing.T) {
 	for _, tc := range testCases {
 		t.Run(tc.desc, func(t *testing.T) {
 			statsCollector := NewDiskWriteStatsCollector()
+			mockFS := &mockFS{create: func(name string, category DiskWriteCategory) (File, error) {
+				return mockFile{syncAndWriteDuration: 0}, nil
+			}}
+			fs, closer := WithDiskHealthChecks(mockFS, time.Millisecond, statsCollector, func(info DiskSlowInfo) {})
+			defer closer.Close()
+
 			for _, category := range tc.writeCategories {
-				f := newDiskHealthCheckingFile(&mockFile{syncAndWriteDuration: 0}, 0, category, statsCollector,
-					func(OpType OpType, writeSizeInBytes int, duration time.Duration) {})
+				f, err := fs.Create("test", category)
+				require.NoError(t, err)
 				for i := 0; i < tc.numWrites; i++ {
 					n, err := f.Write(writeBuffer)
 					require.NoError(t, err)
 					require.Equal(t, writeSizeInBytes, uint64(n))
 				}
+				err = f.Close()
+				require.NoError(t, err)
 			}
 			expectedStats := statsCollector.GetStats()
 			require.Equal(t, tc.wantStats, expectedStats)
@@ -304,7 +314,7 @@ func TestDiskHealthChecking_File(t *testing.T) {
 	for _, tc := range testCases {
 		t.Run(tc.op.String(), func(t *testing.T) {
 			diskSlow := make(chan DiskSlowInfo, 3)
-			mockFS := &mockFS{create: func(name string) (File, error) {
+			mockFS := &mockFS{create: func(name string, category DiskWriteCategory) (File, error) {
 				return mockFile{syncAndWriteDuration: tc.writeDuration}, nil
 			}}
 			fs, closer := WithDiskHealthChecks(mockFS, slowThreshold, nil,
@@ -312,7 +322,7 @@ func TestDiskHealthChecking_File(t *testing.T) {
 					diskSlow <- info
 				})
 			defer closer.Close()
-			dhFile, _ := fs.Create("test")
+			dhFile, _ := fs.Create("test", WriteCategoryUnspecified)
 			defer dhFile.Close()

 			// Writing after file creation tests computation of delta between file
@@ -455,7 +465,7 @@ var (
 // the provided channel on filesystem operations.
 func filesystemOpsMockFS(ch chan struct{}) *mockFS {
 	return &mockFS{
-		create: func(name string) (File, error) {
+		create: func(name string, category DiskWriteCategory) (File, error) {
 			<-ch
 			return nil, errInjected
 		},
@@ -479,7 +489,7 @@ func filesystemOpsMockFS(ch chan struct{}) *mockFS {
 			<-ch
 			return errInjected
 		},
-		reuseForWrite: func(oldname, newname string) (File, error) {
+		reuseForWrite: func(oldname, newname string, category DiskWriteCategory) (File, error) {
 			<-ch
 			return nil, errInjected
 		},
@@ -490,7 +500,7 @@ func stallFilesystemOperations(fs FS) []filesystemOperation {
 	return []filesystemOperation{
 		{
 			"create", OpTypeCreate, func() {
-				f, _ := fs.Create("foo")
+				f, _ := fs.Create("foo", WriteCategoryUnspecified)
 				if f != nil {
 					f.Close()
 				}
@@ -512,7 +522,7 @@ func stallFilesystemOperations(fs FS) []filesystemOperation {
 			"rename", OpTypeRename, func() { _ = fs.Rename("foo", "bar") },
 		},
 		{
-			"reuseforwrite", OpTypeReuseForWrite, func() { _, _ = fs.ReuseForWrite("foo", "bar") },
+			"reuseforwrite", OpTypeReuseForWrite, func() { _, _ = fs.ReuseForWrite("foo", "bar", WriteCategoryUnspecified) },
 		},
 	}
 }
@@ -574,7 +584,7 @@ func TestDiskHealthChecking_Filesystem_Close(t *testing.T) {
 	const stallThreshold = 10 * time.Millisecond
 	stallChan := make(chan struct{}, 1)
 	mockFS := &mockFS{
-		create: func(name string) (File, error) {
+		create: func(name string, category DiskWriteCategory) (File, error) {
 			<-stallChan
 			return &mockFile{}, nil
 		},
@@ -600,7 +610,7 @@ func TestDiskHealthChecking_Filesystem_Close(t *testing.T) {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			f, _ := fs.Create(filename)
+			f, _ := fs.Create(filename, WriteCategoryUnspecified)
 			if f != nil {
 				f.Close()
 			}
diff --git a/vfs/errorfs/errorfs.go b/vfs/errorfs/errorfs.go
index 311e87abce..5e1ed1a8bf 100644
--- a/vfs/errorfs/errorfs.go
+++ b/vfs/errorfs/errorfs.go
@@ -299,11 +299,11 @@ func (fs *FS) Unwrap() vfs.FS {
 }

 // Create implements FS.Create.
-func (fs *FS) Create(name string) (vfs.File, error) {
+func (fs *FS) Create(name string, category vfs.DiskWriteCategory) (vfs.File, error) {
 	if err := fs.inj.MaybeError(Op{Kind: OpCreate, Path: name}); err != nil {
 		return nil, err
 	}
-	f, err := fs.fs.Create(name)
+	f, err := fs.fs.Create(name, category)
 	if err != nil {
 		return nil, err
 	}
@@ -335,11 +335,13 @@ func (fs *FS) Open(name string, opts ...vfs.OpenOption) (vfs.File, error) {
 }

 // OpenReadWrite implements FS.OpenReadWrite.
-func (fs *FS) OpenReadWrite(name string, opts ...vfs.OpenOption) (vfs.File, error) {
+func (fs *FS) OpenReadWrite(
+	name string, category vfs.DiskWriteCategory, opts ...vfs.OpenOption,
+) (vfs.File, error) {
 	if err := fs.inj.MaybeError(Op{Kind: OpOpen, Path: name}); err != nil {
 		return nil, err
 	}
-	f, err := fs.fs.OpenReadWrite(name)
+	f, err := fs.fs.OpenReadWrite(name, category)
 	if err != nil {
 		return nil, err
 	}
@@ -414,11 +416,13 @@ func (fs *FS) Rename(oldname, newname string) error {
 }

 // ReuseForWrite implements FS.ReuseForWrite.
-func (fs *FS) ReuseForWrite(oldname, newname string) (vfs.File, error) {
+func (fs *FS) ReuseForWrite(
+	oldname, newname string, category vfs.DiskWriteCategory,
+) (vfs.File, error) {
 	if err := fs.inj.MaybeError(Op{Kind: OpReuseForWrite, Path: oldname}); err != nil {
 		return nil, err
 	}
-	return fs.fs.ReuseForWrite(oldname, newname)
+	return fs.fs.ReuseForWrite(oldname, newname, category)
 }

 // MkdirAll implements FS.MkdirAll.
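The reworked stats-collector test above exercises the new plumbing end to end: files are created through the health-checking FS rather than by constructing diskHealthCheckingFile directly. Roughly the same wiring outside of a test might look like the sketch below; this is an illustration, not part of this patch, and the threshold and category values are arbitrary:

package main

import (
	"time"

	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	collector := vfs.NewDiskWriteStatsCollector()

	// Wrap a base filesystem so that writes are both stall-checked and
	// attributed to the category passed at Create time.
	fs, closer := vfs.WithDiskHealthChecks(vfs.NewMem(), 100*time.Millisecond, collector,
		func(info vfs.DiskSlowInfo) {
			// React to a write that exceeded the threshold.
		})
	defer closer.Close()

	f, err := fs.Create("example", "example-category") // category name is arbitrary
	if err != nil {
		panic(err)
	}
	if _, err := f.Write([]byte("payload")); err != nil {
		panic(err)
	}
	_ = f.Close()

	_ = collector.GetStats() // aggregates now include the bytes written above
}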
diff --git a/vfs/logging_fs.go b/vfs/logging_fs.go
index 820c081c05..aa52436c20 100644
--- a/vfs/logging_fs.go
+++ b/vfs/logging_fs.go
@@ -28,9 +28,9 @@ type loggingFS struct {

 var _ FS = (*loggingFS)(nil)

-func (fs *loggingFS) Create(name string) (File, error) {
+func (fs *loggingFS) Create(name string, category DiskWriteCategory) (File, error) {
 	fs.logFn("create: %s", name)
-	f, err := fs.FS.Create(name)
+	f, err := fs.FS.Create(name, category)
 	if err != nil {
 		return nil, err
 	}
@@ -46,9 +46,11 @@ func (fs *loggingFS) Open(name string, opts ...OpenOption) (File, error) {
 	return newLoggingFile(f, name, fs.logFn), nil
 }

-func (fs *loggingFS) OpenReadWrite(name string, opts ...OpenOption) (File, error) {
+func (fs *loggingFS) OpenReadWrite(
+	name string, category DiskWriteCategory, opts ...OpenOption,
+) (File, error) {
 	fs.logFn("open-read-write: %s", name)
-	f, err := fs.FS.OpenReadWrite(name, opts...)
+	f, err := fs.FS.OpenReadWrite(name, category, opts...)
 	if err != nil {
 		return nil, err
 	}
@@ -74,9 +76,11 @@ func (fs *loggingFS) Rename(oldname, newname string) error {
 	return fs.FS.Rename(oldname, newname)
 }

-func (fs *loggingFS) ReuseForWrite(oldname, newname string) (File, error) {
+func (fs *loggingFS) ReuseForWrite(
+	oldname, newname string, category DiskWriteCategory,
+) (File, error) {
 	fs.logFn("reuseForWrite: %s -> %s", oldname, newname)
-	f, err := fs.FS.ReuseForWrite(oldname, newname)
+	f, err := fs.FS.ReuseForWrite(oldname, newname, category)
 	if err != nil {
 		return nil, err
 	}
diff --git a/vfs/mem_fs.go b/vfs/mem_fs.go
index fd82d75edc..5bf966400e 100644
--- a/vfs/mem_fs.go
+++ b/vfs/mem_fs.go
@@ -202,7 +202,7 @@ func (y *MemFS) walk(fullname string, f func(dir *memNode, frag string, final bo
 }

 // Create implements FS.Create.
-func (y *MemFS) Create(fullname string) (File, error) {
+func (y *MemFS) Create(fullname string, category DiskWriteCategory) (File, error) {
 	var ret *memFile
 	err := y.walk(fullname, func(dir *memNode, frag string, final bool) error {
 		if final {
@@ -314,11 +314,13 @@ func (y *MemFS) Open(fullname string, opts ...OpenOption) (File, error) {
 }

 // OpenReadWrite implements FS.OpenReadWrite.
-func (y *MemFS) OpenReadWrite(fullname string, opts ...OpenOption) (File, error) {
+func (y *MemFS) OpenReadWrite(
+	fullname string, category DiskWriteCategory, opts ...OpenOption,
+) (File, error) {
 	f, err := y.open(fullname, true /* openForWrite */)
 	pathErr, ok := err.(*os.PathError)
 	if ok && pathErr.Err == oserror.ErrNotExist {
-		return y.Create(fullname)
+		return y.Create(fullname, category)
 	}
 	return f, err
 }
@@ -415,7 +417,7 @@ func (y *MemFS) Rename(oldname, newname string) error {
 }

 // ReuseForWrite implements FS.ReuseForWrite.
-func (y *MemFS) ReuseForWrite(oldname, newname string) (File, error) {
+func (y *MemFS) ReuseForWrite(oldname, newname string, category DiskWriteCategory) (File, error) {
 	if err := y.Rename(oldname, newname); err != nil {
 		return nil, err
 	}
@@ -477,7 +479,7 @@ func (y *MemFS) Lock(fullname string) (io.Closer, error) {
 	// directory. Create the path so that we have the normal detection of
 	// non-existent directory paths, and make the lock visible when listing
 	// directory entries.
-	f, err := y.Create(fullname)
+	f, err := y.Create(fullname, WriteCategoryUnspecified)
 	if err != nil {
 		// "Release" the lock since we failed.
 		y.lockedFiles.Delete(fullname)
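One behavioral detail worth noting in the MemFS change above: OpenReadWrite now threads the caller's category into the Create fallback that runs when the file does not exist yet. A small sketch of that path, assuming only MemFS as modified here:

package main

import "github.com/cockroachdb/pebble/vfs"

func main() {
	fs := vfs.NewMem()

	// "example" does not exist, so OpenReadWrite falls through to Create,
	// forwarding the supplied category to it.
	f, err := fs.OpenReadWrite("example", vfs.WriteCategoryUnspecified)
	if err != nil {
		panic(err)
	}
	defer f.Close()
}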
diff --git a/vfs/mem_fs_test.go b/vfs/mem_fs_test.go
index 62a16e1927..c821845876 100644
--- a/vfs/mem_fs_test.go
+++ b/vfs/mem_fs_test.go
@@ -39,7 +39,7 @@ func runTestCases(t *testing.T, testCases []string, fs *MemFS) {
 		)
 		switch s[0] {
 		case "create":
-			g, err = fs.Create(s[1])
+			g, err = fs.Create(s[1], WriteCategoryUnspecified)
 		case "link":
 			err = fs.Link(s[1], s[2])
 		case "open":
@@ -53,7 +53,7 @@ func runTestCases(t *testing.T, testCases []string, fs *MemFS) {
 		case "rename":
 			err = fs.Rename(s[1], s[2])
 		case "reuseForWrite":
-			g, err = fs.ReuseForWrite(s[1], s[2])
+			g, err = fs.ReuseForWrite(s[1], s[2], WriteCategoryUnspecified)
 		case "resetToSynced":
 			fs.ResetToSyncedState()
 		case "ignoreSyncs":
@@ -218,7 +218,7 @@ func TestList(t *testing.T) {
 		"/foot",
 	}
 	for _, filename := range filenames {
-		f, err := fs.Create(filename)
+		f, err := fs.Create(filename, WriteCategoryUnspecified)
 		if err != nil {
 			t.Fatalf("Create %q: %v", filename, err)
 		}
diff --git a/vfs/syncing_file.go b/vfs/syncing_file.go
index cf598627ac..2dfb63f95a 100644
--- a/vfs/syncing_file.go
+++ b/vfs/syncing_file.go
@@ -201,15 +201,17 @@ type syncingFS struct {

 var _ FS = (*syncingFS)(nil)

-func (fs *syncingFS) Create(name string) (File, error) {
-	f, err := fs.FS.Create(name)
+func (fs *syncingFS) Create(name string, category DiskWriteCategory) (File, error) {
+	f, err := fs.FS.Create(name, category)
 	if err != nil {
 		return nil, err
 	}
 	return NewSyncingFile(f, fs.syncOpts), nil
 }

-func (fs *syncingFS) ReuseForWrite(oldname, newname string) (File, error) {
+func (fs *syncingFS) ReuseForWrite(
+	oldname, newname string, category DiskWriteCategory,
+) (File, error) {
 	// TODO(radu): implement this if needed.
 	panic("unimplemented")
 }
diff --git a/vfs/syncing_file_test.go b/vfs/syncing_file_test.go
index 3a49bdcb14..bf572b80a8 100644
--- a/vfs/syncing_file_test.go
+++ b/vfs/syncing_file_test.go
@@ -23,7 +23,7 @@ func TestSyncingFile(t *testing.T) {
 	require.NoError(t, tmpf.Close())
 	defer os.Remove(filename)

-	f, err := Default.Create(filename)
+	f, err := Default.Create(filename, WriteCategoryUnspecified)
 	require.NoError(t, err)

 	tf := &mockSyncToFile{File: f, canSyncTo: true}
@@ -80,7 +80,7 @@ close: test []
 	require.NoError(t, tmpf.Close())
 	defer os.Remove(filename)

-	f, err := Default.Create(filename)
+	f, err := Default.Create(filename, WriteCategoryUnspecified)
 	require.NoError(t, err)

 	var buf bytes.Buffer
@@ -149,7 +149,7 @@ func TestSyncingFileNoSyncOnClose(t *testing.T) {
 		require.NoError(t, tmpf.Close())
 		defer os.Remove(filename)

-		f, err := Default.Create(filename)
+		f, err := Default.Create(filename, WriteCategoryUnspecified)
 		require.NoError(t, err)

 		tf := &mockSyncToFile{f, c.useSyncTo}
diff --git a/vfs/vfs.go b/vfs/vfs.go
index 3cc0b43ab1..5879cf2277 100644
--- a/vfs/vfs.go
+++ b/vfs/vfs.go
@@ -93,7 +93,7 @@ type FS interface {
 	// Create creates the named file for reading and writing. If a file
 	// already exists at the provided name, it's removed first ensuring the
 	// resulting file descriptor points to a new inode.
-	Create(name string) (File, error)
+	Create(name string, category DiskWriteCategory) (File, error)

 	// Link creates newname as a hard link to the oldname file.
 	Link(oldname, newname string) error
@@ -103,7 +103,7 @@ type FS interface {

 	// OpenReadWrite opens the named file for reading and writing. If the file
 	// does not exist, it is created.
-	OpenReadWrite(name string, opts ...OpenOption) (File, error)
+	OpenReadWrite(name string, category DiskWriteCategory, opts ...OpenOption) (File, error)

 	// OpenDir opens the named directory for syncing.
 	OpenDir(name string) (File, error)
@@ -125,7 +125,7 @@ type FS interface {
 	// to reuse oldname, and simply create the file with newname -- in this case the implementation
 	// should delete oldname. If the caller calls this function with an oldname that does not exist,
 	// the implementation may return an error.
-	ReuseForWrite(oldname, newname string) (File, error)
+	ReuseForWrite(oldname, newname string, category DiskWriteCategory) (File, error)

 	// MkdirAll creates a directory and all necessary parents. The permission
 	// bits perm have the same semantics as in os.MkdirAll. If the directory
@@ -201,7 +201,7 @@ func wrapOSFile(f *os.File) File {
 	return wrapOSFileImpl(f)
 }

-func (defaultFS) Create(name string) (File, error) {
+func (defaultFS) Create(name string, category DiskWriteCategory) (File, error) {
 	const openFlags = os.O_RDWR | os.O_CREATE | os.O_EXCL | syscall.O_CLOEXEC

 	osFile, err := os.OpenFile(name, openFlags, 0666)
@@ -238,7 +238,9 @@ func (defaultFS) Open(name string, opts ...OpenOption) (File, error) {
 	return file, nil
 }

-func (defaultFS) OpenReadWrite(name string, opts ...OpenOption) (File, error) {
+func (defaultFS) OpenReadWrite(
+	name string, category DiskWriteCategory, opts ...OpenOption,
+) (File, error) {
 	osFile, err := os.OpenFile(name, os.O_RDWR|syscall.O_CLOEXEC|os.O_CREATE, 0666)
 	if err != nil {
 		return nil, errors.WithStack(err)
 	}
@@ -262,7 +264,9 @@ func (defaultFS) Rename(oldname, newname string) error {
 	return errors.WithStack(os.Rename(oldname, newname))
 }

-func (fs defaultFS) ReuseForWrite(oldname, newname string) (File, error) {
+func (fs defaultFS) ReuseForWrite(
+	oldname, newname string, category DiskWriteCategory,
+) (File, error) {
 	if err := fs.Rename(oldname, newname); err != nil {
 		return nil, errors.WithStack(err)
 	}
@@ -344,7 +348,7 @@ func CopyAcrossFS(srcFS FS, oldname string, dstFS FS, newname string) error {
 	}
 	defer src.Close()

-	dst, err := dstFS.Create(newname)
+	dst, err := dstFS.Create(newname, WriteCategoryUnspecified)
 	if err != nil {
 		return err
 	}
@@ -365,7 +369,7 @@ func LimitedCopy(fs FS, oldname, newname string, maxBytes int64) error {
 	}
 	defer src.Close()

-	dst, err := fs.Create(newname)
+	dst, err := fs.Create(newname, WriteCategoryUnspecified)
 	if err != nil {
 		return err
 	}
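Since the three write-path methods of the FS interface now take a DiskWriteCategory, any out-of-tree vfs.FS wrapper has to grow the same parameters, just as the loggingFS and errorfs wrappers in this diff do. A hypothetical minimal wrapper (the type and field names are made up for illustration) that pins every write to one category while delegating everything else via embedding:

package main

import "github.com/cockroachdb/pebble/vfs"

// categoryTaggingFS is a hypothetical wrapper: the embedded FS supplies every
// method, and only the write paths are overridden to force a fixed category.
type categoryTaggingFS struct {
	vfs.FS
	category vfs.DiskWriteCategory
}

var _ vfs.FS = (*categoryTaggingFS)(nil)

func (fs *categoryTaggingFS) Create(name string, _ vfs.DiskWriteCategory) (vfs.File, error) {
	return fs.FS.Create(name, fs.category)
}

func (fs *categoryTaggingFS) OpenReadWrite(
	name string, _ vfs.DiskWriteCategory, opts ...vfs.OpenOption,
) (vfs.File, error) {
	return fs.FS.OpenReadWrite(name, fs.category, opts...)
}

func (fs *categoryTaggingFS) ReuseForWrite(
	oldname, newname string, _ vfs.DiskWriteCategory,
) (vfs.File, error) {
	return fs.FS.ReuseForWrite(oldname, newname, fs.category)
}

func main() {
	fs := &categoryTaggingFS{FS: vfs.NewMem(), category: "pebble-wal"}
	if f, err := fs.Create("example", vfs.WriteCategoryUnspecified); err == nil {
		_ = f.Close() // the inner Create saw the wrapper's category, not the caller's
	}
}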
diff --git a/vfs/vfs_test.go b/vfs/vfs_test.go
index 39082fce3b..75373cf347 100644
--- a/vfs/vfs_test.go
+++ b/vfs/vfs_test.go
@@ -53,8 +53,8 @@ func (fs vfsTestFS) stripBase(path string) string {
 	return path
 }

-func (fs vfsTestFS) Create(name string) (File, error) {
-	f, err := fs.FS.Create(name)
+func (fs vfsTestFS) Create(name string, category DiskWriteCategory) (File, error) {
+	f, err := fs.FS.Create(name, category)
 	fmt.Fprintf(fs.w, "create: %s [%v]\n", fs.stripBase(name), normalizeError(err))
 	return vfsTestFSFile{f, fs.PathBase(name), fs.w}, err
 }
@@ -69,8 +69,10 @@ func (fs vfsTestFS) Link(oldname, newname string) error {
 	return err
 }

-func (fs vfsTestFS) ReuseForWrite(oldname, newname string) (File, error) {
-	f, err := fs.FS.ReuseForWrite(oldname, newname)
+func (fs vfsTestFS) ReuseForWrite(
+	oldname, newname string, category DiskWriteCategory,
+) (File, error) {
+	f, err := fs.FS.ReuseForWrite(oldname, newname, category)
 	if err == nil {
 		f = vfsTestFSFile{f, fs.PathBase(newname), fs.w}
 	}
@@ -203,7 +205,7 @@ func runTestVFS(t *testing.T, baseFS FS, dir string) {
 			if len(parts) != 2 {
 				return "create "
 			}
-			f, _ := fs.Create(fs.PathJoin(dir, parts[1]))
+			f, _ := fs.Create(fs.PathJoin(dir, parts[1]), WriteCategoryUnspecified)
 			f.Close()

 		case "link":
@@ -222,7 +224,7 @@ func runTestVFS(t *testing.T, baseFS FS, dir string) {
 			if len(parts) != 3 {
 				return "reuseForWrite "
 			}
-			_, _ = fs.ReuseForWrite(fs.PathJoin(dir, parts[1]), fs.PathJoin(dir, parts[2]))
+			_, _ = fs.ReuseForWrite(fs.PathJoin(dir, parts[1]), fs.PathJoin(dir, parts[2]), WriteCategoryUnspecified)

 		case "list":
 			if len(parts) != 2 {
@@ -297,7 +299,7 @@ func TestVFSCreateLinkSemantics(t *testing.T) {
 		t.Run(fmt.Sprintf("%T", fs), func(t *testing.T) {
 			writeFile := func(path, contents string) {
 				path = fs.PathJoin(dir, path)
-				f, err := fs.Create(path)
+				f, err := fs.Create(path, WriteCategoryUnspecified)
 				require.NoError(t, err)
 				_, err = f.Write([]byte(contents))
 				require.NoError(t, err)
diff --git a/vfs/vfstest/open_files.go b/vfs/vfstest/open_files.go
index 304b643abf..0377766e21 100644
--- a/vfs/vfstest/open_files.go
+++ b/vfs/vfstest/open_files.go
@@ -47,8 +47,8 @@ func (fs *openFilesFS) dumpStacks(w io.Writer) {
 	}
 }

-func (fs *openFilesFS) Create(name string) (vfs.File, error) {
-	return fs.wrapOpenFile(fs.inner.Create(name))
+func (fs *openFilesFS) Create(name string, category vfs.DiskWriteCategory) (vfs.File, error) {
+	return fs.wrapOpenFile(fs.inner.Create(name, category))
 }

 func (fs *openFilesFS) Link(oldname, newname string) error {
@@ -59,8 +59,10 @@ func (fs *openFilesFS) Open(name string, opts ...vfs.OpenOption) (vfs.File, erro
 	return fs.wrapOpenFile(fs.inner.Open(name, opts...))
 }

-func (fs *openFilesFS) OpenReadWrite(name string, opts ...vfs.OpenOption) (vfs.File, error) {
-	return fs.wrapOpenFile(fs.inner.OpenReadWrite(name, opts...))
+func (fs *openFilesFS) OpenReadWrite(
+	name string, category vfs.DiskWriteCategory, opts ...vfs.OpenOption,
+) (vfs.File, error) {
+	return fs.wrapOpenFile(fs.inner.OpenReadWrite(name, category, opts...))
 }

 func (fs *openFilesFS) OpenDir(name string) (vfs.File, error) {
@@ -79,8 +81,10 @@ func (fs *openFilesFS) Rename(oldname, newname string) error {
 	return fs.inner.Rename(oldname, newname)
 }

-func (fs *openFilesFS) ReuseForWrite(oldname, newname string) (vfs.File, error) {
-	return fs.wrapOpenFile(fs.inner.ReuseForWrite(oldname, newname))
+func (fs *openFilesFS) ReuseForWrite(
+	oldname, newname string, category vfs.DiskWriteCategory,
+) (vfs.File, error) {
+	return fs.wrapOpenFile(fs.inner.ReuseForWrite(oldname, newname, category))
 }

 func (fs *openFilesFS) MkdirAll(dir string, perm os.FileMode) error {
diff --git a/vfs/vfstest/open_files_test.go b/vfs/vfstest/open_files_test.go
index 45795f69b6..a9d0d99e50 100644
--- a/vfs/vfstest/open_files_test.go
+++ b/vfs/vfstest/open_files_test.go
@@ -29,7 +29,7 @@ func TestOpenFiles(t *testing.T) {
 			return f
 		}},
 		{name: "Create", fn: func() vfs.File {
-			f, err := fs.Create("foo")
+			f, err := fs.Create("foo", vfs.WriteCategoryUnspecified)
 			require.NoError(t, err)
 			return f
 		}},
@@ -44,7 +44,7 @@ func TestOpenFiles(t *testing.T) {
 			return f
 		}},
 		{name: "ReuseForWrite", fn: func() vfs.File {
-			f, err := fs.ReuseForWrite("foo", "bar")
+			f, err := fs.ReuseForWrite("foo", "bar", vfs.WriteCategoryUnspecified)
 			require.NoError(t, err)
 			return f
 		}},
diff --git a/wal/failover_manager.go b/wal/failover_manager.go
index 6463795f71..76a535fe83 100644
--- a/wal/failover_manager.go
+++ b/wal/failover_manager.go
@@ -106,7 +106,7 @@ func (p *dirProber) probeLoop() {
 		// Delete, create, write, sync.
 		start := p.timeSource.now()
 		_ = p.fs.Remove(p.filename)
-		f, err := p.fs.Create(p.filename)
+		f, err := p.fs.Create(p.filename, "pebble-wal")
 		if err != nil {
 			return failedProbeDuration
 		}
@@ -480,7 +480,7 @@ func (wm *failoverManager) init(o Options, initial Logs) error {
 		// proceed. An operator doesn't want to encounter an issue writing to the
 		// secondary the first time there's a need to failover. We write a bit of
 		// metadata to a file in the secondary's directory.
-		f, err := o.Secondary.FS.Create(o.Secondary.FS.PathJoin(o.Secondary.Dirname, "failover_source"))
+		f, err := o.Secondary.FS.Create(o.Secondary.FS.PathJoin(o.Secondary.Dirname, "failover_source"), "pebble-wal")
 		if err != nil {
 			return errors.Newf("failed to write to WAL secondary dir: %v", err)
 		}
@@ -755,7 +755,7 @@ func (wm *failoverManager) logCreator(
 		createInfo.RecycledFileNum = recycleLog.FileNum
 		recycleLogName := dir.FS.PathJoin(dir.Dirname, makeLogFilename(NumWAL(recycleLog.FileNum), 0))
 		r.writeStart()
-		logFile, err = dir.FS.ReuseForWrite(recycleLogName, logFilename)
+		logFile, err = dir.FS.ReuseForWrite(recycleLogName, logFilename, "pebble-wal")
 		r.writeEnd(err)
 		// TODO(sumeer): should we fatal since primary dir? At some point it is
 		// better to fatal instead of continuing to failover.
@@ -789,7 +789,7 @@ func (wm *failoverManager) logCreator(
 	//
 	// Create file.
 	r.writeStart()
-	logFile, err = dir.FS.Create(logFilename)
+	logFile, err = dir.FS.Create(logFilename, "pebble-wal")
 	r.writeEnd(err)
 	return logFile, 0, err
 }
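The WAL call sites above pass the category as a bare "pebble-wal" string literal at each Create and ReuseForWrite call. Because DiskWriteCategory is a defined string type, a named constant would type-check just as well and keep the literal in one place; a possible convention, not something this patch introduces:

package wal

import "github.com/cockroachdb/pebble/vfs"

// walWriteCategory is a hypothetical named constant; the patch as written
// repeats the "pebble-wal" literal directly at each call site instead.
const walWriteCategory vfs.DiskWriteCategory = "pebble-wal"

Either form works, since untyped string constants convert implicitly to DiskWriteCategory.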
diff --git a/wal/failover_writer.go b/wal/failover_writer.go
index 21132349cf..a1579c1e45 100644
--- a/wal/failover_writer.go
+++ b/wal/failover_writer.go
@@ -445,7 +445,7 @@ func simpleLogCreator(
 	filename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li))
 	// Create file.
 	r.writeStart()
-	f, err = dir.FS.Create(filename)
+	f, err = dir.FS.Create(filename, "pebble-wal")
 	r.writeEnd(err)
 	return f, 0, err
 }
diff --git a/wal/failover_writer_test.go b/wal/failover_writer_test.go
index 9fb39bd0e4..edad6a57b4 100644
--- a/wal/failover_writer_test.go
+++ b/wal/failover_writer_test.go
@@ -539,10 +539,10 @@ func (fs *blockingFS) maybeBlock(baseFilename string, op blockingConf) {
 	}
 }

-func (fs *blockingFS) Create(name string) (vfs.File, error) {
+func (fs *blockingFS) Create(name string, category vfs.DiskWriteCategory) (vfs.File, error) {
 	baseFilename := fs.FS.PathBase(name)
 	fs.maybeBlock(baseFilename, blockingCreate)
-	f, err := fs.FS.Create(name)
+	f, err := fs.FS.Create(name, category)
 	if err != nil {
 		return nil, err
 	}
diff --git a/wal/reader_test.go b/wal/reader_test.go
index 9b40380117..18f8378c09 100644
--- a/wal/reader_test.go
+++ b/wal/reader_test.go
@@ -72,7 +72,7 @@ func TestList(t *testing.T) {
 				fs := getFS(fsName)
 				require.NoError(t, fs.MkdirAll(fs.PathDir(filename), os.ModePerm))
-				f, err := fs.Create(filename)
+				f, err := fs.Create(filename, vfs.WriteCategoryUnspecified)
 				require.NoError(t, err)
 				require.NoError(t, f.Close())
 			}
@@ -115,11 +115,11 @@ func TestReader(t *testing.T) {
 			var f vfs.File
 			var err error
 			if recycleFilename != "" {
-				f, err = fs.ReuseForWrite(recycleFilename, filename)
+				f, err = fs.ReuseForWrite(recycleFilename, filename, vfs.WriteCategoryUnspecified)
 				require.NoError(t, err)
 				fmt.Fprintf(&buf, "recycled %q as %q\n", recycleFilename, filename)
 			} else {
-				f, err = fs.Create(filename)
+				f, err = fs.Create(filename, vfs.WriteCategoryUnspecified)
 				require.NoError(t, err)
 				fmt.Fprintf(&buf, "created %q\n", filename)
 			}
diff --git a/wal/standalone_manager.go b/wal/standalone_manager.go
index eab02a4d68..ee480a7112 100644
--- a/wal/standalone_manager.go
+++ b/wal/standalone_manager.go
@@ -140,10 +140,10 @@ func (m *StandaloneManager) Create(wn NumWAL, jobID int) (Writer, error) {
 	recycleLog, recycleOK = m.recycler.Peek()
 	if recycleOK {
 		recycleLogName := m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, makeLogFilename(NumWAL(recycleLog.FileNum), 0))
-		newLogFile, err = m.o.Primary.FS.ReuseForWrite(recycleLogName, newLogName)
+		newLogFile, err = m.o.Primary.FS.ReuseForWrite(recycleLogName, newLogName, "pebble-wal")
 		base.MustExist(m.o.Primary.FS, newLogName, m.o.Logger, err)
 	} else {
-		newLogFile, err = m.o.Primary.FS.Create(newLogName)
+		newLogFile, err = m.o.Primary.FS.Create(newLogName, "pebble-wal")
 		base.MustExist(m.o.Primary.FS, newLogName, m.o.Logger, err)
 	}
 	createInfo := CreateInfo{