From 0924c8cfd6f3af7c360e22bcfbafae132fd22269 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 18 Jan 2021 11:17:56 +0100 Subject: [PATCH 1/2] tracing: link child into parent, even if not verbose Prior to this change, when a child was derived from a local parent, we would not register the child with the parent. In effect, this meant that any payloads in the child would not be collected. Release note: None --- pkg/util/tracing/crdbspan.go | 10 +++++++--- pkg/util/tracing/span_test.go | 16 ++++++++++++++++ pkg/util/tracing/tracer.go | 12 ++++++------ 3 files changed, 29 insertions(+), 9 deletions(-) diff --git a/pkg/util/tracing/crdbspan.go b/pkg/util/tracing/crdbspan.go index b6a0278d6e5e..0ab84dc0ff2a 100644 --- a/pkg/util/tracing/crdbspan.go +++ b/pkg/util/tracing/crdbspan.go @@ -98,12 +98,16 @@ func (s *crdbSpan) recordingType() RecordingType { // If separate recording is specified, the child is not registered with the // parent. Thus, the parent's recording will not include this child. func (s *crdbSpan) enableRecording(parent *crdbSpan, recType RecordingType) { - s.mu.Lock() - defer s.mu.Unlock() - s.mu.recording.recordingType.swap(recType) if parent != nil { parent.addChild(s) } + if recType == RecordingOff { + return + } + + s.mu.Lock() + defer s.mu.Unlock() + s.mu.recording.recordingType.swap(recType) if recType == RecordingVerbose { s.setBaggageItemLocked(verboseTracingBaggageKey, "1") } diff --git a/pkg/util/tracing/span_test.go b/pkg/util/tracing/span_test.go index 1c99e84b0a9a..1e1e72104165 100644 --- a/pkg/util/tracing/span_test.go +++ b/pkg/util/tracing/span_test.go @@ -196,3 +196,19 @@ func TestSpan_LogStructured(t *testing.T) { require.NoError(t, types.UnmarshalAny(item, &d1)) require.IsType(t, (*types.Int32Value)(nil), d1.Message) } + +func TestNonVerboseChildSpanRegisteredWithParent(t *testing.T) { + tr := NewTracer() + tr._mode = int32(modeBackground) + sp := tr.StartSpan("root", WithForceRealSpan()) + defer sp.Finish() + ch := tr.StartSpan("child", WithParentAndAutoCollection(sp), WithForceRealSpan()) + defer ch.Finish() + require.Len(t, sp.crdb.mu.recording.children, 1) + require.Equal(t, ch.crdb, sp.crdb.mu.recording.children[0]) + ch.LogStructured(&types.Int32Value{Value: 5}) + // Check that the child span (incl its payload) is in the recording. + rec := sp.GetRecording() + require.Len(t, rec, 2) + require.Len(t, rec[1].InternalStructured, 1) +} diff --git a/pkg/util/tracing/tracer.go b/pkg/util/tracing/tracer.go index 6782151126ce..1ca23b8c251e 100644 --- a/pkg/util/tracing/tracer.go +++ b/pkg/util/tracing/tracer.go @@ -394,16 +394,16 @@ func (t *Tracer) startSpanGeneric( s := &helper.span - // Start recording if necessary. We inherit the recording type of the local parent, if any, - // over the remote parent, if any. If neither are specified, we're not recording. - recordingType := opts.recordingType() - - if recordingType != RecordingOff { + { + // Link the newly created span to the parent, if necessary, + // and start recording, if requested. + // We inherit the recording type of the local parent, if any, + // over the remote parent, if any. If neither are specified, we're not recording. var p *crdbSpan if opts.Parent != nil { p = opts.Parent.crdb } - s.crdb.enableRecording(p, recordingType) + s.crdb.enableRecording(p, opts.recordingType()) } // Set initial tags. These will propagate to the crdbSpan, ot, and netTr From 16638bcc51c77a96f7bd496a82f85e1d75ff9c1a Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 19 Jan 2021 10:38:02 +0100 Subject: [PATCH 2/2] kvserver: avoid engine bypass in sideload storage While investigating #59126, I realized we were still going through the engine for listing the files. I rectified this and as a result, completely removed the in-mem implementation of the sideloaded storage that we had in place: instead of it, we simply use an in-mem engine, which now works like a charm thanks to not bypassing the engine for listing the file contents (`filepath.Glob` --> `eng.List`). While I was here, I added some defense in depth: it was unreasonable that `TruncateTo` was returning an error as a result of failing to perform best-effort cleanup on its directory. I am no longer seeing the spurious errors observed in #59126 with this change. This makes sense, as in this test the sideloaded storage is set up with an in-memory engine. The old code, which read from actual disk, would try to glob `auxiliary/...` (i.e. some path relative to the cwd - oops), which would return no results. As a result, the sideloaded storage would try to invoke `eng.Remove("auxiliary/...")` which the engine would refuse, since there would actually be (in-mem) files stored in the engine. This all resolves now that everyone is looking at the same files. It's possible that this fixes the issue with BenchmarkUserFile, as previously errors from TruncateTo could block the raft log queue, which in turn could at least slow down the test, but it's hard to say for sure. Touches #59126. Touches #31913. Release note: None --- pkg/kv/kvserver/BUILD.bazel | 1 - pkg/kv/kvserver/replica_sideload_disk.go | 20 +- pkg/kv/kvserver/replica_sideload_inmem.go | 112 -------- pkg/kv/kvserver/replica_sideload_test.go | 240 +++++++----------- .../replica_sst_snapshot_storage_test.go | 4 +- 5 files changed, 103 insertions(+), 274 deletions(-) delete mode 100644 pkg/kv/kvserver/replica_sideload_inmem.go diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 88e1f9449466..566a29f5a1b5 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -63,7 +63,6 @@ go_library( "replica_send.go", "replica_sideload.go", "replica_sideload_disk.go", - "replica_sideload_inmem.go", "replica_split_load.go", "replica_sst_snapshot_storage.go", "replica_stats.go", diff --git a/pkg/kv/kvserver/replica_sideload_disk.go b/pkg/kv/kvserver/replica_sideload_disk.go index eb9736e13ff4..3619bc13a4ae 100644 --- a/pkg/kv/kvserver/replica_sideload_disk.go +++ b/pkg/kv/kvserver/replica_sideload_disk.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" "github.com/cockroachdb/errors/oserror" "golang.org/x/time/rate" @@ -238,7 +239,8 @@ func (ss *diskSideloadStorage) TruncateTo( // Not worth trying to figure out which one, just try to delete. err := ss.eng.RemoveDir(ss.dir) if err != nil && !oserror.IsNotExist(err) { - return bytesFreed, 0, errors.Wrapf(err, "while purging %q", ss.dir) + log.Infof(ctx, "unable to remove sideloaded dir %s: %v", ss.dir, err) + err = nil // handled } } return bytesFreed, bytesRetained, nil @@ -247,12 +249,21 @@ func (ss *diskSideloadStorage) TruncateTo( func (ss *diskSideloadStorage) forEach( ctx context.Context, visit func(index uint64, filename string) error, ) error { - matches, err := filepath.Glob(filepath.Join(ss.dir, "i*.t*")) + matches, err := ss.eng.List(ss.dir) + if oserror.IsNotExist(err) { + // Nothing to do. + return nil + } if err != nil { return err } for _, match := range matches { + // List returns a relative path, but we want to deal in absolute paths + // because we may pass this back to `eng.{Delete,Stat}`, etc, and those + // expect absolute paths. + match = filepath.Join(ss.dir, match) base := filepath.Base(match) + // Extract `i` prefix from file. if len(base) < 1 || base[0] != 'i' { continue } @@ -260,10 +271,11 @@ func (ss *diskSideloadStorage) forEach( upToDot := strings.SplitN(base, ".", 2) logIdx, err := strconv.ParseUint(upToDot[0], 10, 64) if err != nil { - return errors.Wrapf(err, "while parsing %q during TruncateTo", match) + log.Infof(ctx, "unexpected file %s in sideloaded directory %s", match, ss.dir) + continue } if err := visit(logIdx, match); err != nil { - return errors.Wrapf(err, "matching pattern %q", match) + return errors.Wrapf(err, "matching pattern %q on dir %s", match, ss.dir) } } return nil diff --git a/pkg/kv/kvserver/replica_sideload_inmem.go b/pkg/kv/kvserver/replica_sideload_inmem.go deleted file mode 100644 index 55e550292070..000000000000 --- a/pkg/kv/kvserver/replica_sideload_inmem.go +++ /dev/null @@ -1,112 +0,0 @@ -// Copyright 2017 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package kvserver - -import ( - "context" - "fmt" - "path/filepath" - - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/storage" -) - -type slKey struct { - index, term uint64 -} - -type inMemSideloadStorage struct { - m map[slKey][]byte - prefix string -} - -func mustNewInMemSideloadStorage( - rangeID roachpb.RangeID, replicaID roachpb.ReplicaID, baseDir string, -) SideloadStorage { - ss, err := newInMemSideloadStorage(cluster.MakeTestingClusterSettings(), rangeID, replicaID, baseDir, nil) - if err != nil { - panic(err) - } - return ss -} - -func newInMemSideloadStorage( - _ *cluster.Settings, - rangeID roachpb.RangeID, - replicaID roachpb.ReplicaID, - baseDir string, - eng storage.Engine, -) (SideloadStorage, error) { - return &inMemSideloadStorage{ - prefix: filepath.Join(baseDir, fmt.Sprintf("%d.%d", rangeID, replicaID)), - m: make(map[slKey][]byte), - }, nil -} - -func (ss *inMemSideloadStorage) key(index, term uint64) slKey { - return slKey{index: index, term: term} -} - -func (ss *inMemSideloadStorage) Dir() string { - // We could return ss.prefix but real code calling this would then take the - // result in look for it on the actual file system. - panic("unsupported") -} - -func (ss *inMemSideloadStorage) Put(_ context.Context, index, term uint64, contents []byte) error { - key := ss.key(index, term) - ss.m[key] = contents - return nil -} - -func (ss *inMemSideloadStorage) Get(_ context.Context, index, term uint64) ([]byte, error) { - key := ss.key(index, term) - data, ok := ss.m[key] - if !ok { - return nil, errSideloadedFileNotFound - } - return data, nil -} - -func (ss *inMemSideloadStorage) Filename(_ context.Context, index, term uint64) (string, error) { - return filepath.Join(ss.prefix, fmt.Sprintf("i%d.t%d", index, term)), nil -} - -func (ss *inMemSideloadStorage) Purge(_ context.Context, index, term uint64) (int64, error) { - k := ss.key(index, term) - if _, ok := ss.m[k]; !ok { - return 0, errSideloadedFileNotFound - } - size := int64(len(ss.m[k])) - delete(ss.m, k) - return size, nil -} - -func (ss *inMemSideloadStorage) Clear(_ context.Context) error { - ss.m = make(map[slKey][]byte) - return nil -} - -func (ss *inMemSideloadStorage) TruncateTo( - _ context.Context, index uint64, -) (freed, retained int64, _ error) { - // Not efficient, but this storage is for testing purposes only anyway. - for k, v := range ss.m { - if k.index < index { - freed += int64(len(v)) - delete(ss.m, k) - } else { - retained += int64(len(v)) - } - } - return freed, retained, nil -} diff --git a/pkg/kv/kvserver/replica_sideload_test.go b/pkg/kv/kvserver/replica_sideload_test.go index 7cef0a202e2f..fddb8c1c0d8b 100644 --- a/pkg/kv/kvserver/replica_sideload_test.go +++ b/pkg/kv/kvserver/replica_sideload_test.go @@ -34,7 +34,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -43,8 +42,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" "github.com/cockroachdb/errors/oserror" - "github.com/cockroachdb/pebble" "github.com/kr/pretty" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/raft/v3/raftpb" "golang.org/x/time/rate" ) @@ -88,45 +87,43 @@ func TestSideloadingSideloadedStorage(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) t.Run("Mem", func(t *testing.T) { - testSideloadingSideloadedStorage(t, newInMemSideloadStorage) + eng := storage.NewDefaultInMem() + defer eng.Close() + testSideloadingSideloadedStorage(t, eng) }) t.Run("Disk", func(t *testing.T) { - maker := func( - s *cluster.Settings, rangeID roachpb.RangeID, rep roachpb.ReplicaID, name string, eng storage.Engine, - ) (SideloadStorage, error) { - return newDiskSideloadStorage(s, rangeID, rep, name, rate.NewLimiter(rate.Inf, math.MaxInt64), eng) - } - testSideloadingSideloadedStorage(t, maker) + cleanup, eng := newOnDiskEngine(t) + defer cleanup() + defer eng.Close() + testSideloadingSideloadedStorage(t, eng) }) } -func testSideloadingSideloadedStorage( - t *testing.T, - maker func(*cluster.Settings, roachpb.RangeID, roachpb.ReplicaID, string, storage.Engine) (SideloadStorage, error), -) { - dir, cleanup := testutils.TempDir(t) - defer cleanup() - - ctx := context.Background() +func newTestingSideloadStorage(t *testing.T, eng storage.Engine) *diskSideloadStorage { st := cluster.MakeTestingClusterSettings() + ss, err := newDiskSideloadStorage( + st, 1, 2, filepath.Join(eng.GetAuxiliaryDir(), "fake", "testing", "dir"), + rate.NewLimiter(rate.Inf, math.MaxInt64), eng, + ) + require.NoError(t, err) + return ss +} - cleanup, eng := newEngine(t) - defer cleanup() - defer eng.Close() - - ss, err := maker(st, 1, 2, dir, eng) - if err != nil { - t.Fatal(err) - } - _, isInMem := ss.(*inMemSideloadStorage) // some things don't make sense for inMem +func testSideloadingSideloadedStorage(t *testing.T, eng storage.Engine) { + ctx := context.Background() + ss := newTestingSideloadStorage(t, eng) assertCreated := func(isCreated bool) { - if isInMem { - return - } - if is := ss.(*diskSideloadStorage).dirCreated; is != isCreated { + t.Helper() + if is := ss.dirCreated; is != isCreated { t.Fatalf("assertion failed: expected dirCreated=%t, got %t", isCreated, is) } + _, err := ss.eng.Stat(ss.dir) + if !ss.dirCreated { + require.True(t, oserror.IsNotExist(err), "%v", err) + } else { + require.NoError(t, err) + } } assertCreated(false) @@ -177,7 +174,7 @@ func testSideloadingSideloadedStorage( { err: errSideloadedFileNotFound, fun: func() error { - _, err = ss.Get(ctx, 123, 456) + _, err := ss.Get(ctx, 123, 456) return err }, }, @@ -198,7 +195,7 @@ func testSideloadingSideloadedStorage( { err: nil, fun: func() error { - _, err = ss.Filename(ctx, 123, 456) + _, err := ss.Filename(ctx, 123, 456) return err }, }, @@ -232,17 +229,6 @@ func testSideloadingSideloadedStorage( } } - // Just for fun, recreate the original storage (unless it's the in-memory - // one), which shouldn't change anything about its state. - if !isInMem { - var err error - ss, err = maker(st, 1, 2, dir, eng) - if err != nil { - t.Fatal(err) - } - assertCreated(false) - } - // Just a sanity check that for the overlapping terms, we see both entries. for _, term := range []uint64{lowTerm, highTerm} { index := payloads[0] // exists at both lowTerm and highTerm @@ -252,7 +238,7 @@ func testSideloadingSideloadedStorage( t.Fatalf("got %q, wanted %q", c, exp) } } - assertCreated(false) // Get() doesn't recreated nor check + assertCreated(true) for n := range payloads { // Truncate indexes <= payloads[n] (payloads is sorted in increasing order). @@ -277,81 +263,58 @@ func testSideloadingSideloadedStorage( } func() { - if isInMem { - return - } // First add a file that shouldn't be in the sideloaded storage to ensure // sane behavior when directory can't be removed after full truncate. - nonRemovableFile := filepath.Join(ss.(*diskSideloadStorage).dir, "cantremove.xx") + nonRemovableFile := filepath.Join(ss.Dir(), "cantremove.xx") f, err := eng.Create(nonRemovableFile) if err != nil { t.Fatalf("could not create non i*.t* file in sideloaded storage: %+v", err) } - defer f.Close() + // We have to close the file right away, otherwise (at least with in-mem pebble) + // we will be prevented from removing it below. + require.NoError(t, f.Close()) _, _, err = ss.TruncateTo(ctx, math.MaxUint64) - if err == nil { - t.Fatalf("sideloaded directory should not have been removable due to extra file %s", nonRemovableFile) - } - if !strings.HasSuffix(strings.ToLower(err.Error()), "directory not empty") { - t.Fatalf("error truncating sideloaded storage: %+v", err) + // The sideloaded storage should not error out here; removing files + // is optional. But the file should still be there! + require.NoError(t, err) + { + _, err := eng.Stat(nonRemovableFile) + require.NoError(t, err) } + // Now remove extra file and let truncation proceed to remove directory. - err = eng.Remove(nonRemovableFile) - if err != nil { - t.Fatalf("could not remove %s: %+v", nonRemovableFile, err) - } + require.NoError(t, eng.Remove(nonRemovableFile)) // Test that directory is removed when filepath.Glob returns 0 matches. - if _, _, err := ss.TruncateTo(ctx, math.MaxUint64); err != nil { - t.Fatal(err) - } + _, _, err = ss.TruncateTo(ctx, math.MaxUint64) + require.NoError(t, err) // Ensure directory is removed, now that all files should be gone. - _, err = eng.Stat(ss.(*diskSideloadStorage).dir) - if err == nil { - t.Fatalf("expected %q to be removed after truncating full range", ss.(*diskSideloadStorage).dir) - } - if err != nil { - if !oserror.IsNotExist(err) { - t.Fatalf("expected %q to be removed: %+v", ss.(*diskSideloadStorage).dir, err) - } - } + _, err = eng.Stat(ss.Dir()) + require.True(t, oserror.IsNotExist(err), "%v", err) // Repopulate with some random indexes to test deletion when there are a // non-zero number of filepath.Glob matches. payloads := []uint64{3, 5, 7, 9, 10} for n := range rand.Perm(len(payloads)) { i := payloads[n] - if err := ss.Put(ctx, i, highTerm, file(i*highTerm)); err != nil { - t.Fatalf("%d: %+v", i, err) - } + require.NoError(t, ss.Put(ctx, i, highTerm, file(i*highTerm))) } assertCreated(true) - if _, _, err := ss.TruncateTo(ctx, math.MaxUint64); err != nil { - t.Fatal(err) - } + _, _, err = ss.TruncateTo(ctx, math.MaxUint64) + require.NoError(t, err) // Ensure directory is removed when all records are removed. - _, err = eng.Stat(ss.(*diskSideloadStorage).dir) - if err == nil { - t.Fatalf("expected %q to be removed after truncating full range", ss.(*diskSideloadStorage).dir) - } - if err != nil { - if !oserror.IsNotExist(err) { - t.Fatalf("expected %q to be removed: %+v", ss.(*diskSideloadStorage).dir, err) - } - } + _, err = eng.Stat(ss.Dir()) + require.True(t, oserror.IsNotExist(err), "%v", err) }() - if err := ss.Clear(ctx); err != nil { - t.Fatal(err) - } + require.NoError(t, ss.Clear(ctx)) assertCreated(false) // Sanity check that we can call TruncateTo without the directory existing. - if _, _, err := ss.TruncateTo(ctx, 1); err != nil { - t.Fatal(err) - } + _, _, err := ss.TruncateTo(ctx, 1) + require.NoError(t, err) assertCreated(false) @@ -362,9 +325,7 @@ func testSideloadingSideloadedStorage( continue } payload := []byte(strings.Repeat("x", 1+int(index))) - if err := ss.Put(ctx, index, 10, payload); err != nil { - t.Fatalf("%d: %+v", index, err) - } + require.NoError(t, ss.Put(ctx, index, 10, payload)) } // Term too high and too low, respectively. Shouldn't delete anything. @@ -457,8 +418,10 @@ func TestRaftSSTableSideloadingInline(t *testing.T) { ctx, collect, cancel := tracing.ContextWithRecordingSpan(context.Background(), "test-recording") defer cancel() + eng := storage.NewDefaultInMem() + defer eng.Close() + ss := newTestingSideloadStorage(t, eng) ec := raftentry.NewCache(1024) // large enough - ss := mustNewInMemSideloadStorage(rangeID, roachpb.ReplicaID(1), ".") if test.setup != nil { test.setup(ec, ss) } @@ -545,14 +508,14 @@ func TestRaftSSTableSideloadingSideload(t *testing.T) { name: "v2", preEnts: []raftpb.Entry{entV2SST, entV2Reg}, postEnts: []raftpb.Entry{entV2SSTStripped, entV2Reg}, - ss: []string{"i13t99"}, + ss: []string{"i13.t99"}, size: int64(len(addSST.Data)), }, { name: "mixed", preEnts: []raftpb.Entry{entV1Reg, entV1SST, entV2Reg, entV2SST}, postEnts: []raftpb.Entry{entV1Reg, entV1SST, entV2Reg, entV2SSTStripped}, - ss: []string{"i13t99"}, + ss: []string{"i13.t99"}, size: int64(len(addSST.Data)), }, } @@ -560,7 +523,9 @@ func TestRaftSSTableSideloadingSideload(t *testing.T) { for _, test := range testCases { t.Run(test.name, func(t *testing.T) { ctx := context.Background() - sideloaded := mustNewInMemSideloadStorage(roachpb.RangeID(3), roachpb.ReplicaID(17), ".") + eng := storage.NewDefaultInMem() + defer eng.Close() + sideloaded := newTestingSideloadStorage(t, eng) postEnts, size, err := maybeSideloadEntriesImpl(ctx, test.preEnts, sideloaded) if err != nil { t.Fatal(err) @@ -574,10 +539,12 @@ func TestRaftSSTableSideloadingSideload(t *testing.T) { if test.size != size { t.Fatalf("expected %d sideloadedSize, but found %d", test.size, size) } - var actKeys []string - for k := range sideloaded.(*inMemSideloadStorage).m { - actKeys = append(actKeys, fmt.Sprintf("i%dt%d", k.index, k.term)) + actKeys, err := sideloaded.eng.List(sideloaded.Dir()) + if oserror.IsNotExist(err) { + t.Log("swallowing IsNotExist") + err = nil } + require.NoError(t, err) sort.Strings(actKeys) if !reflect.DeepEqual(actKeys, test.ss) { t.Fatalf("expected %v, got %v", test.ss, actKeys) @@ -586,54 +553,32 @@ func TestRaftSSTableSideloadingSideload(t *testing.T) { } } -func makeInMemSideloaded(repl *Replica) { - repl.raftMu.Lock() - repl.raftMu.sideloaded = mustNewInMemSideloadStorage(repl.RangeID, 0, repl.store.engine.GetAuxiliaryDir()) - repl.raftMu.Unlock() -} - // TestRaftSSTableSideloadingProposal runs a straightforward application of an `AddSSTable` command. func TestRaftSSTableSideloadingProposal(t *testing.T) { defer leaktest.AfterTest(t)() - testutils.RunTrueAndFalse(t, "engineInMem", func(t *testing.T, engineInMem bool) { - testutils.RunTrueAndFalse(t, "mockSideloaded", func(t *testing.T, mockSideloaded bool) { - if engineInMem && !mockSideloaded { - skip.WithIssue(t, 31913) - } - testRaftSSTableSideloadingProposal(t, engineInMem, mockSideloaded) - }) + testutils.RunTrueAndFalse(t, "InMem", func(t *testing.T, engineInMem bool) { + var eng storage.Engine + if engineInMem { + eng = storage.NewDefaultInMem() + } else { + var cleanup func() + cleanup, eng = newOnDiskEngine(t) + defer cleanup() + } + defer eng.Close() + testRaftSSTableSideloadingProposal(t, eng) }) } // TestRaftSSTableSideloadingProposal runs a straightforward application of an `AddSSTable` command. -func testRaftSSTableSideloadingProposal(t *testing.T, engineInMem, mockSideloaded bool) { +func testRaftSSTableSideloadingProposal(t *testing.T, eng storage.Engine) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) defer SetMockAddSSTable()() - dir, cleanup := testutils.TempDir(t) - defer cleanup() stopper := stop.NewStopper() tc := testContext{} - if !engineInMem { - cfg := storage.PebbleConfig{ - StorageConfig: base.StorageConfig{ - Dir: dir, - Settings: cluster.MakeTestingClusterSettings(), - }, - } - cfg.Opts = storage.DefaultPebbleOptions() - var err error - cache := pebble.NewCache(1 << 20) - defer cache.Unref() - cfg.Opts.Cache = cache - tc.engine, err = storage.NewPebble(context.Background(), cfg) - if err != nil { - t.Fatal(err) - } - stopper.AddCloser(tc.engine) - } defer stopper.Stop(context.Background()) tc.Start(t, stopper) @@ -646,10 +591,6 @@ func testRaftSSTableSideloadingProposal(t *testing.T, engineInMem, mockSideloade ) val := strings.Repeat("x", entrySize) - if mockSideloaded { - makeInMemSideloaded(tc.repl) - } - ts := hlc.Timestamp{Logical: 1} if err := ProposeAddSSTable(ctx, key, val, ts, tc.store); err != nil { @@ -680,9 +621,6 @@ func testRaftSSTableSideloadingProposal(t *testing.T, engineInMem, mockSideloade func() { tc.repl.raftMu.Lock() defer tc.repl.raftMu.Unlock() - if ss, ok := tc.repl.raftMu.sideloaded.(*inMemSideloadStorage); ok && len(ss.m) < 1 { - t.Fatal("sideloaded storage is empty") - } if err := testutils.MatchInOrder( collect().String(), "sideloadable proposal detected", "ingested SSTable", @@ -701,10 +639,6 @@ func testRaftSSTableSideloadingProposal(t *testing.T, engineInMem, mockSideloade // depends on luck and the file system, so don't try to assert it. We should, however, see // no more than one. expMaxCopies := int64(1) - if engineInMem { - // We don't count in-memory env SST writes as copies. - expMaxCopies = 0 - } if n := tc.store.metrics.AddSSTableApplicationCopies.Count(); n > expMaxCopies { t.Fatalf("expected metric to show <= %d AddSSTable copies, but got %d", expMaxCopies, n) } @@ -759,7 +693,7 @@ func (mr *mockSender) Recv() (*SnapshotResponse, error) { return &SnapshotResponse{Status: status}, nil } -func newEngine(t *testing.T) (func(), storage.Engine) { +func newOnDiskEngine(t *testing.T) (func(), storage.Engine) { dir, cleanup := testutils.TempDir(t) eng, err := storage.NewDefaultEngine( 1<<20, @@ -783,7 +717,7 @@ func TestRaftSSTableSideloadingSnapshot(t *testing.T) { ctx := context.Background() tc := testContext{} - cleanup, eng := newEngine(t) + cleanup, eng := newOnDiskEngine(t) tc.engine = eng defer cleanup() defer eng.Close() @@ -972,7 +906,6 @@ func TestRaftSSTableSideloadingTruncation(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(context.Background()) tc.Start(t, stopper) - makeInMemSideloaded(tc.repl) ctx := context.Background() const count = 10 @@ -999,14 +932,11 @@ func TestRaftSSTableSideloadingTruncation(t *testing.T) { addLastIndex() fmtSideloaded := func() []string { - var r []string tc.repl.raftMu.Lock() defer tc.repl.raftMu.Unlock() - for k := range tc.repl.raftMu.sideloaded.(*inMemSideloadStorage).m { - r = append(r, fmt.Sprintf("%v", k)) - } - sort.Strings(r) - return r + fs, _ := tc.repl.Engine().List(tc.repl.raftMu.sideloaded.Dir()) + sort.Strings(fs) + return fs } // Check that when we truncate, the number of on-disk files changes in ways diff --git a/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go b/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go index 1fe92b2bcd1c..ab941343477f 100644 --- a/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go +++ b/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go @@ -34,7 +34,7 @@ func TestSSTSnapshotStorage(t *testing.T) { testSnapUUID := uuid.Must(uuid.FromBytes([]byte("foobar1234567890"))) testLimiter := rate.NewLimiter(rate.Inf, 0) - cleanup, eng := newEngine(t) + cleanup, eng := newOnDiskEngine(t) defer cleanup() defer eng.Close() @@ -117,7 +117,7 @@ func TestMultiSSTWriterInitSST(t *testing.T) { testSnapUUID := uuid.Must(uuid.FromBytes([]byte("foobar1234567890"))) testLimiter := rate.NewLimiter(rate.Inf, 0) - cleanup, eng := newEngine(t) + cleanup, eng := newOnDiskEngine(t) defer cleanup() defer eng.Close()