From 81f9932c4cb04e9475b77415dfaee696d6283af2 Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Wed, 21 Sep 2022 16:21:35 -0400
Subject: [PATCH] changefeedccl: Do not block on file size based flushes

Prior to this change, the cloud storage sink triggered a file-size-based
flush whenever a new row would push the file size beyond the configured
threshold. This significantly reduced throughput whenever such an event
occurred: no additional events could be added to the cloud storage sink
while the previous flush was active.

This blocking is not necessary. The cloud storage sink can trigger
file-size-based flushes asynchronously. The only requirement is that if
a real, non-file-based flush arrives, or if we need to emit resolved
timestamps, we must first wait for all of the active flush requests to
complete.

In addition, because every event added to the cloud storage sink has an
associated allocation, which is released when the file is written out,
performing flushes asynchronously is safe with respect to memory usage
and accounting.

Release note (enterprise change): Changefeeds using the cloud storage
sink now have better throughput.

Release justification: performance fix
---
 pkg/ccl/changefeedccl/sink_cloudstorage.go    | 105 +++++++--
 .../changefeedccl/sink_cloudstorage_test.go   | 199 ++++++++++--------
 2 files changed, 198 insertions(+), 106 deletions(-)

diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go
index 61e945ee4734..fe1d3f0b3769 100644
--- a/pkg/ccl/changefeedccl/sink_cloudstorage.go
+++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go
@@ -19,6 +19,7 @@ import (
 	"net/url"
 	"path/filepath"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"time"

@@ -27,6 +28,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
 	"github.com/cockroachdb/cockroach/pkg/cloud"
 	"github.com/cockroachdb/cockroach/pkg/security/username"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
@@ -304,6 +306,11 @@ type cloudStorageSink struct {
 	dataFilePartition string
 	prevFilename      string
 	metrics           metricsRecorder
+
+	asyncFlushActive bool
+	flushCtx         context.Context
+	flushGroup       sync.WaitGroup
+	flushErr         atomic.Value
 }

 const sinkCompressionGzip = "gzip"
@@ -364,8 +371,11 @@ func makeCloudStorageSink(
 		partitionFormat: defaultPartitionFormat,
 		timestampOracle: timestampOracle,
 		// TODO(dan,ajwerner): Use the jobs framework's session ID once that's available.
-		jobSessionID: sessID,
-		topicNamer:   tn,
+		jobSessionID:     sessID,
+		topicNamer:       tn,
+		asyncFlushActive: enableAsyncFlush.Get(&settings.SV),
+		// TODO (yevgeniy): Consider adding ctx to Dial method instead.
+		flushCtx: ctx,
 	}

 	if partitionFormat := u.consumeParam(changefeedbase.SinkParamPartitionFormat); partitionFormat != "" {
@@ -503,6 +513,13 @@ func (s *cloudStorageSink) EmitResolvedTimestamp(
 	if err != nil {
 		return err
 	}
+
+	// Wait for previously issued async flush requests to complete
+	// before we write the resolved timestamp file.
+	if err := s.waitAsyncFlush(); err != nil {
+		return errors.Wrapf(err, "while emitting resolved timestamp")
+	}
+
 	// Don't need to copy payload because we never buffer it anywhere.

 	part := resolved.GoTime().Format(s.partitionFormat)
@@ -572,29 +589,44 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error {
 	// for an overview of the naming convention and proof of correctness.
s.dataFileTs = cloudStorageFormatTime(s.timestampOracle.inclusiveLowerBoundTS()) s.dataFilePartition = s.timestampOracle.inclusiveLowerBoundTS().GoTime().Format(s.partitionFormat) - return nil + return s.waitAsyncFlush() } -// file should not be used after flushing. -func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSinkFile) error { - defer file.alloc.Release(ctx) +// enableAsyncFlush controls async flushing behavior for this sink. +var enableAsyncFlush = settings.RegisterBoolSetting( + settings.TenantWritable, + "changefeed.cloudstorage.async_flush.enabled", + "enable async flushing", + true, +) - if file.rawSize == 0 { - // This method shouldn't be called with an empty file, but be defensive - // about not writing empty files anyway. - return nil +// waitAsyncFlush waits until all async flushes complete. +func (s *cloudStorageSink) waitAsyncFlush() error { + s.flushGroup.Wait() + if v := s.flushErr.Load(); v != nil { + return v.(error) } + return nil +} - if file.codec != nil { - if err := file.codec.Close(); err != nil { +// flushFile flushes file to the cloud storage. +// file should not be used after flushing. +func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSinkFile) error { + asyncFlushEnabled := enableAsyncFlush.Get(&s.settings.SV) + if s.asyncFlushActive && !asyncFlushEnabled { + // Async flush behavior was turned off -- drain any active flush requests + // before flushing this file. + if err := s.waitAsyncFlush(); err != nil { return err } } + s.asyncFlushActive = asyncFlushEnabled // We use this monotonically increasing fileID to ensure correct ordering // among files emitted at the same timestamp during the same job session. fileID := s.fileID s.fileID++ + // Pad file ID to maintain lexical ordering among files from the same sink. // Note that we use `-` here to delimit the filename because we want // `%d.RESOLVED` files to lexicographically succeed data files that have the @@ -606,11 +638,52 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSink "precedes a file emitted before: %s", filename, s.prevFilename) } s.prevFilename = filename - compressedBytes := file.buf.Len() - if err := cloud.WriteFile(ctx, s.es, filepath.Join(s.dataFilePartition, filename), bytes.NewReader(file.buf.Bytes())); err != nil { + dest := filepath.Join(s.dataFilePartition, filename) + + if !asyncFlushEnabled { + return file.flushToStorage(ctx, s.es, dest, s.metrics) + } + + s.flushGroup.Add(1) + go func() { + defer s.flushGroup.Done() + // NB: must use s.flushCtx; ctx may be short lived (i.e. cancelled). + if err := file.flushToStorage(s.flushCtx, s.es, dest, s.metrics); err != nil { + log.Errorf(ctx, "error flushing file to storage: %s", err) + // We must use the same type for error we store in flushErr. + s.flushErr.CompareAndSwap(nil, &flushError{error: err}) + } + }() + return nil +} + +type flushError struct { + error +} + +// flushToStorage writes out file into external storage into 'dest'. +func (f *cloudStorageSinkFile) flushToStorage( + ctx context.Context, es cloud.ExternalStorage, dest string, m metricsRecorder, +) error { + defer f.alloc.Release(ctx) + + if f.rawSize == 0 { + // This method shouldn't be called with an empty file, but be defensive + // about not writing empty files anyway. 
+ return nil + } + + if f.codec != nil { + if err := f.codec.Close(); err != nil { + return err + } + } + + compressedBytes := f.buf.Len() + if err := cloud.WriteFile(ctx, es, dest, bytes.NewReader(f.buf.Bytes())); err != nil { return err } - s.metrics.recordEmittedBatch(file.created, file.numMessages, file.oldestMVCC, file.rawSize, compressedBytes) + m.recordEmittedBatch(f.created, f.numMessages, f.oldestMVCC, f.rawSize, compressedBytes) return nil } @@ -618,7 +691,7 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSink // Close implements the Sink interface. func (s *cloudStorageSink) Close() error { s.files = nil - return s.es.Close() + return errors.CombineErrors(s.waitAsyncFlush(), s.es.Close()) } // Dial implements the Sink interface. diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index a0e6e305f0af..a6e9fda37efd 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -69,7 +69,7 @@ func TestCloudStorageSink(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - dir, dirCleanupFn := testutils.TempDir(t) + externalIODir, dirCleanupFn := testutils.TempDir(t) defer dirCleanupFn() gzipDecompress := func(t *testing.T, compressed []byte) []byte { @@ -88,8 +88,12 @@ func TestCloudStorageSink(t *testing.T) { return decompressed } - listLeafDirectories := func(root string) []string { - absRoot := filepath.Join(dir, root) + testDir := func(t *testing.T) string { + return strings.ReplaceAll(t.Name(), "/", ";") + } + + listLeafDirectories := func(t *testing.T) []string { + absRoot := filepath.Join(externalIODir, testDir(t)) var folders []string @@ -126,7 +130,7 @@ func TestCloudStorageSink(t *testing.T) { // slurpDir returns the contents of every file under root (relative to the // temp dir created above), sorted by the name of the file. 
- slurpDir := func(t *testing.T, root string) []string { + slurpDir := func(t *testing.T) []string { var files []string walkFn := func(path string, info os.FileInfo, err error) error { if err != nil { @@ -145,7 +149,7 @@ func TestCloudStorageSink(t *testing.T) { files = append(files, string(file)) return nil } - absRoot := filepath.Join(dir, root) + absRoot := filepath.Join(externalIODir, testDir(t)) require.NoError(t, os.MkdirAll(absRoot, 0755)) require.NoError(t, filepath.Walk(absRoot, walkFn)) return files @@ -154,7 +158,7 @@ func TestCloudStorageSink(t *testing.T) { const unlimitedFileSize int64 = math.MaxInt64 var noKey []byte settings := cluster.MakeTestingClusterSettings() - settings.ExternalIODir = dir + settings.ExternalIODir = externalIODir opts := changefeedbase.EncodingOptions{ Format: changefeedbase.OptFormatJSON, Envelope: changefeedbase.OptEnvelopeWrapped, @@ -180,25 +184,33 @@ func TestCloudStorageSink(t *testing.T) { user := username.RootUserName() - sinkURI := func(dir string, maxFileSize int64) sinkURL { - uri := `nodelocal://0/` + dir + sinkURI := func(t *testing.T, maxFileSize int64) sinkURL { + u, err := url.Parse(fmt.Sprintf("nodelocal://0/%s", testDir(t))) + require.NoError(t, err) + sink := sinkURL{URL: u} if maxFileSize != unlimitedFileSize { - uri += fmt.Sprintf("?%s=%d", changefeedbase.SinkParamFileSize, maxFileSize) + sink.addParam(changefeedbase.SinkParamFileSize, strconv.FormatInt(maxFileSize, 10)) } - u, err := url.Parse(uri) - require.NoError(t, err) - return sinkURL{URL: u} + return sink } - t.Run(`golden`, func(t *testing.T) { + testWithAndWithoutAsyncFlushing := func(t *testing.T, name string, testFn func(*testing.T)) { + t.Helper() + testutils.RunTrueAndFalse(t, name+"/asyncFlush", func(t *testing.T, enable bool) { + enableAsyncFlush.Override(context.Background(), &settings.SV, enable) + testFn(t) + }) + } + + testWithAndWithoutAsyncFlushing(t, `golden`, func(t *testing.T) { t1 := makeTopic(`t1`) testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf, err := span.MakeFrontier(testSpan) require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - sinkDir := `golden` + s, err := makeCloudStorageSink( - ctx, sinkURI(sinkDir, unlimitedFileSize), 1, settings, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) @@ -210,11 +222,11 @@ func TestCloudStorageSink(t *testing.T) { require.Equal(t, []string{ "v1\n", - }, slurpDir(t, sinkDir)) + }, slurpDir(t)) require.NoError(t, s.EmitResolvedTimestamp(ctx, e, ts(5))) resolvedFile, err := os.ReadFile(filepath.Join( - dir, sinkDir, `1970-01-01`, `197001010000000000000050000000000.RESOLVED`)) + externalIODir, testDir(t), `1970-01-01`, `197001010000000000000050000000000.RESOLVED`)) require.NoError(t, err) require.Equal(t, `{"resolved":"5.0000000000"}`, string(resolvedFile)) }) @@ -225,7 +237,14 @@ func TestCloudStorageSink(t *testing.T) { return forwarded } - t.Run(`single-node`, func(t *testing.T) { + stringOrDefault := func(s, ifEmpty string) string { + if len(s) == 0 { + return ifEmpty + } + return s + } + + testWithAndWithoutAsyncFlushing(t, `single-node`, func(t *testing.T) { before := opts.Compression // Compression codecs include buffering that interferes with other tests, // e.g. the bucketing test that configures very small flush sizes. 
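The testWithAndWithoutAsyncFlushing helper above relies on testutils.RunTrueAndFalse to execute each scenario twice, overriding the enableAsyncFlush cluster setting before invoking the test body. For readers unfamiliar with that helper, here is a minimal standard-library sketch of the same run-twice pattern; runTrueAndFalse and asyncFlushEnabled are illustrative stand-ins, not the actual testutils or settings APIs:

package example

import (
	"fmt"
	"sync/atomic"
	"testing"
)

// asyncFlushEnabled stands in for the enableAsyncFlush cluster setting.
var asyncFlushEnabled atomic.Bool

// runTrueAndFalse mirrors the shape of testutils.RunTrueAndFalse: it runs
// the given test body once with the flag set to true and once with false,
// naming each subtest after the flag value.
func runTrueAndFalse(t *testing.T, name string, fn func(t *testing.T, enabled bool)) {
	for _, enabled := range []bool{true, false} {
		t.Run(fmt.Sprintf("%s=%t", name, enabled), func(t *testing.T) {
			fn(t, enabled)
		})
	}
}

func TestSinkWithAndWithoutAsyncFlush(t *testing.T) {
	runTrueAndFalse(t, "asyncFlush", func(t *testing.T, enabled bool) {
		asyncFlushEnabled.Store(enabled)
		// Exercise the sink here; both settings must produce identical
		// output files, which is what the rewritten subtests verify.
	})
}

Because every subtest now runs under both settings, test names gain an "/asyncFlush=true" or "/asyncFlush=false" suffix; this is why the test directories in the diff are derived from t.Name() (with "/" replaced) rather than from hard-coded strings.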
@@ -234,7 +253,7 @@ func TestCloudStorageSink(t *testing.T) { }() for _, compression := range []string{"", "gzip"} { opts.Compression = compression - t.Run("compress="+compression, func(t *testing.T) { + t.Run("compress="+stringOrDefault(compression, "none"), func(t *testing.T) { t1 := makeTopic(`t1`) t2 := makeTopic(`t2`) @@ -242,9 +261,8 @@ func TestCloudStorageSink(t *testing.T) { sf, err := span.MakeFrontier(testSpan) require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - dir := `single-node` + compression s, err := makeCloudStorageSink( - ctx, sinkURI(dir, unlimitedFileSize), 1, settings, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) @@ -253,7 +271,7 @@ func TestCloudStorageSink(t *testing.T) { // Empty flush emits no files. require.NoError(t, s.Flush(ctx)) - require.Equal(t, []string(nil), slurpDir(t, dir)) + require.Equal(t, []string(nil), slurpDir(t)) // Emitting rows and flushing should write them out in one file per table. Note // the ordering among these two files is non deterministic as either of them could @@ -268,19 +286,19 @@ func TestCloudStorageSink(t *testing.T) { "v1\nv2\n", "w1\n", } - actual := slurpDir(t, dir) + actual := slurpDir(t) sort.Strings(actual) require.Equal(t, expected, actual) // Flushing with no new emits writes nothing new. require.NoError(t, s.Flush(ctx)) - actual = slurpDir(t, dir) + actual = slurpDir(t) sort.Strings(actual) require.Equal(t, expected, actual) // Without a flush, nothing new shows up. require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v3`), ts(3), ts(3), zeroAlloc)) - actual = slurpDir(t, dir) + actual = slurpDir(t) sort.Strings(actual) require.Equal(t, expected, actual) @@ -290,7 +308,7 @@ func TestCloudStorageSink(t *testing.T) { require.NoError(t, s.Flush(ctx)) require.Equal(t, []string{ "v3\n", - }, slurpDir(t, dir)[2:]) + }, slurpDir(t)[2:]) // Data from different versions of a table is put in different files, so that we // can guarantee that all rows in any given file have the same schema. @@ -306,7 +324,7 @@ func TestCloudStorageSink(t *testing.T) { "v4\n", "v5\n", } - actual = slurpDir(t, dir) + actual = slurpDir(t) actual = actual[len(actual)-2:] sort.Strings(actual) require.Equal(t, expected, actual) @@ -314,22 +332,20 @@ func TestCloudStorageSink(t *testing.T) { } }) - t.Run(`multi-node`, func(t *testing.T) { + testWithAndWithoutAsyncFlushing(t, `multi-node`, func(t *testing.T) { t1 := makeTopic(`t1`) - testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf, err := span.MakeFrontier(testSpan) require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - dir := `multi-node` s1, err := makeCloudStorageSink( - ctx, sinkURI(dir, unlimitedFileSize), 1, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) defer func() { require.NoError(t, s1.Close()) }() s2, err := makeCloudStorageSink( - ctx, sinkURI(dir, unlimitedFileSize), 2, + ctx, sinkURI(t, unlimitedFileSize), 2, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) defer func() { require.NoError(t, s2.Close()) }() @@ -353,19 +369,19 @@ func TestCloudStorageSink(t *testing.T) { require.Equal(t, []string{ "v1\n", "w1\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // If a node restarts then the entire distsql flow has to restart. If // this happens before checkpointing, some data is written again but // this is unavoidable. 
s1R, err := makeCloudStorageSink( - ctx, sinkURI(dir, unbuffered), 1, + ctx, sinkURI(t, unbuffered), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) defer func() { require.NoError(t, s1R.Close()) }() s2R, err := makeCloudStorageSink( - ctx, sinkURI(dir, unbuffered), 2, + ctx, sinkURI(t, unbuffered), 2, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) @@ -390,7 +406,7 @@ func TestCloudStorageSink(t *testing.T) { "v1\n", "w1\n", "w1\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) }) // The jobs system can't always clean up perfectly after itself and so there @@ -400,15 +416,14 @@ func TestCloudStorageSink(t *testing.T) { // // This test is also sufficient for verifying the behavior of a multi-node // changefeed using this sink. Ditto job restarts. - t.Run(`zombie`, func(t *testing.T) { + testWithAndWithoutAsyncFlushing(t, `zombie`, func(t *testing.T) { t1 := makeTopic(`t1`) testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf, err := span.MakeFrontier(testSpan) require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - dir := `zombie` s1, err := makeCloudStorageSink( - ctx, sinkURI(dir, unlimitedFileSize), 1, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) @@ -416,7 +431,7 @@ func TestCloudStorageSink(t *testing.T) { s1.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID. s1.(*cloudStorageSink).jobSessionID = "a" // Force deterministic job session ID. s2, err := makeCloudStorageSink( - ctx, sinkURI(dir, unlimitedFileSize), 1, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) @@ -441,19 +456,21 @@ func TestCloudStorageSink(t *testing.T) { "v1\nv2\n", "v3\n", "v1\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) }) - t.Run(`bucketing`, func(t *testing.T) { + waitAsyncFlush := func(s Sink) error { + return s.(*cloudStorageSink).waitAsyncFlush() + } + testWithAndWithoutAsyncFlushing(t, `bucketing`, func(t *testing.T) { t1 := makeTopic(`t1`) testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf, err := span.MakeFrontier(testSpan) require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - dir := `bucketing` const targetMaxFileSize = 6 s, err := makeCloudStorageSink( - ctx, sinkURI(dir, targetMaxFileSize), 1, + ctx, sinkURI(t, targetMaxFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) @@ -465,16 +482,17 @@ func TestCloudStorageSink(t *testing.T) { for i := int64(1); i <= 5; i++ { require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(fmt.Sprintf(`v%d`, i)), ts(i), ts(i), zeroAlloc)) } + require.NoError(t, waitAsyncFlush(s)) require.Equal(t, []string{ "v1\nv2\nv3\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // Flush then writes the rest. 
require.NoError(t, s.Flush(ctx)) require.Equal(t, []string{ "v1\nv2\nv3\n", "v4\nv5\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // Forward the SpanFrontier here and trigger an empty flush to update // the sink's `inclusiveLowerBoundTs` @@ -487,11 +505,12 @@ func TestCloudStorageSink(t *testing.T) { for i := int64(6); i < 10; i++ { require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(fmt.Sprintf(`v%d`, i)), ts(i), ts(i), zeroAlloc)) } + require.NoError(t, waitAsyncFlush(s)) require.Equal(t, []string{ "v1\nv2\nv3\n", "v4\nv5\n", "v6\nv7\nv8\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // Resolved timestamps are periodically written. This happens // asynchronously from a different node and can be given an earlier @@ -507,7 +526,7 @@ func TestCloudStorageSink(t *testing.T) { "v4\nv5\n", `{"resolved":"5.0000000000"}`, "v6\nv7\nv8\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // Flush then writes the rest. Since we use the time of the EmitRow // or EmitResolvedTimestamp calls to order files, the resolved timestamp @@ -520,7 +539,7 @@ func TestCloudStorageSink(t *testing.T) { `{"resolved":"5.0000000000"}`, "v6\nv7\nv8\n", "v9\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // A resolved timestamp emitted with ts > 5 should follow everything // emitted thus far. @@ -532,10 +551,10 @@ func TestCloudStorageSink(t *testing.T) { "v6\nv7\nv8\n", "v9\n", `{"resolved":"6.0000000000"}`, - }, slurpDir(t, dir)) + }, slurpDir(t)) }) - t.Run(`partition-formatting`, func(t *testing.T) { + testWithAndWithoutAsyncFlushing(t, `partition-formatting`, func(t *testing.T) { t1 := makeTopic(`t1`) testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} const targetMaxFileSize = 6 @@ -550,7 +569,7 @@ func TestCloudStorageSink(t *testing.T) { time.Date(2000, time.January, 2, 6, 1, 1, 0, time.UTC), } - for i, tc := range []struct { + for _, tc := range []struct { format string expectedFolders []string }{ @@ -582,51 +601,50 @@ func TestCloudStorageSink(t *testing.T) { }, }, } { - t.Run(tc.format, func(t *testing.T) { - sf, err := span.MakeFrontier(testSpan) - require.NoError(t, err) - timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} + testWithAndWithoutAsyncFlushing(t, stringOrDefault(tc.format, "default"), + func(t *testing.T) { + sf, err := span.MakeFrontier(testSpan) + require.NoError(t, err) + timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - dir := fmt.Sprintf(`partition-formatting-%d`, i) + sinkURIWithParam := sinkURI(t, targetMaxFileSize) + sinkURIWithParam.addParam(changefeedbase.SinkParamPartitionFormat, tc.format) + t.Logf("format=%s sinkgWithParam: %s", tc.format, sinkURIWithParam.String()) + s, err := makeCloudStorageSink( + ctx, sinkURIWithParam, 1, + settings, opts, timestampOracle, externalStorageFromURI, user, nil, + ) - sinkURIWithParam := sinkURI(dir, targetMaxFileSize) - sinkURIWithParam.addParam(changefeedbase.SinkParamPartitionFormat, tc.format) - s, err := makeCloudStorageSink( - ctx, sinkURIWithParam, 1, - settings, opts, timestampOracle, externalStorageFromURI, user, nil, - ) - - require.NoError(t, err) - defer func() { require.NoError(t, s.Close()) }() - s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID. + require.NoError(t, err) + defer func() { require.NoError(t, s.Close()) }() + s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID. 
- for i, timestamp := range timestamps { - hlcTime := ts(timestamp.UnixNano()) + for i, timestamp := range timestamps { + hlcTime := ts(timestamp.UnixNano()) - // Move the frontier and flush to update the dataFilePartition value - _, err = sf.Forward(testSpan, hlcTime) - require.NoError(t, err) - require.NoError(t, s.Flush(ctx)) + // Move the frontier and flush to update the dataFilePartition value + _, err = sf.Forward(testSpan, hlcTime) + require.NoError(t, err) + require.NoError(t, s.Flush(ctx)) - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(fmt.Sprintf(`v%d`, i)), hlcTime, hlcTime, zeroAlloc)) - } + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(fmt.Sprintf(`v%d`, i)), hlcTime, hlcTime, zeroAlloc)) + } - require.NoError(t, s.Flush(ctx)) // Flush the last file - require.ElementsMatch(t, tc.expectedFolders, listLeafDirectories(dir)) - require.Equal(t, []string{"v0\n", "v1\n", "v2\n", "v3\n", "v4\n"}, slurpDir(t, dir)) - }) + require.NoError(t, s.Flush(ctx)) // Flush the last file + require.ElementsMatch(t, tc.expectedFolders, listLeafDirectories(t)) + require.Equal(t, []string{"v0\n", "v1\n", "v2\n", "v3\n", "v4\n"}, slurpDir(t)) + }) } }) - t.Run(`file-ordering`, func(t *testing.T) { + testWithAndWithoutAsyncFlushing(t, `file-ordering`, func(t *testing.T) { t1 := makeTopic(`t1`) testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf, err := span.MakeFrontier(testSpan) require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - dir := `file-ordering` s, err := makeCloudStorageSink( - ctx, sinkURI(dir, unlimitedFileSize), 1, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil, ) require.NoError(t, err) @@ -663,7 +681,7 @@ func TestCloudStorageSink(t *testing.T) { `{"resolved":"3.0000000000"}`, "e3next\n", `{"resolved":"4.0000000000"}`, - }, slurpDir(t, dir)) + }, slurpDir(t)) // Test that files with timestamp lower than the least resolved timestamp // as of file creation time are ignored. @@ -675,19 +693,18 @@ func TestCloudStorageSink(t *testing.T) { `{"resolved":"3.0000000000"}`, "e3next\n", `{"resolved":"4.0000000000"}`, - }, slurpDir(t, dir)) + }, slurpDir(t)) }) - t.Run(`ordering-among-schema-versions`, func(t *testing.T) { + testWithAndWithoutAsyncFlushing(t, `ordering-among-schema-versions`, func(t *testing.T) { t1 := makeTopic(`t1`) testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf, err := span.MakeFrontier(testSpan) require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} - dir := `ordering-among-schema-versions` var targetMaxFileSize int64 = 10 s, err := makeCloudStorageSink( - ctx, sinkURI(dir, targetMaxFileSize), 1, settings, + ctx, sinkURI(t, targetMaxFileSize), 1, settings, opts, timestampOracle, externalStorageFromURI, user, nil) require.NoError(t, err) defer func() { require.NoError(t, s.Close()) }() @@ -699,20 +716,22 @@ func TestCloudStorageSink(t *testing.T) { // for the first file but not the second one. t1.Version = 0 require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`trigger-flush-v1`), ts(1), ts(1), zeroAlloc)) + require.NoError(t, waitAsyncFlush(s)) require.Equal(t, []string{ "v1\ntrigger-flush-v1\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // Now make the file with the newer schema exceed its file size threshold and ensure // that the file with the older schema is flushed (and ordered) before. 
require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v2`), ts(1), ts(1), zeroAlloc)) t1.Version = 1 require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`trigger-flush-v3`), ts(1), ts(1), zeroAlloc)) + require.NoError(t, waitAsyncFlush(s)) require.Equal(t, []string{ "v1\ntrigger-flush-v1\n", "v2\n", "v3\ntrigger-flush-v3\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) // Calling `Flush()` on the sink should emit files in the order of their schema IDs. require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`w1`), ts(1), ts(1), zeroAlloc)) @@ -725,6 +744,6 @@ func TestCloudStorageSink(t *testing.T) { "v3\ntrigger-flush-v3\n", "x1\n", "w1\n", - }, slurpDir(t, dir)) + }, slurpDir(t)) }) }
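Stripped of sink-specific detail, the heart of this change is a small coordination pattern: file-size-based flushes run in goroutines tracked by a sync.WaitGroup, the first failure is parked in an atomic.Value, and the blocking operations (Flush, EmitResolvedTimestamp, Close) act as barriers. The sketch below is illustrative only (asyncSink and upload are invented names, not the patch's API), but it shows why errors are wrapped in a single concrete flushError type (atomic.Value requires all stored values to share one concrete type) and why the goroutine uses the long-lived flushCtx rather than the caller's possibly-cancelled ctx:

package example

import (
	"context"
	"sync"
	"sync/atomic"
)

// flushError gives every value stored in flushErr the same concrete
// type; atomic.Value panics if successive stores use different types.
type flushError struct{ error }

// asyncSink is a toy stand-in for cloudStorageSink.
type asyncSink struct {
	// flushCtx outlives any single call; per-call contexts may be
	// cancelled before a background upload completes.
	flushCtx   context.Context
	flushGroup sync.WaitGroup
	flushErr   atomic.Value
}

// flushAsync starts an upload without blocking the caller.
func (s *asyncSink) flushAsync(upload func(ctx context.Context) error) {
	s.flushGroup.Add(1)
	go func() {
		defer s.flushGroup.Done()
		if err := upload(s.flushCtx); err != nil {
			// Keep only the first error; later uploads still drain.
			s.flushErr.CompareAndSwap(nil, &flushError{error: err})
		}
	}()
}

// waitAsyncFlush is the barrier used before emitting resolved
// timestamps, on explicit Flush, and on Close: it blocks until all
// in-flight uploads finish, then surfaces the first error, if any.
func (s *asyncSink) waitAsyncFlush() error {
	s.flushGroup.Wait()
	if v := s.flushErr.Load(); v != nil {
		return v.(*flushError).error
	}
	return nil
}

Because each buffered row's memory allocation is released only when its file is written out (see flushToStorage in the diff), deferring the write to a goroutine does not change the memory accounting; it only moves the wait from every file-size threshold crossing to the explicit barrier points.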