diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go index c006d052ed39..1d9a9f5c872f 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go @@ -19,6 +19,7 @@ import ( "net/url" "path/filepath" "strings" + "sync" "sync/atomic" "time" @@ -27,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" @@ -304,6 +306,11 @@ type cloudStorageSink struct { dataFilePartition string prevFilename string metrics metricsRecorder + + asyncFlushActive bool + flushCtx context.Context + flushGroup sync.WaitGroup + flushErr atomic.Value } const sinkCompressionGzip = "gzip" @@ -364,8 +371,11 @@ func makeCloudStorageSink( partitionFormat: defaultPartitionFormat, timestampOracle: timestampOracle, // TODO(dan,ajwerner): Use the jobs framework's session ID once that's available. - jobSessionID: sessID, - topicNamer: tn, + jobSessionID: sessID, + topicNamer: tn, + asyncFlushActive: enableAsyncFlush.Get(&settings.SV), + // TODO (yevgeniy): Consider adding ctx to Dial method instead. + flushCtx: ctx, } if partitionFormat := u.consumeParam(changefeedbase.SinkParamPartitionFormat); partitionFormat != "" { @@ -503,6 +513,13 @@ func (s *cloudStorageSink) EmitResolvedTimestamp( if err != nil { return err } + + // Wait for previously issued async flush requests to complete + // before we write resolved time stamp file. + if err := s.waitAsyncFlush(); err != nil { + return errors.Wrapf(err, "while emitting resolved timestamp") + } + // Don't need to copy payload because we never buffer it anywhere. part := resolved.GoTime().Format(s.partitionFormat) @@ -572,29 +589,44 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error { // for an overview of the naming convention and proof of correctness. s.dataFileTs = cloudStorageFormatTime(s.timestampOracle.inclusiveLowerBoundTS()) s.dataFilePartition = s.timestampOracle.inclusiveLowerBoundTS().GoTime().Format(s.partitionFormat) - return nil + return s.waitAsyncFlush() } -// file should not be used after flushing. -func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSinkFile) error { - defer file.alloc.Release(ctx) +// enableAsyncFlush controls async flushing behavior for this sink. +var enableAsyncFlush = settings.RegisterBoolSetting( + settings.TenantWritable, + "changefeed.cloudstorage.async_flush.enabled", + "enable async flushing", + true, +) - if file.rawSize == 0 { - // This method shouldn't be called with an empty file, but be defensive - // about not writing empty files anyway. - return nil +// waitAsyncFlush waits until all async flushes complete. +func (s *cloudStorageSink) waitAsyncFlush() error { + s.flushGroup.Wait() + if v := s.flushErr.Load(); v != nil { + return v.(error) } + return nil +} - if file.codec != nil { - if err := file.codec.Close(); err != nil { +// flushFile flushes file to the cloud storage. +// file should not be used after flushing. +func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSinkFile) error { + asyncFlushEnabled := enableAsyncFlush.Get(&s.settings.SV) + if s.asyncFlushActive && !asyncFlushEnabled { + // Async flush behavior was turned off -- drain any active flush requests + // before flushing this file. + if err := s.waitAsyncFlush(); err != nil { return err } } + s.asyncFlushActive = asyncFlushEnabled // We use this monotonically increasing fileID to ensure correct ordering // among files emitted at the same timestamp during the same job session. fileID := s.fileID s.fileID++ + // Pad file ID to maintain lexical ordering among files from the same sink. // Note that we use `-` here to delimit the filename because we want // `%d.RESOLVED` files to lexicographically succeed data files that have the @@ -606,11 +638,52 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSink "precedes a file emitted before: %s", filename, s.prevFilename) } s.prevFilename = filename - compressedBytes := file.buf.Len() - if err := cloud.WriteFile(ctx, s.es, filepath.Join(s.dataFilePartition, filename), bytes.NewReader(file.buf.Bytes())); err != nil { + dest := filepath.Join(s.dataFilePartition, filename) + + if !asyncFlushEnabled { + return file.flushToStorage(ctx, s.es, dest, s.metrics) + } + + s.flushGroup.Add(1) + go func() { + defer s.flushGroup.Done() + // NB: must use s.flushCtx; ctx may be short lived (i.e. cancelled). + if err := file.flushToStorage(s.flushCtx, s.es, dest, s.metrics); err != nil { + log.Errorf(ctx, "error flushing file to storage: %s", err) + // We must use the same type for error we store in flushErr. + s.flushErr.CompareAndSwap(nil, &flushError{error: err}) + } + }() + return nil +} + +type flushError struct { + error +} + +// flushToStorage writes out file into external storage into 'dest'. +func (f *cloudStorageSinkFile) flushToStorage( + ctx context.Context, es cloud.ExternalStorage, dest string, m metricsRecorder, +) error { + defer f.alloc.Release(ctx) + + if f.rawSize == 0 { + // This method shouldn't be called with an empty file, but be defensive + // about not writing empty files anyway. + return nil + } + + if f.codec != nil { + if err := f.codec.Close(); err != nil { + return err + } + } + + compressedBytes := f.buf.Len() + if err := cloud.WriteFile(ctx, es, dest, bytes.NewReader(f.buf.Bytes())); err != nil { return err } - s.metrics.recordEmittedBatch(file.created, file.numMessages, file.oldestMVCC, file.rawSize, compressedBytes) + m.recordEmittedBatch(f.created, f.numMessages, f.oldestMVCC, f.rawSize, compressedBytes) return nil } @@ -618,7 +691,7 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSink // Close implements the Sink interface. func (s *cloudStorageSink) Close() error { s.files = nil - return s.es.Close() + return errors.CombineErrors(s.waitAsyncFlush(), s.es.Close()) } // Dial implements the Sink interface. diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index 58ceb9b93081..dbad01d047c4 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -69,7 +69,7 @@ func TestCloudStorageSink(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - dir, dirCleanupFn := testutils.TempDir(t) + externalIODir, dirCleanupFn := testutils.TempDir(t) defer dirCleanupFn() gzipDecompress := func(t *testing.T, compressed []byte) []byte { @@ -88,8 +88,12 @@ func TestCloudStorageSink(t *testing.T) { return decompressed } - listLeafDirectories := func(root string) []string { - absRoot := filepath.Join(dir, root) + testDir := func(t *testing.T) string { + return strings.ReplaceAll(t.Name(), "/", ";") + } + + listLeafDirectories := func(t *testing.T) []string { + absRoot := filepath.Join(externalIODir, testDir(t)) var folders []string @@ -126,7 +130,7 @@ func TestCloudStorageSink(t *testing.T) { // slurpDir returns the contents of every file under root (relative to the // temp dir created above), sorted by the name of the file. - slurpDir := func(t *testing.T, root string) []string { + slurpDir := func(t *testing.T) []string { var files []string walkFn := func(path string, info os.FileInfo, err error) error { if err != nil { @@ -145,7 +149,7 @@ func TestCloudStorageSink(t *testing.T) { files = append(files, string(file)) return nil } - absRoot := filepath.Join(dir, root) + absRoot := filepath.Join(externalIODir, testDir(t)) require.NoError(t, os.MkdirAll(absRoot, 0755)) require.NoError(t, filepath.Walk(absRoot, walkFn)) return files @@ -154,7 +158,7 @@ func TestCloudStorageSink(t *testing.T) { const unlimitedFileSize int64 = math.MaxInt64 var noKey []byte settings := cluster.MakeTestingClusterSettings() - settings.ExternalIODir = dir + settings.ExternalIODir = externalIODir opts := changefeedbase.EncodingOptions{ Format: changefeedbase.OptFormatJSON, Envelope: changefeedbase.OptEnvelopeWrapped, @@ -180,25 +184,33 @@ func TestCloudStorageSink(t *testing.T) { user := username.RootUserName() - sinkURI := func(dir string, maxFileSize int64) sinkURL { - uri := `nodelocal://0/` + dir + sinkURI := func(t *testing.T, maxFileSize int64) sinkURL { + u, err := url.Parse(fmt.Sprintf("nodelocal://0/%s", testDir(t))) + require.NoError(t, err) + sink := sinkURL{URL: u} if maxFileSize != unlimitedFileSize { - uri += fmt.Sprintf("?%s=%d", changefeedbase.SinkParamFileSize, maxFileSize) + sink.addParam(changefeedbase.SinkParamFileSize, strconv.FormatInt(maxFileSize, 10)) } - u, err := url.Parse(uri) - require.NoError(t, err) - return sinkURL{URL: u} + return sink } - t.Run(`golden`, func(t *testing.T) { + testWithAndWithoutAsyncFlushing := func(t *testing.T, name string, testFn func(*testing.T)) { + t.Helper() + testutils.RunTrueAndFalse(t, name+"/asyncFlush", func(t *testing.T, enable bool) { + enableAsyncFlush.Override(context.Background(), &settings.SV, enable) + testFn(t) + }) + } + + testWithAndWithoutAsyncFlushing(t, `golden`, func(t *testing.T) { t1 := makeTopic(`t1`) testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf, err := span.MakeFrontier(testSpan) require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - sinkDir := `golden` + s, err := makeCloudStorageSink( - ctx, sinkURI(sinkDir, unlimitedFileSize), 1, settings, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) @@ -210,11 +222,11 @@ func TestCloudStorageSink(t *testing.T) { require.Equal(t, []string{ "v1\n", - }, slurpDir(t, sinkDir)) + }, slurpDir(t)) require.NoError(t, s.EmitResolvedTimestamp(ctx, e, ts(5))) resolvedFile, err := os.ReadFile(filepath.Join( - dir, sinkDir, `1970-01-01`, `197001010000000000000050000000000.RESOLVED`)) + externalIODir, testDir(t), `1970-01-01`, `197001010000000000000050000000000.RESOLVED`)) require.NoError(t, err) require.Equal(t, `{"resolved":"5.0000000000"}`, string(resolvedFile)) }) @@ -225,7 +237,14 @@ func TestCloudStorageSink(t *testing.T) { return forwarded } - t.Run(`single-node`, func(t *testing.T) { + stringOrDefault := func(s, ifEmpty string) string { + if len(s) == 0 { + return ifEmpty + } + return s + } + + testWithAndWithoutAsyncFlushing(t, `single-node`, func(t *testing.T) { before := opts.Compression // Compression codecs include buffering that interferes with other tests, // e.g. the bucketing test that configures very small flush sizes. @@ -234,7 +253,7 @@ func TestCloudStorageSink(t *testing.T) { }() for _, compression := range []string{"", "gzip"} { opts.Compression = compression - t.Run("compress="+compression, func(t *testing.T) { + t.Run("compress="+stringOrDefault(compression, "none"), func(t *testing.T) { t1 := makeTopic(`t1`) t2 := makeTopic(`t2`) @@ -242,9 +261,8 @@ func TestCloudStorageSink(t *testing.T) { sf, err := span.MakeFrontier(testSpan) require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - dir := `single-node` + compression s, err := makeCloudStorageSink( - ctx, sinkURI(dir, unlimitedFileSize), 1, settings, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) @@ -253,7 +271,7 @@ func TestCloudStorageSink(t *testing.T) { // Empty flush emits no files. require.NoError(t, s.Flush(ctx)) - require.Equal(t, []string(nil), slurpDir(t, dir)) + require.Equal(t, []string(nil), slurpDir(t)) // Emitting rows and flushing should write them out in one file per table. Note // the ordering among these two files is non deterministic as either of them could @@ -268,19 +286,19 @@ func TestCloudStorageSink(t *testing.T) { "v1\nv2\n", "w1\n", } - actual := slurpDir(t, dir) + actual := slurpDir(t) sort.Strings(actual) require.Equal(t, expected, actual) // Flushing with no new emits writes nothing new. require.NoError(t, s.Flush(ctx)) - actual = slurpDir(t, dir) + actual = slurpDir(t) sort.Strings(actual) require.Equal(t, expected, actual) // Without a flush, nothing new shows up. require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v3`), ts(3), ts(3), zeroAlloc)) - actual = slurpDir(t, dir) + actual = slurpDir(t) sort.Strings(actual) require.Equal(t, expected, actual) @@ -290,7 +308,7 @@ func TestCloudStorageSink(t *testing.T) { require.NoError(t, s.Flush(ctx)) require.Equal(t, []string{ "v3\n", - }, slurpDir(t, dir)[2:]) + }, slurpDir(t)[2:]) // Data from different versions of a table is put in different files, so that we // can guarantee that all rows in any given file have the same schema. @@ -306,7 +324,7 @@ func TestCloudStorageSink(t *testing.T) { "v4\n", "v5\n", } - actual = slurpDir(t, dir) + actual = slurpDir(t) actual = actual[len(actual)-2:] sort.Strings(actual) require.Equal(t, expected, actual) @@ -314,22 +332,20 @@ func TestCloudStorageSink(t *testing.T) { } }) - t.Run(`multi-node`, func(t *testing.T) { + testWithAndWithoutAsyncFlushing(t, `multi-node`, func(t *testing.T) { t1 := makeTopic(`t1`) - testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf, err := span.MakeFrontier(testSpan) require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - dir := `multi-node` s1, err := makeCloudStorageSink( - ctx, sinkURI(dir, unlimitedFileSize), 1, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) defer func() { require.NoError(t, s1.Close()) }() s2, err := makeCloudStorageSink( - ctx, sinkURI(dir, unlimitedFileSize), 2, + ctx, sinkURI(t, unlimitedFileSize), 2, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) defer func() { require.NoError(t, s2.Close()) }() @@ -353,19 +369,19 @@ func TestCloudStorageSink(t *testing.T) { require.Equal(t, []string{ "v1\n", "w1\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // If a node restarts then the entire distsql flow has to restart. If // this happens before checkpointing, some data is written again but // this is unavoidable. s1R, err := makeCloudStorageSink( - ctx, sinkURI(dir, unbuffered), 1, + ctx, sinkURI(t, unbuffered), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) defer func() { require.NoError(t, s1R.Close()) }() s2R, err := makeCloudStorageSink( - ctx, sinkURI(dir, unbuffered), 2, + ctx, sinkURI(t, unbuffered), 2, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) @@ -390,7 +406,7 @@ func TestCloudStorageSink(t *testing.T) { "v1\n", "w1\n", "w1\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) }) // The jobs system can't always clean up perfectly after itself and so there @@ -400,15 +416,14 @@ func TestCloudStorageSink(t *testing.T) { // // This test is also sufficient for verifying the behavior of a multi-node // changefeed using this sink. Ditto job restarts. - t.Run(`zombie`, func(t *testing.T) { + testWithAndWithoutAsyncFlushing(t, `zombie`, func(t *testing.T) { t1 := makeTopic(`t1`) testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf, err := span.MakeFrontier(testSpan) require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - dir := `zombie` s1, err := makeCloudStorageSink( - ctx, sinkURI(dir, unlimitedFileSize), 1, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) @@ -416,7 +431,7 @@ func TestCloudStorageSink(t *testing.T) { s1.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID. s1.(*cloudStorageSink).jobSessionID = "a" // Force deterministic job session ID. s2, err := makeCloudStorageSink( - ctx, sinkURI(dir, unlimitedFileSize), 1, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) @@ -441,19 +456,21 @@ func TestCloudStorageSink(t *testing.T) { "v1\nv2\n", "v3\n", "v1\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) }) - t.Run(`bucketing`, func(t *testing.T) { + waitAsyncFlush := func(s Sink) error { + return s.(*cloudStorageSink).waitAsyncFlush() + } + testWithAndWithoutAsyncFlushing(t, `bucketing`, func(t *testing.T) { t1 := makeTopic(`t1`) testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf, err := span.MakeFrontier(testSpan) require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - dir := `bucketing` const targetMaxFileSize = 6 s, err := makeCloudStorageSink( - ctx, sinkURI(dir, targetMaxFileSize), 1, + ctx, sinkURI(t, targetMaxFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) @@ -465,16 +482,17 @@ func TestCloudStorageSink(t *testing.T) { for i := int64(1); i <= 5; i++ { require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(fmt.Sprintf(`v%d`, i)), ts(i), ts(i), zeroAlloc)) } + require.NoError(t, waitAsyncFlush(s)) require.Equal(t, []string{ "v1\nv2\nv3\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // Flush then writes the rest. require.NoError(t, s.Flush(ctx)) require.Equal(t, []string{ "v1\nv2\nv3\n", "v4\nv5\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // Forward the SpanFrontier here and trigger an empty flush to update // the sink's `inclusiveLowerBoundTs` @@ -487,11 +505,12 @@ func TestCloudStorageSink(t *testing.T) { for i := int64(6); i < 10; i++ { require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(fmt.Sprintf(`v%d`, i)), ts(i), ts(i), zeroAlloc)) } + require.NoError(t, waitAsyncFlush(s)) require.Equal(t, []string{ "v1\nv2\nv3\n", "v4\nv5\n", "v6\nv7\nv8\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // Resolved timestamps are periodically written. This happens // asynchronously from a different node and can be given an earlier @@ -507,7 +526,7 @@ func TestCloudStorageSink(t *testing.T) { "v4\nv5\n", `{"resolved":"5.0000000000"}`, "v6\nv7\nv8\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // Flush then writes the rest. Since we use the time of the EmitRow // or EmitResolvedTimestamp calls to order files, the resolved timestamp @@ -520,7 +539,7 @@ func TestCloudStorageSink(t *testing.T) { `{"resolved":"5.0000000000"}`, "v6\nv7\nv8\n", "v9\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // A resolved timestamp emitted with ts > 5 should follow everything // emitted thus far. @@ -532,10 +551,10 @@ func TestCloudStorageSink(t *testing.T) { "v6\nv7\nv8\n", "v9\n", `{"resolved":"6.0000000000"}`, - }, slurpDir(t, dir)) + }, slurpDir(t)) }) - t.Run(`partition-formatting`, func(t *testing.T) { + testWithAndWithoutAsyncFlushing(t, `partition-formatting`, func(t *testing.T) { t1 := makeTopic(`t1`) testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} const targetMaxFileSize = 6 @@ -550,7 +569,7 @@ func TestCloudStorageSink(t *testing.T) { time.Date(2000, time.January, 2, 6, 1, 1, 0, time.UTC), } - for i, tc := range []struct { + for _, tc := range []struct { format string expectedFolders []string }{ @@ -582,51 +601,50 @@ func TestCloudStorageSink(t *testing.T) { }, }, } { - t.Run(tc.format, func(t *testing.T) { - sf, err := span.MakeFrontier(testSpan) - require.NoError(t, err) - timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} + testWithAndWithoutAsyncFlushing(t, stringOrDefault(tc.format, "default"), + func(t *testing.T) { + sf, err := span.MakeFrontier(testSpan) + require.NoError(t, err) + timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - dir := fmt.Sprintf(`partition-formatting-%d`, i) + sinkURIWithParam := sinkURI(t, targetMaxFileSize) + sinkURIWithParam.addParam(changefeedbase.SinkParamPartitionFormat, tc.format) + t.Logf("format=%s sinkgWithParam: %s", tc.format, sinkURIWithParam.String()) + s, err := makeCloudStorageSink( + ctx, sinkURIWithParam, 1, + settings, opts, timestampOracle, externalStorageFromURI, user, nil, + ) - sinkURIWithParam := sinkURI(dir, targetMaxFileSize) - sinkURIWithParam.addParam(changefeedbase.SinkParamPartitionFormat, tc.format) - s, err := makeCloudStorageSink( - ctx, sinkURIWithParam, 1, - settings, opts, timestampOracle, externalStorageFromURI, user, nil, - ) - - require.NoError(t, err) - defer func() { require.NoError(t, s.Close()) }() - s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID. + require.NoError(t, err) + defer func() { require.NoError(t, s.Close()) }() + s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID. - for i, timestamp := range timestamps { - hlcTime := ts(timestamp.UnixNano()) + for i, timestamp := range timestamps { + hlcTime := ts(timestamp.UnixNano()) - // Move the frontier and flush to update the dataFilePartition value - _, err = sf.Forward(testSpan, hlcTime) - require.NoError(t, err) - require.NoError(t, s.Flush(ctx)) + // Move the frontier and flush to update the dataFilePartition value + _, err = sf.Forward(testSpan, hlcTime) + require.NoError(t, err) + require.NoError(t, s.Flush(ctx)) - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(fmt.Sprintf(`v%d`, i)), hlcTime, hlcTime, zeroAlloc)) - } + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(fmt.Sprintf(`v%d`, i)), hlcTime, hlcTime, zeroAlloc)) + } - require.NoError(t, s.Flush(ctx)) // Flush the last file - require.ElementsMatch(t, tc.expectedFolders, listLeafDirectories(dir)) - require.Equal(t, []string{"v0\n", "v1\n", "v2\n", "v3\n", "v4\n"}, slurpDir(t, dir)) - }) + require.NoError(t, s.Flush(ctx)) // Flush the last file + require.ElementsMatch(t, tc.expectedFolders, listLeafDirectories(t)) + require.Equal(t, []string{"v0\n", "v1\n", "v2\n", "v3\n", "v4\n"}, slurpDir(t)) + }) } }) - t.Run(`file-ordering`, func(t *testing.T) { + testWithAndWithoutAsyncFlushing(t, `file-ordering`, func(t *testing.T) { t1 := makeTopic(`t1`) testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf, err := span.MakeFrontier(testSpan) require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - dir := `file-ordering` s, err := makeCloudStorageSink( - ctx, sinkURI(dir, unlimitedFileSize), 1, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) @@ -663,7 +681,7 @@ func TestCloudStorageSink(t *testing.T) { `{"resolved":"3.0000000000"}`, "e3next\n", `{"resolved":"4.0000000000"}`, - }, slurpDir(t, dir)) + }, slurpDir(t)) // Test that files with timestamp lower than the least resolved timestamp // as of file creation time are ignored. @@ -675,19 +693,18 @@ func TestCloudStorageSink(t *testing.T) { `{"resolved":"3.0000000000"}`, "e3next\n", `{"resolved":"4.0000000000"}`, - }, slurpDir(t, dir)) + }, slurpDir(t)) }) - t.Run(`ordering-among-schema-versions`, func(t *testing.T) { + testWithAndWithoutAsyncFlushing(t, `ordering-among-schema-versions`, func(t *testing.T) { t1 := makeTopic(`t1`) testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf, err := span.MakeFrontier(testSpan) require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - dir := `ordering-among-schema-versions` var targetMaxFileSize int64 = 10 s, err := makeCloudStorageSink( - ctx, sinkURI(dir, targetMaxFileSize), 1, settings, + ctx, sinkURI(t, targetMaxFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil) require.NoError(t, err) defer func() { require.NoError(t, s.Close()) }() @@ -699,20 +716,22 @@ func TestCloudStorageSink(t *testing.T) { // for the first file but not the second one. t1.Version = 0 require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`trigger-flush-v1`), ts(1), ts(1), zeroAlloc)) + require.NoError(t, waitAsyncFlush(s)) require.Equal(t, []string{ "v1\ntrigger-flush-v1\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // Now make the file with the newer schema exceed its file size threshold and ensure // that the file with the older schema is flushed (and ordered) before. require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v2`), ts(1), ts(1), zeroAlloc)) t1.Version = 1 require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`trigger-flush-v3`), ts(1), ts(1), zeroAlloc)) + require.NoError(t, waitAsyncFlush(s)) require.Equal(t, []string{ "v1\ntrigger-flush-v1\n", "v2\n", "v3\ntrigger-flush-v3\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // Calling `Flush()` on the sink should emit files in the order of their schema IDs. require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`w1`), ts(1), ts(1), zeroAlloc)) @@ -725,6 +744,6 @@ func TestCloudStorageSink(t *testing.T) { "v3\ntrigger-flush-v3\n", "x1\n", "w1\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) }) } diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 15b6ee6908f0..f8bef021b3f1 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -550,6 +550,20 @@ var metaRdbBytesIngested = storageLevelMetricMetadata( metric.Unit_BYTES, ) +var metaRdbLevelSize = storageLevelMetricMetadata( + "level-size", + "Size of the SSTables in level %d", + "Bytes", + metric.Unit_BYTES, +) + +var metaRdbLevelScores = storageLevelMetricMetadata( + "level-score", + "Compaction score of level %d", + "Score", + metric.Unit_COUNT, +) + var ( metaRdbWriteStalls = metric.Metadata{ Name: "storage.write-stalls", @@ -1708,7 +1722,9 @@ type StoreMetrics struct { RdbL0BytesFlushed *metric.Gauge RdbL0Sublevels *metric.Gauge RdbL0NumFiles *metric.Gauge - RdbBytesIngested [7]*metric.Gauge // idx = level + RdbBytesIngested [7]*metric.Gauge // idx = level + RdbLevelSize [7]*metric.Gauge // idx = level + RdbLevelScore [7]*metric.GaugeFloat64 // idx = level RdbWriteStalls *metric.Gauge RdbWriteStallNanos *metric.Gauge @@ -2137,6 +2153,8 @@ func newTenantsStorageMetrics() *TenantsStorageMetrics { func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { storeRegistry := metric.NewRegistry() rdbBytesIngested := storageLevelGaugeSlice(metaRdbBytesIngested) + rdbLevelSize := storageLevelGaugeSlice(metaRdbLevelSize) + rdbLevelScore := storageLevelGaugeFloat64Slice(metaRdbLevelScores) sm := &StoreMetrics{ registry: storeRegistry, @@ -2219,6 +2237,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { RdbL0Sublevels: metric.NewGauge(metaRdbL0Sublevels), RdbL0NumFiles: metric.NewGauge(metaRdbL0NumFiles), RdbBytesIngested: rdbBytesIngested, + RdbLevelSize: rdbLevelSize, + RdbLevelScore: rdbLevelScore, RdbWriteStalls: metric.NewGauge(metaRdbWriteStalls), RdbWriteStallNanos: metric.NewGauge(metaRdbWriteStallNanos), @@ -2523,6 +2543,8 @@ func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) { sm.RdbL0BytesFlushed.Update(int64(m.Levels[0].BytesFlushed)) for level, stats := range m.Levels { sm.RdbBytesIngested[level].Update(int64(stats.BytesIngested)) + sm.RdbLevelSize[level].Update(stats.Size) + sm.RdbLevelScore[level].Update(stats.Score) } } @@ -2577,3 +2599,11 @@ func storageLevelGaugeSlice(sl [7]metric.Metadata) [7]*metric.Gauge { } return gs } + +func storageLevelGaugeFloat64Slice(sl [7]metric.Metadata) [7]*metric.GaugeFloat64 { + var gs [7]*metric.GaugeFloat64 + for i := range sl { + gs[i] = metric.NewGaugeFloat64(sl[i]) + } + return gs +} diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 13dbb3b1ebd5..e4ddbe13a291 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -3004,6 +3004,31 @@ var charts = []sectionDescription{ Metrics: []string{"storage.write-stall-nanos"}, AxisLabel: "Duration (nanos)", }, + { + Title: "Bytes Used Per Level", + Metrics: []string{ + "storage.l0-level-size", + "storage.l1-level-size", + "storage.l2-level-size", + "storage.l3-level-size", + "storage.l4-level-size", + "storage.l5-level-size", + "storage.l6-level-size", + }, + AxisLabel: "Bytes", + }, + { + Title: "Compaction Score Per Level", + Metrics: []string{ + "storage.l0-level-score", + "storage.l1-level-score", + "storage.l2-level-score", + "storage.l3-level-score", + "storage.l4-level-score", + "storage.l5-level-score", + "storage.l6-level-score", + }, + }, }, }, { diff --git a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/statementInsights/statementInsightsTable.tsx b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/statementInsights/statementInsightsTable.tsx index e18cd94d75df..8a4cc81da334 100644 --- a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/statementInsights/statementInsightsTable.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/statementInsights/statementInsightsTable.tsx @@ -39,8 +39,8 @@ export function makeStatementInsightsColumns(): ColumnDescriptor ( {String(item.statementID)} @@ -141,13 +141,6 @@ export function makeStatementInsightsColumns(): ColumnDescriptor String(item.isFullScan), showByDefault: false, }, - { - name: "transactionID", - title: insightsTableTitles.executionID(InsightExecEnum.TRANSACTION), - cell: (item: StatementInsightEvent) => item.transactionID, - sort: (item: StatementInsightEvent) => item.transactionID, - showByDefault: false, - }, { name: "transactionFingerprintID", title: insightsTableTitles.fingerprintID(InsightExecEnum.TRANSACTION), @@ -155,6 +148,13 @@ export function makeStatementInsightsColumns(): ColumnDescriptor item.transactionFingerprintID, showByDefault: false, }, + { + name: "transactionID", + title: insightsTableTitles.latestExecutionID(InsightExecEnum.TRANSACTION), + cell: (item: StatementInsightEvent) => item.transactionID, + sort: (item: StatementInsightEvent) => item.transactionID, + showByDefault: false, + }, ]; } diff --git a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/transactionInsights/transactionInsightsTable.tsx b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/transactionInsights/transactionInsightsTable.tsx index 7cb960cc2fa0..5a5776405746 100644 --- a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/transactionInsights/transactionInsightsTable.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/transactionInsights/transactionInsightsTable.tsx @@ -32,8 +32,8 @@ export function makeTransactionInsightsColumns(): ColumnDescriptor ( {String(item.transactionID)} diff --git a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/util/insightsColumns.tsx b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/util/insightsColumns.tsx index 230ff4c35b2b..656cf80f0de5 100644 --- a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/util/insightsColumns.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/util/insightsColumns.tsx @@ -17,6 +17,7 @@ import { contentionTime, readFromDisk, writtenToDisk } from "../../../util"; export const insightsColumnLabels = { executionID: "Execution ID", + latestExecutionID: "Latest Execution ID", query: "Execution", insights: "Insights", startTime: "Start Time (UTC)", @@ -47,8 +48,14 @@ export function getLabel( ): string { switch (execType) { case InsightExecEnum.TRANSACTION: + if (key === "latestExecutionID") { + return "Latest Transaction Execution ID"; + } return "Transaction " + insightsColumnLabels[key]; case InsightExecEnum.STATEMENT: + if (key === "latestExecutionID") { + return "Latest Statement Execution ID"; + } return "Statement " + insightsColumnLabels[key]; default: return insightsColumnLabels[key]; @@ -76,12 +83,19 @@ export const insightsTableTitles: InsightsTableTitleType = { ); }, executionID: (execType: InsightExecEnum) => { + return makeToolTip( +

The ID of the execution with the {execType} fingerprint.

, + "executionID", + execType, + ); + }, + latestExecutionID: (execType: InsightExecEnum) => { return makeToolTip(

The execution ID of the latest execution with the {execType}{" "} fingerprint.

, - "executionID", + "latestExecutionID", execType, ); },