From 81f9932c4cb04e9475b77415dfaee696d6283af2 Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Wed, 21 Sep 2022 16:21:35 -0400
Subject: [PATCH 1/3] changefeedccl: Do not block on file size based flushes

Prior to this change, the cloud storage sink triggered a file-size-based
flush whenever a new row would push the file size beyond the configured
threshold. This significantly reduced throughput whenever such an event
occurred -- no additional events could be added to the cloud storage
sink while the previous flush was active.

This is not necessary. The cloud storage sink can trigger file-based
flushes asynchronously. The only requirement is that if a real,
non-file-based flush arrives, or if we need to emit resolved timestamps,
then we must wait for all of the active flush requests to complete.

In addition, because every event added to the cloud sink has an
associated allocation, which is released when the file is written out,
performing flushes asynchronously is safe with respect to memory usage
and accounting.

Release note (enterprise change): Changefeeds using the cloud storage
sink now have better throughput.

Release justification: performance fix
---
 pkg/ccl/changefeedccl/sink_cloudstorage.go  | 105 +++++++--
 .../changefeedccl/sink_cloudstorage_test.go | 199 ++++++++++--------
 2 files changed, 198 insertions(+), 106 deletions(-)

diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go
index 61e945ee4734..fe1d3f0b3769 100644
--- a/pkg/ccl/changefeedccl/sink_cloudstorage.go
+++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go
@@ -19,6 +19,7 @@ import (
 	"net/url"
 	"path/filepath"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -27,6 +28,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
 	"github.com/cockroachdb/cockroach/pkg/cloud"
 	"github.com/cockroachdb/cockroach/pkg/security/username"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
@@ -304,6 +306,11 @@ type cloudStorageSink struct {
 	dataFilePartition string
 	prevFilename      string
 	metrics           metricsRecorder
+
+	asyncFlushActive bool
+	flushCtx         context.Context
+	flushGroup       sync.WaitGroup
+	flushErr         atomic.Value
 }
 
 const sinkCompressionGzip = "gzip"
@@ -364,8 +371,11 @@ func makeCloudStorageSink(
 		partitionFormat: defaultPartitionFormat,
 		timestampOracle: timestampOracle,
 		// TODO(dan,ajwerner): Use the jobs framework's session ID once that's available.
-		jobSessionID: sessID,
-		topicNamer:   tn,
+		jobSessionID:     sessID,
+		topicNamer:       tn,
+		asyncFlushActive: enableAsyncFlush.Get(&settings.SV),
+		// TODO (yevgeniy): Consider adding ctx to Dial method instead.
+		flushCtx: ctx,
 	}
 
 	if partitionFormat := u.consumeParam(changefeedbase.SinkParamPartitionFormat); partitionFormat != "" {
@@ -503,6 +513,13 @@ func (s *cloudStorageSink) EmitResolvedTimestamp(
 	if err != nil {
 		return err
 	}
+
+	// Wait for previously issued async flush requests to complete
+	// before we write resolved time stamp file.
+	if err := s.waitAsyncFlush(); err != nil {
+		return errors.Wrapf(err, "while emitting resolved timestamp")
+	}
+
 	// Don't need to copy payload because we never buffer it anywhere.
 
 	part := resolved.GoTime().Format(s.partitionFormat)
@@ -572,29 +589,44 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error {
 	// for an overview of the naming convention and proof of correctness.
s.dataFileTs = cloudStorageFormatTime(s.timestampOracle.inclusiveLowerBoundTS()) s.dataFilePartition = s.timestampOracle.inclusiveLowerBoundTS().GoTime().Format(s.partitionFormat) - return nil + return s.waitAsyncFlush() } -// file should not be used after flushing. -func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSinkFile) error { - defer file.alloc.Release(ctx) +// enableAsyncFlush controls async flushing behavior for this sink. +var enableAsyncFlush = settings.RegisterBoolSetting( + settings.TenantWritable, + "changefeed.cloudstorage.async_flush.enabled", + "enable async flushing", + true, +) - if file.rawSize == 0 { - // This method shouldn't be called with an empty file, but be defensive - // about not writing empty files anyway. - return nil +// waitAsyncFlush waits until all async flushes complete. +func (s *cloudStorageSink) waitAsyncFlush() error { + s.flushGroup.Wait() + if v := s.flushErr.Load(); v != nil { + return v.(error) } + return nil +} - if file.codec != nil { - if err := file.codec.Close(); err != nil { +// flushFile flushes file to the cloud storage. +// file should not be used after flushing. +func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSinkFile) error { + asyncFlushEnabled := enableAsyncFlush.Get(&s.settings.SV) + if s.asyncFlushActive && !asyncFlushEnabled { + // Async flush behavior was turned off -- drain any active flush requests + // before flushing this file. + if err := s.waitAsyncFlush(); err != nil { return err } } + s.asyncFlushActive = asyncFlushEnabled // We use this monotonically increasing fileID to ensure correct ordering // among files emitted at the same timestamp during the same job session. fileID := s.fileID s.fileID++ + // Pad file ID to maintain lexical ordering among files from the same sink. // Note that we use `-` here to delimit the filename because we want // `%d.RESOLVED` files to lexicographically succeed data files that have the @@ -606,11 +638,52 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSink "precedes a file emitted before: %s", filename, s.prevFilename) } s.prevFilename = filename - compressedBytes := file.buf.Len() - if err := cloud.WriteFile(ctx, s.es, filepath.Join(s.dataFilePartition, filename), bytes.NewReader(file.buf.Bytes())); err != nil { + dest := filepath.Join(s.dataFilePartition, filename) + + if !asyncFlushEnabled { + return file.flushToStorage(ctx, s.es, dest, s.metrics) + } + + s.flushGroup.Add(1) + go func() { + defer s.flushGroup.Done() + // NB: must use s.flushCtx; ctx may be short lived (i.e. cancelled). + if err := file.flushToStorage(s.flushCtx, s.es, dest, s.metrics); err != nil { + log.Errorf(ctx, "error flushing file to storage: %s", err) + // We must use the same type for error we store in flushErr. + s.flushErr.CompareAndSwap(nil, &flushError{error: err}) + } + }() + return nil +} + +type flushError struct { + error +} + +// flushToStorage writes out file into external storage into 'dest'. +func (f *cloudStorageSinkFile) flushToStorage( + ctx context.Context, es cloud.ExternalStorage, dest string, m metricsRecorder, +) error { + defer f.alloc.Release(ctx) + + if f.rawSize == 0 { + // This method shouldn't be called with an empty file, but be defensive + // about not writing empty files anyway. 
+ return nil + } + + if f.codec != nil { + if err := f.codec.Close(); err != nil { + return err + } + } + + compressedBytes := f.buf.Len() + if err := cloud.WriteFile(ctx, es, dest, bytes.NewReader(f.buf.Bytes())); err != nil { return err } - s.metrics.recordEmittedBatch(file.created, file.numMessages, file.oldestMVCC, file.rawSize, compressedBytes) + m.recordEmittedBatch(f.created, f.numMessages, f.oldestMVCC, f.rawSize, compressedBytes) return nil } @@ -618,7 +691,7 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSink // Close implements the Sink interface. func (s *cloudStorageSink) Close() error { s.files = nil - return s.es.Close() + return errors.CombineErrors(s.waitAsyncFlush(), s.es.Close()) } // Dial implements the Sink interface. diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index a0e6e305f0af..a6e9fda37efd 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -69,7 +69,7 @@ func TestCloudStorageSink(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - dir, dirCleanupFn := testutils.TempDir(t) + externalIODir, dirCleanupFn := testutils.TempDir(t) defer dirCleanupFn() gzipDecompress := func(t *testing.T, compressed []byte) []byte { @@ -88,8 +88,12 @@ func TestCloudStorageSink(t *testing.T) { return decompressed } - listLeafDirectories := func(root string) []string { - absRoot := filepath.Join(dir, root) + testDir := func(t *testing.T) string { + return strings.ReplaceAll(t.Name(), "/", ";") + } + + listLeafDirectories := func(t *testing.T) []string { + absRoot := filepath.Join(externalIODir, testDir(t)) var folders []string @@ -126,7 +130,7 @@ func TestCloudStorageSink(t *testing.T) { // slurpDir returns the contents of every file under root (relative to the // temp dir created above), sorted by the name of the file. 
- slurpDir := func(t *testing.T, root string) []string { + slurpDir := func(t *testing.T) []string { var files []string walkFn := func(path string, info os.FileInfo, err error) error { if err != nil { @@ -145,7 +149,7 @@ func TestCloudStorageSink(t *testing.T) { files = append(files, string(file)) return nil } - absRoot := filepath.Join(dir, root) + absRoot := filepath.Join(externalIODir, testDir(t)) require.NoError(t, os.MkdirAll(absRoot, 0755)) require.NoError(t, filepath.Walk(absRoot, walkFn)) return files @@ -154,7 +158,7 @@ func TestCloudStorageSink(t *testing.T) { const unlimitedFileSize int64 = math.MaxInt64 var noKey []byte settings := cluster.MakeTestingClusterSettings() - settings.ExternalIODir = dir + settings.ExternalIODir = externalIODir opts := changefeedbase.EncodingOptions{ Format: changefeedbase.OptFormatJSON, Envelope: changefeedbase.OptEnvelopeWrapped, @@ -180,25 +184,33 @@ func TestCloudStorageSink(t *testing.T) { user := username.RootUserName() - sinkURI := func(dir string, maxFileSize int64) sinkURL { - uri := `nodelocal://0/` + dir + sinkURI := func(t *testing.T, maxFileSize int64) sinkURL { + u, err := url.Parse(fmt.Sprintf("nodelocal://0/%s", testDir(t))) + require.NoError(t, err) + sink := sinkURL{URL: u} if maxFileSize != unlimitedFileSize { - uri += fmt.Sprintf("?%s=%d", changefeedbase.SinkParamFileSize, maxFileSize) + sink.addParam(changefeedbase.SinkParamFileSize, strconv.FormatInt(maxFileSize, 10)) } - u, err := url.Parse(uri) - require.NoError(t, err) - return sinkURL{URL: u} + return sink } - t.Run(`golden`, func(t *testing.T) { + testWithAndWithoutAsyncFlushing := func(t *testing.T, name string, testFn func(*testing.T)) { + t.Helper() + testutils.RunTrueAndFalse(t, name+"/asyncFlush", func(t *testing.T, enable bool) { + enableAsyncFlush.Override(context.Background(), &settings.SV, enable) + testFn(t) + }) + } + + testWithAndWithoutAsyncFlushing(t, `golden`, func(t *testing.T) { t1 := makeTopic(`t1`) testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf, err := span.MakeFrontier(testSpan) require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - sinkDir := `golden` + s, err := makeCloudStorageSink( - ctx, sinkURI(sinkDir, unlimitedFileSize), 1, settings, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) @@ -210,11 +222,11 @@ func TestCloudStorageSink(t *testing.T) { require.Equal(t, []string{ "v1\n", - }, slurpDir(t, sinkDir)) + }, slurpDir(t)) require.NoError(t, s.EmitResolvedTimestamp(ctx, e, ts(5))) resolvedFile, err := os.ReadFile(filepath.Join( - dir, sinkDir, `1970-01-01`, `197001010000000000000050000000000.RESOLVED`)) + externalIODir, testDir(t), `1970-01-01`, `197001010000000000000050000000000.RESOLVED`)) require.NoError(t, err) require.Equal(t, `{"resolved":"5.0000000000"}`, string(resolvedFile)) }) @@ -225,7 +237,14 @@ func TestCloudStorageSink(t *testing.T) { return forwarded } - t.Run(`single-node`, func(t *testing.T) { + stringOrDefault := func(s, ifEmpty string) string { + if len(s) == 0 { + return ifEmpty + } + return s + } + + testWithAndWithoutAsyncFlushing(t, `single-node`, func(t *testing.T) { before := opts.Compression // Compression codecs include buffering that interferes with other tests, // e.g. the bucketing test that configures very small flush sizes. 
@@ -234,7 +253,7 @@ func TestCloudStorageSink(t *testing.T) { }() for _, compression := range []string{"", "gzip"} { opts.Compression = compression - t.Run("compress="+compression, func(t *testing.T) { + t.Run("compress="+stringOrDefault(compression, "none"), func(t *testing.T) { t1 := makeTopic(`t1`) t2 := makeTopic(`t2`) @@ -242,9 +261,8 @@ func TestCloudStorageSink(t *testing.T) { sf, err := span.MakeFrontier(testSpan) require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - dir := `single-node` + compression s, err := makeCloudStorageSink( - ctx, sinkURI(dir, unlimitedFileSize), 1, settings, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) @@ -253,7 +271,7 @@ func TestCloudStorageSink(t *testing.T) { // Empty flush emits no files. require.NoError(t, s.Flush(ctx)) - require.Equal(t, []string(nil), slurpDir(t, dir)) + require.Equal(t, []string(nil), slurpDir(t)) // Emitting rows and flushing should write them out in one file per table. Note // the ordering among these two files is non deterministic as either of them could @@ -268,19 +286,19 @@ func TestCloudStorageSink(t *testing.T) { "v1\nv2\n", "w1\n", } - actual := slurpDir(t, dir) + actual := slurpDir(t) sort.Strings(actual) require.Equal(t, expected, actual) // Flushing with no new emits writes nothing new. require.NoError(t, s.Flush(ctx)) - actual = slurpDir(t, dir) + actual = slurpDir(t) sort.Strings(actual) require.Equal(t, expected, actual) // Without a flush, nothing new shows up. require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v3`), ts(3), ts(3), zeroAlloc)) - actual = slurpDir(t, dir) + actual = slurpDir(t) sort.Strings(actual) require.Equal(t, expected, actual) @@ -290,7 +308,7 @@ func TestCloudStorageSink(t *testing.T) { require.NoError(t, s.Flush(ctx)) require.Equal(t, []string{ "v3\n", - }, slurpDir(t, dir)[2:]) + }, slurpDir(t)[2:]) // Data from different versions of a table is put in different files, so that we // can guarantee that all rows in any given file have the same schema. @@ -306,7 +324,7 @@ func TestCloudStorageSink(t *testing.T) { "v4\n", "v5\n", } - actual = slurpDir(t, dir) + actual = slurpDir(t) actual = actual[len(actual)-2:] sort.Strings(actual) require.Equal(t, expected, actual) @@ -314,22 +332,20 @@ func TestCloudStorageSink(t *testing.T) { } }) - t.Run(`multi-node`, func(t *testing.T) { + testWithAndWithoutAsyncFlushing(t, `multi-node`, func(t *testing.T) { t1 := makeTopic(`t1`) - testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf, err := span.MakeFrontier(testSpan) require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - dir := `multi-node` s1, err := makeCloudStorageSink( - ctx, sinkURI(dir, unlimitedFileSize), 1, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) defer func() { require.NoError(t, s1.Close()) }() s2, err := makeCloudStorageSink( - ctx, sinkURI(dir, unlimitedFileSize), 2, + ctx, sinkURI(t, unlimitedFileSize), 2, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) defer func() { require.NoError(t, s2.Close()) }() @@ -353,19 +369,19 @@ func TestCloudStorageSink(t *testing.T) { require.Equal(t, []string{ "v1\n", "w1\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // If a node restarts then the entire distsql flow has to restart. If // this happens before checkpointing, some data is written again but // this is unavoidable. 
s1R, err := makeCloudStorageSink( - ctx, sinkURI(dir, unbuffered), 1, + ctx, sinkURI(t, unbuffered), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) defer func() { require.NoError(t, s1R.Close()) }() s2R, err := makeCloudStorageSink( - ctx, sinkURI(dir, unbuffered), 2, + ctx, sinkURI(t, unbuffered), 2, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) @@ -390,7 +406,7 @@ func TestCloudStorageSink(t *testing.T) { "v1\n", "w1\n", "w1\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) }) // The jobs system can't always clean up perfectly after itself and so there @@ -400,15 +416,14 @@ func TestCloudStorageSink(t *testing.T) { // // This test is also sufficient for verifying the behavior of a multi-node // changefeed using this sink. Ditto job restarts. - t.Run(`zombie`, func(t *testing.T) { + testWithAndWithoutAsyncFlushing(t, `zombie`, func(t *testing.T) { t1 := makeTopic(`t1`) testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf, err := span.MakeFrontier(testSpan) require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - dir := `zombie` s1, err := makeCloudStorageSink( - ctx, sinkURI(dir, unlimitedFileSize), 1, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) @@ -416,7 +431,7 @@ func TestCloudStorageSink(t *testing.T) { s1.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID. s1.(*cloudStorageSink).jobSessionID = "a" // Force deterministic job session ID. s2, err := makeCloudStorageSink( - ctx, sinkURI(dir, unlimitedFileSize), 1, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) @@ -441,19 +456,21 @@ func TestCloudStorageSink(t *testing.T) { "v1\nv2\n", "v3\n", "v1\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) }) - t.Run(`bucketing`, func(t *testing.T) { + waitAsyncFlush := func(s Sink) error { + return s.(*cloudStorageSink).waitAsyncFlush() + } + testWithAndWithoutAsyncFlushing(t, `bucketing`, func(t *testing.T) { t1 := makeTopic(`t1`) testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf, err := span.MakeFrontier(testSpan) require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - dir := `bucketing` const targetMaxFileSize = 6 s, err := makeCloudStorageSink( - ctx, sinkURI(dir, targetMaxFileSize), 1, + ctx, sinkURI(t, targetMaxFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) @@ -465,16 +482,17 @@ func TestCloudStorageSink(t *testing.T) { for i := int64(1); i <= 5; i++ { require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(fmt.Sprintf(`v%d`, i)), ts(i), ts(i), zeroAlloc)) } + require.NoError(t, waitAsyncFlush(s)) require.Equal(t, []string{ "v1\nv2\nv3\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // Flush then writes the rest. 
require.NoError(t, s.Flush(ctx)) require.Equal(t, []string{ "v1\nv2\nv3\n", "v4\nv5\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // Forward the SpanFrontier here and trigger an empty flush to update // the sink's `inclusiveLowerBoundTs` @@ -487,11 +505,12 @@ func TestCloudStorageSink(t *testing.T) { for i := int64(6); i < 10; i++ { require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(fmt.Sprintf(`v%d`, i)), ts(i), ts(i), zeroAlloc)) } + require.NoError(t, waitAsyncFlush(s)) require.Equal(t, []string{ "v1\nv2\nv3\n", "v4\nv5\n", "v6\nv7\nv8\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // Resolved timestamps are periodically written. This happens // asynchronously from a different node and can be given an earlier @@ -507,7 +526,7 @@ func TestCloudStorageSink(t *testing.T) { "v4\nv5\n", `{"resolved":"5.0000000000"}`, "v6\nv7\nv8\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // Flush then writes the rest. Since we use the time of the EmitRow // or EmitResolvedTimestamp calls to order files, the resolved timestamp @@ -520,7 +539,7 @@ func TestCloudStorageSink(t *testing.T) { `{"resolved":"5.0000000000"}`, "v6\nv7\nv8\n", "v9\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // A resolved timestamp emitted with ts > 5 should follow everything // emitted thus far. @@ -532,10 +551,10 @@ func TestCloudStorageSink(t *testing.T) { "v6\nv7\nv8\n", "v9\n", `{"resolved":"6.0000000000"}`, - }, slurpDir(t, dir)) + }, slurpDir(t)) }) - t.Run(`partition-formatting`, func(t *testing.T) { + testWithAndWithoutAsyncFlushing(t, `partition-formatting`, func(t *testing.T) { t1 := makeTopic(`t1`) testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} const targetMaxFileSize = 6 @@ -550,7 +569,7 @@ func TestCloudStorageSink(t *testing.T) { time.Date(2000, time.January, 2, 6, 1, 1, 0, time.UTC), } - for i, tc := range []struct { + for _, tc := range []struct { format string expectedFolders []string }{ @@ -582,51 +601,50 @@ func TestCloudStorageSink(t *testing.T) { }, }, } { - t.Run(tc.format, func(t *testing.T) { - sf, err := span.MakeFrontier(testSpan) - require.NoError(t, err) - timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} + testWithAndWithoutAsyncFlushing(t, stringOrDefault(tc.format, "default"), + func(t *testing.T) { + sf, err := span.MakeFrontier(testSpan) + require.NoError(t, err) + timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - dir := fmt.Sprintf(`partition-formatting-%d`, i) + sinkURIWithParam := sinkURI(t, targetMaxFileSize) + sinkURIWithParam.addParam(changefeedbase.SinkParamPartitionFormat, tc.format) + t.Logf("format=%s sinkgWithParam: %s", tc.format, sinkURIWithParam.String()) + s, err := makeCloudStorageSink( + ctx, sinkURIWithParam, 1, + settings, opts, timestampOracle, externalStorageFromURI, user, nil, + ) - sinkURIWithParam := sinkURI(dir, targetMaxFileSize) - sinkURIWithParam.addParam(changefeedbase.SinkParamPartitionFormat, tc.format) - s, err := makeCloudStorageSink( - ctx, sinkURIWithParam, 1, - settings, opts, timestampOracle, externalStorageFromURI, user, nil, - ) - - require.NoError(t, err) - defer func() { require.NoError(t, s.Close()) }() - s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID. + require.NoError(t, err) + defer func() { require.NoError(t, s.Close()) }() + s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID. 
- for i, timestamp := range timestamps { - hlcTime := ts(timestamp.UnixNano()) + for i, timestamp := range timestamps { + hlcTime := ts(timestamp.UnixNano()) - // Move the frontier and flush to update the dataFilePartition value - _, err = sf.Forward(testSpan, hlcTime) - require.NoError(t, err) - require.NoError(t, s.Flush(ctx)) + // Move the frontier and flush to update the dataFilePartition value + _, err = sf.Forward(testSpan, hlcTime) + require.NoError(t, err) + require.NoError(t, s.Flush(ctx)) - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(fmt.Sprintf(`v%d`, i)), hlcTime, hlcTime, zeroAlloc)) - } + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(fmt.Sprintf(`v%d`, i)), hlcTime, hlcTime, zeroAlloc)) + } - require.NoError(t, s.Flush(ctx)) // Flush the last file - require.ElementsMatch(t, tc.expectedFolders, listLeafDirectories(dir)) - require.Equal(t, []string{"v0\n", "v1\n", "v2\n", "v3\n", "v4\n"}, slurpDir(t, dir)) - }) + require.NoError(t, s.Flush(ctx)) // Flush the last file + require.ElementsMatch(t, tc.expectedFolders, listLeafDirectories(t)) + require.Equal(t, []string{"v0\n", "v1\n", "v2\n", "v3\n", "v4\n"}, slurpDir(t)) + }) } }) - t.Run(`file-ordering`, func(t *testing.T) { + testWithAndWithoutAsyncFlushing(t, `file-ordering`, func(t *testing.T) { t1 := makeTopic(`t1`) testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf, err := span.MakeFrontier(testSpan) require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - dir := `file-ordering` s, err := makeCloudStorageSink( - ctx, sinkURI(dir, unlimitedFileSize), 1, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) @@ -663,7 +681,7 @@ func TestCloudStorageSink(t *testing.T) { `{"resolved":"3.0000000000"}`, "e3next\n", `{"resolved":"4.0000000000"}`, - }, slurpDir(t, dir)) + }, slurpDir(t)) // Test that files with timestamp lower than the least resolved timestamp // as of file creation time are ignored. @@ -675,19 +693,18 @@ func TestCloudStorageSink(t *testing.T) { `{"resolved":"3.0000000000"}`, "e3next\n", `{"resolved":"4.0000000000"}`, - }, slurpDir(t, dir)) + }, slurpDir(t)) }) - t.Run(`ordering-among-schema-versions`, func(t *testing.T) { + testWithAndWithoutAsyncFlushing(t, `ordering-among-schema-versions`, func(t *testing.T) { t1 := makeTopic(`t1`) testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf, err := span.MakeFrontier(testSpan) require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - dir := `ordering-among-schema-versions` var targetMaxFileSize int64 = 10 s, err := makeCloudStorageSink( - ctx, sinkURI(dir, targetMaxFileSize), 1, settings, + ctx, sinkURI(t, targetMaxFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil) require.NoError(t, err) defer func() { require.NoError(t, s.Close()) }() @@ -699,20 +716,22 @@ func TestCloudStorageSink(t *testing.T) { // for the first file but not the second one. t1.Version = 0 require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`trigger-flush-v1`), ts(1), ts(1), zeroAlloc)) + require.NoError(t, waitAsyncFlush(s)) require.Equal(t, []string{ "v1\ntrigger-flush-v1\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // Now make the file with the newer schema exceed its file size threshold and ensure // that the file with the older schema is flushed (and ordered) before. 
require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v2`), ts(1), ts(1), zeroAlloc)) t1.Version = 1 require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`trigger-flush-v3`), ts(1), ts(1), zeroAlloc)) + require.NoError(t, waitAsyncFlush(s)) require.Equal(t, []string{ "v1\ntrigger-flush-v1\n", "v2\n", "v3\ntrigger-flush-v3\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // Calling `Flush()` on the sink should emit files in the order of their schema IDs. require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`w1`), ts(1), ts(1), zeroAlloc)) @@ -725,6 +744,6 @@ func TestCloudStorageSink(t *testing.T) { "v3\ntrigger-flush-v3\n", "x1\n", "w1\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) }) } From d41cce07e6649d0f34cbbb03dc25eb9156055b2c Mon Sep 17 00:00:00 2001 From: Nick Travers Date: Thu, 22 Sep 2022 13:09:06 -0700 Subject: [PATCH 2/3] kvserver: add storage time-series metrics for level size and score Currently, the only way to infer the compaction score and heuristics is to use the LSM printout from the logs (emitted once every ten minutes), or to call the `/debug/lsm` endpoint manually, and track values over time. This makes it difficult to debug issues retroactively. Add two new sets of per-LSM-level time-series metrics for level size and level score. These new metrics have names of the form `storage.$LEVEL-level-{size,score}`. Closes #88415. Release note (ops change): Adds two new sets of per-LSM-level time-series metrics, one for level size and another for level score. These metrics are of the form `storage.$LEVEL-level-{size,score}`. --- pkg/kv/kvserver/metrics.go | 32 +++++++++++++++++++++++++++++++- pkg/ts/catalog/chart_catalog.go | 25 +++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 15b6ee6908f0..f8bef021b3f1 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -550,6 +550,20 @@ var metaRdbBytesIngested = storageLevelMetricMetadata( metric.Unit_BYTES, ) +var metaRdbLevelSize = storageLevelMetricMetadata( + "level-size", + "Size of the SSTables in level %d", + "Bytes", + metric.Unit_BYTES, +) + +var metaRdbLevelScores = storageLevelMetricMetadata( + "level-score", + "Compaction score of level %d", + "Score", + metric.Unit_COUNT, +) + var ( metaRdbWriteStalls = metric.Metadata{ Name: "storage.write-stalls", @@ -1708,7 +1722,9 @@ type StoreMetrics struct { RdbL0BytesFlushed *metric.Gauge RdbL0Sublevels *metric.Gauge RdbL0NumFiles *metric.Gauge - RdbBytesIngested [7]*metric.Gauge // idx = level + RdbBytesIngested [7]*metric.Gauge // idx = level + RdbLevelSize [7]*metric.Gauge // idx = level + RdbLevelScore [7]*metric.GaugeFloat64 // idx = level RdbWriteStalls *metric.Gauge RdbWriteStallNanos *metric.Gauge @@ -2137,6 +2153,8 @@ func newTenantsStorageMetrics() *TenantsStorageMetrics { func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { storeRegistry := metric.NewRegistry() rdbBytesIngested := storageLevelGaugeSlice(metaRdbBytesIngested) + rdbLevelSize := storageLevelGaugeSlice(metaRdbLevelSize) + rdbLevelScore := storageLevelGaugeFloat64Slice(metaRdbLevelScores) sm := &StoreMetrics{ registry: storeRegistry, @@ -2219,6 +2237,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { RdbL0Sublevels: metric.NewGauge(metaRdbL0Sublevels), RdbL0NumFiles: metric.NewGauge(metaRdbL0NumFiles), RdbBytesIngested: rdbBytesIngested, + RdbLevelSize: rdbLevelSize, + RdbLevelScore: rdbLevelScore, RdbWriteStalls: metric.NewGauge(metaRdbWriteStalls), 
RdbWriteStallNanos: metric.NewGauge(metaRdbWriteStallNanos), @@ -2523,6 +2543,8 @@ func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) { sm.RdbL0BytesFlushed.Update(int64(m.Levels[0].BytesFlushed)) for level, stats := range m.Levels { sm.RdbBytesIngested[level].Update(int64(stats.BytesIngested)) + sm.RdbLevelSize[level].Update(stats.Size) + sm.RdbLevelScore[level].Update(stats.Score) } } @@ -2577,3 +2599,11 @@ func storageLevelGaugeSlice(sl [7]metric.Metadata) [7]*metric.Gauge { } return gs } + +func storageLevelGaugeFloat64Slice(sl [7]metric.Metadata) [7]*metric.GaugeFloat64 { + var gs [7]*metric.GaugeFloat64 + for i := range sl { + gs[i] = metric.NewGaugeFloat64(sl[i]) + } + return gs +} diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 13dbb3b1ebd5..e4ddbe13a291 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -3004,6 +3004,31 @@ var charts = []sectionDescription{ Metrics: []string{"storage.write-stall-nanos"}, AxisLabel: "Duration (nanos)", }, + { + Title: "Bytes Used Per Level", + Metrics: []string{ + "storage.l0-level-size", + "storage.l1-level-size", + "storage.l2-level-size", + "storage.l3-level-size", + "storage.l4-level-size", + "storage.l5-level-size", + "storage.l6-level-size", + }, + AxisLabel: "Bytes", + }, + { + Title: "Compaction Score Per Level", + Metrics: []string{ + "storage.l0-level-score", + "storage.l1-level-score", + "storage.l2-level-score", + "storage.l3-level-score", + "storage.l4-level-score", + "storage.l5-level-score", + "storage.l6-level-score", + }, + }, }, }, { From e32e9146f088858a836112dba492167aee4a588e Mon Sep 17 00:00:00 2001 From: j82w Date: Thu, 22 Sep 2022 16:13:33 -0400 Subject: [PATCH 3/3] ui: insights overview rename 'execution id' to 'latest execution id' closes #88456 Release justification: Category 2: Bug fixes and low-risk updates to new functionality Release note: (ui change): Rename insights overview table column 'execution id' to 'latest execution id'. This will help avoid confusion since the ui only shows the latest id per fingerprint. 
--- .../statementInsightsTable.tsx | 18 +++++++++--------- .../transactionInsightsTable.tsx | 4 ++-- .../workloadInsights/util/insightsColumns.tsx | 16 +++++++++++++++- 3 files changed, 26 insertions(+), 12 deletions(-) diff --git a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/statementInsights/statementInsightsTable.tsx b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/statementInsights/statementInsightsTable.tsx index e18cd94d75df..8a4cc81da334 100644 --- a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/statementInsights/statementInsightsTable.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/statementInsights/statementInsightsTable.tsx @@ -39,8 +39,8 @@ export function makeStatementInsightsColumns(): ColumnDescriptor ( {String(item.statementID)} @@ -141,13 +141,6 @@ export function makeStatementInsightsColumns(): ColumnDescriptor String(item.isFullScan), showByDefault: false, }, - { - name: "transactionID", - title: insightsTableTitles.executionID(InsightExecEnum.TRANSACTION), - cell: (item: StatementInsightEvent) => item.transactionID, - sort: (item: StatementInsightEvent) => item.transactionID, - showByDefault: false, - }, { name: "transactionFingerprintID", title: insightsTableTitles.fingerprintID(InsightExecEnum.TRANSACTION), @@ -155,6 +148,13 @@ export function makeStatementInsightsColumns(): ColumnDescriptor item.transactionFingerprintID, showByDefault: false, }, + { + name: "transactionID", + title: insightsTableTitles.latestExecutionID(InsightExecEnum.TRANSACTION), + cell: (item: StatementInsightEvent) => item.transactionID, + sort: (item: StatementInsightEvent) => item.transactionID, + showByDefault: false, + }, ]; } diff --git a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/transactionInsights/transactionInsightsTable.tsx b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/transactionInsights/transactionInsightsTable.tsx index 7cb960cc2fa0..5a5776405746 100644 --- a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/transactionInsights/transactionInsightsTable.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/transactionInsights/transactionInsightsTable.tsx @@ -32,8 +32,8 @@ export function makeTransactionInsightsColumns(): ColumnDescriptor ( {String(item.transactionID)} diff --git a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/util/insightsColumns.tsx b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/util/insightsColumns.tsx index f87cb658eed6..c8326749060b 100644 --- a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/util/insightsColumns.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsights/util/insightsColumns.tsx @@ -17,6 +17,7 @@ import { contentionTime, readFromDisk, writtenToDisk } from "../../../util"; export const insightsColumnLabels = { executionID: "Execution ID", + latestExecutionID: "Latest Execution ID", query: "Execution", insights: "Insights", startTime: "Start Time (UTC)", @@ -42,8 +43,14 @@ export function getLabel( ): string { switch (execType) { case InsightExecEnum.TRANSACTION: + if (key === "latestExecutionID") { + return "Latest Transaction Execution ID"; + } return "Transaction " + insightsColumnLabels[key]; case InsightExecEnum.STATEMENT: + if (key === "latestExecutionID") { + return "Latest Statement Execution ID"; + } return "Statement " + insightsColumnLabels[key]; default: return insightsColumnLabels[key]; @@ -71,12 +78,19 @@ export const insightsTableTitles: InsightsTableTitleType = { ); }, 
   executionID: (execType: InsightExecEnum) => {
+    return makeToolTip(
+      <p>The ID of the execution with the {execType} fingerprint.</p>,
+      "executionID",
+      execType,
+    );
+  },
+  latestExecutionID: (execType: InsightExecEnum) => {
     return makeToolTip(
       <p>
         The execution ID of the latest execution with the {execType}{" "}
         fingerprint.
       </p>,
-      "executionID",
+      "latestExecutionID",
       execType,
     );
   },