From ae7dc6a01ec60d58e2216562387bd4acd415ddde Mon Sep 17 00:00:00 2001 From: Marcus Gartner Date: Thu, 12 Aug 2021 10:56:27 -0700 Subject: [PATCH 1/5] randgen: refactor random expression generation This commit refactors the code that generates random computed columns so that the logic for generating random expressions can be used in a future commit to generate random expression indexes. Release note: None --- pkg/sql/randgen/expr.go | 123 ++++++++++++++++++++++++++++++++++++++ pkg/sql/randgen/schema.go | 113 ++-------------------------------- 2 files changed, 127 insertions(+), 109 deletions(-) diff --git a/pkg/sql/randgen/expr.go b/pkg/sql/randgen/expr.go index 6bcfee8d988e..c3a738fe9491 100644 --- a/pkg/sql/randgen/expr.go +++ b/pkg/sql/randgen/expr.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/randutil" ) // randPartialIndexPredicateFromCols creates a partial index expression with a @@ -120,3 +121,125 @@ func randAndOrExpr(rng *rand.Rand, left, right tree.Expr) tree.Expr { Right: right, } } + +// randExpr produces a random expression that refers to columns in +// normalColDefs. It can be used to generate random computed columns and +// expression indexes. The return type is the type of the expression. The +// returned nullability is NotNull if all columns referenced in the expression +// have a NotNull nullability. +func randExpr( + rng *rand.Rand, normalColDefs []*tree.ColumnTableDef, +) (tree.Expr, *types.T, tree.Nullability) { + nullability := tree.NotNull + + if rng.Intn(2) == 0 { + // Try to find a set of numeric columns with the same type; the computed + // expression will be of the form "a+b+c". + var cols []*tree.ColumnTableDef + var fam types.Family + for _, idx := range rng.Perm(len(normalColDefs)) { + x := normalColDefs[idx] + xFam := x.Type.(*types.T).Family() + + if len(cols) == 0 { + switch xFam { + case types.IntFamily, types.FloatFamily, types.DecimalFamily: + fam = xFam + cols = append(cols, x) + } + } else if fam == xFam { + cols = append(cols, x) + if len(cols) > 1 && rng.Intn(2) == 0 { + break + } + } + } + if len(cols) > 1 { + // If any of the columns are nullable, set the computed column to be + // nullable. + for _, x := range cols { + if x.Nullable.Nullability != tree.NotNull { + nullability = x.Nullable.Nullability + break + } + } + + var expr tree.Expr + expr = tree.NewUnresolvedName(string(cols[0].Name)) + for _, x := range cols[1:] { + expr = &tree.BinaryExpr{ + Operator: tree.MakeBinaryOperator(tree.Plus), + Left: expr, + Right: tree.NewUnresolvedName(string(x.Name)), + } + } + return expr, cols[0].Type.(*types.T), nullability + } + } + + // Pick a single column and create a computed column that depends on it. + // The expression is as follows: + // - for numeric types (int, float, decimal), the expression is "x+1"; + // - for string type, the expression is "lower(x)"; + // - for types that can be cast to string in computed columns, the expression + // is "lower(x::string)"; + // - otherwise, the expression is `CASE WHEN x IS NULL THEN 'foo' ELSE 'bar'`. + x := normalColDefs[randutil.RandIntInRange(rng, 0, len(normalColDefs))] + xTyp := x.Type.(*types.T) + + // Match the nullability with the nullability of the reference column. 
+ nullability = x.Nullable.Nullability + nullOk := nullability != tree.NotNull + + var expr tree.Expr + var typ *types.T + switch xTyp.Family() { + case types.IntFamily, types.FloatFamily, types.DecimalFamily: + typ = xTyp + expr = &tree.BinaryExpr{ + Operator: tree.MakeBinaryOperator(tree.Plus), + Left: tree.NewUnresolvedName(string(x.Name)), + Right: RandDatum(rng, xTyp, nullOk), + } + + case types.StringFamily: + typ = types.String + expr = &tree.FuncExpr{ + Func: tree.WrapFunction("lower"), + Exprs: tree.Exprs{tree.NewUnresolvedName(string(x.Name))}, + } + + default: + volatility, ok := tree.LookupCastVolatility(xTyp, types.String, nil /* sessionData */) + if ok && volatility <= tree.VolatilityImmutable { + // We can cast to string; use lower(x::string) + typ = types.String + expr = &tree.FuncExpr{ + Func: tree.WrapFunction("lower"), + Exprs: tree.Exprs{ + &tree.CastExpr{ + Expr: tree.NewUnresolvedName(string(x.Name)), + Type: types.String, + }, + }, + } + } else { + // We cannot cast this type to string in a computed column expression. + // Use CASE WHEN x IS NULL THEN 'foo' ELSE 'bar'. + typ = types.String + expr = &tree.CaseExpr{ + Whens: []*tree.When{ + { + Cond: &tree.IsNullExpr{ + Expr: tree.NewUnresolvedName(string(x.Name)), + }, + Val: RandDatum(rng, types.String, nullOk), + }, + }, + Else: RandDatum(rng, types.String, nullOk), + } + } + } + + return expr, typ, nullability +} diff --git a/pkg/sql/randgen/schema.go b/pkg/sql/randgen/schema.go index ad716c767251..ba7c3ea084a5 100644 --- a/pkg/sql/randgen/schema.go +++ b/pkg/sql/randgen/schema.go @@ -362,115 +362,10 @@ func randComputedColumnTableDef( newDef.Computed.Computed = true newDef.Computed.Virtual = (rng.Intn(2) == 0) - if rng.Intn(2) == 0 { - // Try to find a set of numeric columns with the same type; the computed - // expression will be of the form "a+b+c". - var cols []*tree.ColumnTableDef - var fam types.Family - for _, idx := range rng.Perm(len(normalColDefs)) { - x := normalColDefs[idx] - xFam := x.Type.(*types.T).Family() - - if len(cols) == 0 { - switch xFam { - case types.IntFamily, types.FloatFamily, types.DecimalFamily: - fam = xFam - cols = append(cols, x) - } - } else if fam == xFam { - cols = append(cols, x) - if len(cols) > 1 && rng.Intn(2) == 0 { - break - } - } - } - if len(cols) > 1 { - // If any of the columns are nullable, set the computed column to be - // nullable. - for _, x := range cols { - if x.Nullable.Nullability != tree.NotNull { - newDef.Nullable.Nullability = x.Nullable.Nullability - break - } - } - - var expr tree.Expr - expr = tree.NewUnresolvedName(string(cols[0].Name)) - for _, x := range cols[1:] { - expr = &tree.BinaryExpr{ - Operator: tree.MakeBinaryOperator(tree.Plus), - Left: expr, - Right: tree.NewUnresolvedName(string(x.Name)), - } - } - newDef.Type = cols[0].Type - newDef.Computed.Expr = expr - return newDef - } - } - - // Pick a single column and create a computed column that depends on it. - // The expression is as follows: - // - for numeric types (int, float, decimal), the expression is "x+1"; - // - for string type, the expression is "lower(x)"; - // - for types that can be cast to string in computed columns, the expression - // is "lower(x::string)"; - // - otherwise, the expression is `CASE WHEN x IS NULL THEN 'foo' ELSE 'bar'`. - x := normalColDefs[randutil.RandIntInRange(rng, 0, len(normalColDefs))] - xTyp := x.Type.(*types.T) - - // Match the nullability of the computed column with the nullability of the - // reference column. 
- newDef.Nullable.Nullability = x.Nullable.Nullability - nullOk := newDef.Nullable.Nullability != tree.NotNull - - switch xTyp.Family() { - case types.IntFamily, types.FloatFamily, types.DecimalFamily: - newDef.Type = xTyp - newDef.Computed.Expr = &tree.BinaryExpr{ - Operator: tree.MakeBinaryOperator(tree.Plus), - Left: tree.NewUnresolvedName(string(x.Name)), - Right: RandDatum(rng, xTyp, nullOk), - } - - case types.StringFamily: - newDef.Type = types.String - newDef.Computed.Expr = &tree.FuncExpr{ - Func: tree.WrapFunction("lower"), - Exprs: tree.Exprs{tree.NewUnresolvedName(string(x.Name))}, - } - - default: - volatility, ok := tree.LookupCastVolatility(xTyp, types.String, nil /* sessionData */) - if ok && volatility <= tree.VolatilityImmutable { - // We can cast to string; use lower(x::string) - newDef.Type = types.String - newDef.Computed.Expr = &tree.FuncExpr{ - Func: tree.WrapFunction("lower"), - Exprs: tree.Exprs{ - &tree.CastExpr{ - Expr: tree.NewUnresolvedName(string(x.Name)), - Type: types.String, - }, - }, - } - } else { - // We cannot cast this type to string in a computed column expression. - // Use CASE WHEN x IS NULL THEN 'foo' ELSE 'bar'. - newDef.Type = types.String - newDef.Computed.Expr = &tree.CaseExpr{ - Whens: []*tree.When{ - { - Cond: &tree.IsNullExpr{ - Expr: tree.NewUnresolvedName(string(x.Name)), - }, - Val: RandDatum(rng, types.String, nullOk), - }, - }, - Else: RandDatum(rng, types.String, nullOk), - } - } - } + expr, typ, nullability := randExpr(rng, normalColDefs) + newDef.Computed.Expr = expr + newDef.Type = typ + newDef.Nullable.Nullability = nullability return newDef } From 4fd177c202531689e65638323e7c59cc27affcc2 Mon Sep 17 00:00:00 2001 From: Marcus Gartner Date: Thu, 12 Aug 2021 11:24:05 -0700 Subject: [PATCH 2/5] randgen: generate random expression indexes The `randgen` package now generates schemas with random expression indexes. This allows for random testing of expression indexes in `sqlsmith` and ternary logic partitioning (TLP). Fixes #68174 Release note: None --- pkg/sql/randgen/expr.go | 4 ++-- pkg/sql/randgen/schema.go | 41 +++++++++++++++++++++++++++++++-------- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/pkg/sql/randgen/expr.go b/pkg/sql/randgen/expr.go index c3a738fe9491..77c78e42e831 100644 --- a/pkg/sql/randgen/expr.go +++ b/pkg/sql/randgen/expr.go @@ -128,7 +128,7 @@ func randAndOrExpr(rng *rand.Rand, left, right tree.Expr) tree.Expr { // returned nullability is NotNull if all columns referenced in the expression // have a NotNull nullability. func randExpr( - rng *rand.Rand, normalColDefs []*tree.ColumnTableDef, + rng *rand.Rand, normalColDefs []*tree.ColumnTableDef, nullOk bool, ) (tree.Expr, *types.T, tree.Nullability) { nullability := tree.NotNull @@ -189,7 +189,7 @@ func randExpr( // Match the nullability with the nullability of the reference column. nullability = x.Nullable.Nullability - nullOk := nullability != tree.NotNull + nullOk = nullOk && nullability != tree.NotNull var expr tree.Expr var typ *types.T diff --git a/pkg/sql/randgen/schema.go b/pkg/sql/randgen/schema.go index ba7c3ea084a5..368c577632b3 100644 --- a/pkg/sql/randgen/schema.go +++ b/pkg/sql/randgen/schema.go @@ -215,7 +215,7 @@ func RandCreateTableWithInterleave( // Make a random primary key with high likelihood. 
if rng.Intn(8) != 0 { - indexDef, ok := randIndexTableDefFromCols(rng, columnDefs) + indexDef, ok := randIndexTableDefFromCols(rng, columnDefs, false /* allowExpressions */) if ok && !indexDef.Inverted { defs = append(defs, &tree.UniqueConstraintTableDef{ PrimaryKey: true, @@ -247,7 +247,7 @@ func RandCreateTableWithInterleave( // Make indexes. nIdxs := rng.Intn(10) for i := 0; i < nIdxs; i++ { - indexDef, ok := randIndexTableDefFromCols(rng, columnDefs) + indexDef, ok := randIndexTableDefFromCols(rng, columnDefs, true /* allowExpressions */) if !ok { continue } @@ -362,7 +362,7 @@ func randComputedColumnTableDef( newDef.Computed.Computed = true newDef.Computed.Virtual = (rng.Intn(2) == 0) - expr, typ, nullability := randExpr(rng, normalColDefs) + expr, typ, nullability := randExpr(rng, normalColDefs, true /* nullOk */) newDef.Computed.Expr = expr newDef.Type = typ newDef.Nullable.Nullability = nullability @@ -374,7 +374,7 @@ func randComputedColumnTableDef( // subset of the given columns and a random direction. If unsuccessful, ok=false // is returned. func randIndexTableDefFromCols( - rng *rand.Rand, columnTableDefs []*tree.ColumnTableDef, + rng *rand.Rand, columnTableDefs []*tree.ColumnTableDef, allowExpressions bool, ) (def tree.IndexTableDef, ok bool) { cpy := make([]*tree.ColumnTableDef, len(columnTableDefs)) copy(cpy, columnTableDefs) @@ -386,6 +386,22 @@ func randIndexTableDefFromCols( def.Columns = make(tree.IndexElemList, 0, len(cols)) for i := range cols { semType := tree.MustBeStaticallyKnownType(cols[i].Type) + elem := tree.IndexElem{ + Column: cols[i].Name, + Direction: tree.Direction(rng.Intn(int(tree.Descending) + 1)), + } + + // Replace the column with an expression 10% of the time. + if allowExpressions && rng.Intn(10) == 0 { + var expr tree.Expr + // Expression indexes do not currently support references to + // computed columns, so only make expressions with non-computed + // columns. Do not allow NULL in expressions to avoid expressions + // that have an ambiguous type. + expr, semType, _ = randExpr(rng, nonComputedColumnTableDefs(columnTableDefs), false /* nullOk */) + elem.Expr = expr + elem.Column = "" + } // The non-terminal index columns must be indexable. if isLastCol := i == len(cols)-1; !isLastCol && !colinfo.ColumnTypeIsIndexable(semType) { @@ -398,15 +414,24 @@ func randIndexTableDefFromCols( def.Inverted = true } - def.Columns = append(def.Columns, tree.IndexElem{ - Column: cols[i].Name, - Direction: tree.Direction(rng.Intn(int(tree.Descending) + 1)), - }) + def.Columns = append(def.Columns, elem) } return def, true } +// nonComputedColumnTableDefs returns a slice containing all the columns in cols +// that are not computed columns. +func nonComputedColumnTableDefs(cols []*tree.ColumnTableDef) []*tree.ColumnTableDef { + nonComputedCols := make([]*tree.ColumnTableDef, 0, len(cols)) + for _, col := range cols { + if !col.Computed.Computed { + nonComputedCols = append(nonComputedCols, col) + } + } + return nonComputedCols +} + // TestingMakePrimaryIndexKey creates a key prefix that corresponds to // a table row (in the primary index); it is intended for tests. // From c987fc9cf4db2f849da3c5e83cb7f5c70fd4fdb7 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 16 Aug 2021 07:59:08 -0700 Subject: [PATCH 3/5] roachtest/tests: adjust sqlsmith slightly This commit adjusts `sqlsmith` roachtest slightly so that vectorized panic injection occurs with 50% probability (instead of 100%). 
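The setting is now gated behind a coin flip, so the shape of the change is
roughly the following (the authoritative version is the diff below; conn,
rng, t, and logStmt are the existing helpers in this test):

    if rng.Float64() < 0.5 {
        // Inject vectorized panics in only about half of the runs.
        injectPanicsStmt := "SET testing_vectorize_inject_panics=true;"
        if _, err := conn.Exec(injectPanicsStmt); err != nil {
            t.Fatal(err)
        }
        logStmt(injectPanicsStmt)
    }
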
This is done to check whether the panic injection is the root cause of the inbox communication errors we have been seeing sporadically. Release note: None --- pkg/cmd/roachtest/tests/sqlsmith.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/pkg/cmd/roachtest/tests/sqlsmith.go b/pkg/cmd/roachtest/tests/sqlsmith.go index 261bd3140bca..33cb5e174e37 100644 --- a/pkg/cmd/roachtest/tests/sqlsmith.go +++ b/pkg/cmd/roachtest/tests/sqlsmith.go @@ -132,11 +132,17 @@ func registerSQLSmith(r registry.Registry) { // other setup queries have already completed, including the smither // instantiation (otherwise, the setup might fail because of the // injected panics). - injectPanicsStmt := "SET testing_vectorize_inject_panics=true;" - if _, err := conn.Exec(injectPanicsStmt); err != nil { - t.Fatal(err) + if rng.Float64() < 0.5 { + // TODO(yuzefovich): at the moment we're only injecting panics with + // 50% probability in order to test the hypothesis that this panic + // injection is the root cause of the inbox communication errors we + // have been seeing sporadically. + injectPanicsStmt := "SET testing_vectorize_inject_panics=true;" + if _, err := conn.Exec(injectPanicsStmt); err != nil { + t.Fatal(err) + } + logStmt(injectPanicsStmt) } - logStmt(injectPanicsStmt) t.Status("smithing") until := time.After(t.Spec().(*registry.TestSpec).Timeout / 2) From 749c8ee1c517f132904560fd596270eafaf5a3b7 Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Fri, 13 Aug 2021 17:53:40 -0400 Subject: [PATCH 4/5] Revert "streamingccl: hang processors on losing connection with sinkless stream client" This reverts commit f5244f499144c7dbfb34849e18fc8a69fb3783dc. --- pkg/ccl/streamingccl/event.go | 5 - .../cockroach_sinkless_replication_client.go | 11 +- ...kroach_sinkless_replication_client_test.go | 13 -- .../stream_ingestion_processor.go | 55 ++----- .../stream_ingestion_processor_test.go | 153 ++---------------- .../streamingtest/replication_helpers.go | 14 -- 6 files changed, 30 insertions(+), 221 deletions(-) diff --git a/pkg/ccl/streamingccl/event.go b/pkg/ccl/streamingccl/event.go index 77f4800f5399..ba0b0b19cebc 100644 --- a/pkg/ccl/streamingccl/event.go +++ b/pkg/ccl/streamingccl/event.go @@ -116,8 +116,3 @@ func MakeKVEvent(kv roachpb.KeyValue) Event { func MakeCheckpointEvent(resolvedTimestamp hlc.Timestamp) Event { return checkpointEvent{resolvedTimestamp: resolvedTimestamp} } - -// MakeGenerationEvent creates an GenerationEvent. 
-func MakeGenerationEvent() Event { - return generationEvent{} -} diff --git a/pkg/ccl/streamingccl/streamclient/cockroach_sinkless_replication_client.go b/pkg/ccl/streamingccl/streamclient/cockroach_sinkless_replication_client.go index 23bfb5c36c47..6f512cd09bf0 100644 --- a/pkg/ccl/streamingccl/streamclient/cockroach_sinkless_replication_client.go +++ b/pkg/ccl/streamingccl/streamclient/cockroach_sinkless_replication_client.go @@ -11,7 +11,6 @@ package streamclient import ( "context" gosql "database/sql" - "database/sql/driver" "fmt" "strconv" @@ -125,15 +124,7 @@ func (m *sinklessReplicationClient) ConsumePartition( } } if err := rows.Err(); err != nil { - if errors.Is(err, driver.ErrBadConn) { - select { - case eventCh <- streamingccl.MakeGenerationEvent(): - case <-ctx.Done(): - errCh <- ctx.Err() - } - } else { - errCh <- err - } + errCh <- err return } }() diff --git a/pkg/ccl/streamingccl/streamclient/cockroach_sinkless_replication_client_test.go b/pkg/ccl/streamingccl/streamclient/cockroach_sinkless_replication_client_test.go index 324fefd75486..af7aa96b6530 100644 --- a/pkg/ccl/streamingccl/streamclient/cockroach_sinkless_replication_client_test.go +++ b/pkg/ccl/streamingccl/streamclient/cockroach_sinkless_replication_client_test.go @@ -112,17 +112,4 @@ INSERT INTO d.t2 VALUES (2); feed.ObserveResolved(secondObserved.Value.Timestamp) cancelIngestion() }) - - t.Run("stream-address-disconnects", func(t *testing.T) { - clientCtx, cancelIngestion := context.WithCancel(ctx) - eventCh, errCh, err := client.ConsumePartition(clientCtx, pa, startTime) - require.NoError(t, err) - feedSource := &channelFeedSource{eventCh: eventCh, errCh: errCh} - feed := streamingtest.MakeReplicationFeed(t, feedSource) - - h.SysServer.Stopper().Stop(clientCtx) - - require.True(t, feed.ObserveGeneration()) - cancelIngestion() - }) } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index 419e46851415..b29fc2a4601e 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -31,7 +31,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "golang.org/x/sync/errgroup" @@ -96,6 +95,14 @@ type streamIngestionProcessor struct { // and have attempted to flush them with `internalDrained`. internalDrained bool + // ingestionErr stores any error that is returned from the worker goroutine so + // that it can be forwarded through the DistSQL flow. + ingestionErr error + + // pollingErr stores any error that is returned from the poller checking for a + // cutover signal so that it can be forwarded through the DistSQL flow. + pollingErr error + // pollingWaitGroup registers the polling goroutine and waits for it to return // when the processor is being drained. pollingWaitGroup sync.WaitGroup @@ -110,20 +117,6 @@ type streamIngestionProcessor struct { // closePoller is used to shutdown the poller that checks the job for a // cutover signal. closePoller chan struct{} - - // mu is used to provide thread-safe read-write operations to ingestionErr - // and pollingErr. 
- mu struct { - syncutil.Mutex - - // ingestionErr stores any error that is returned from the worker goroutine so - // that it can be forwarded through the DistSQL flow. - ingestionErr error - - // pollingErr stores any error that is returned from the poller checking for a - // cutover signal so that it can be forwarded through the DistSQL flow. - pollingErr error - } } // partitionEvent augments a normal event with the partition it came from. @@ -197,9 +190,7 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) { defer sip.pollingWaitGroup.Done() err := sip.checkForCutoverSignal(ctx, sip.closePoller) if err != nil { - sip.mu.Lock() - sip.mu.pollingErr = errors.Wrap(err, "error while polling job for cutover signal") - sip.mu.Unlock() + sip.pollingErr = errors.Wrap(err, "error while polling job for cutover signal") } }() @@ -229,11 +220,8 @@ func (sip *streamIngestionProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Pr return nil, sip.DrainHelper() } - sip.mu.Lock() - err := sip.mu.pollingErr - sip.mu.Unlock() - if err != nil { - sip.MoveToDraining(err) + if sip.pollingErr != nil { + sip.MoveToDraining(sip.pollingErr) return nil, sip.DrainHelper() } @@ -255,11 +243,8 @@ func (sip *streamIngestionProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Pr return row, nil } - sip.mu.Lock() - err = sip.mu.ingestionErr - sip.mu.Unlock() - if err != nil { - sip.MoveToDraining(err) + if sip.ingestionErr != nil { + sip.MoveToDraining(sip.ingestionErr) return nil, sip.DrainHelper() } @@ -387,10 +372,7 @@ func (sip *streamIngestionProcessor) merge( }) } go func() { - err := g.Wait() - sip.mu.Lock() - defer sip.mu.Unlock() - sip.mu.ingestionErr = err + sip.ingestionErr = g.Wait() close(merged) }() @@ -444,15 +426,6 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err } return sip.flush() - case streamingccl.GenerationEvent: - log.Info(sip.Ctx, "GenerationEvent received") - select { - case <-sip.cutoverCh: - sip.internalDrained = true - return nil, nil - case <-sip.Ctx.Done(): - return nil, sip.Ctx.Err() - } default: return nil, errors.Newf("unknown streaming event type %v", event.Type()) } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index f0b7b2c3fc2b..ea886a64b8c7 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -12,7 +12,6 @@ import ( "context" "fmt" "strconv" - "sync" "testing" "time" @@ -49,20 +48,9 @@ import ( // partition addresses. type mockStreamClient struct { partitionEvents map[streamingccl.PartitionAddress][]streamingccl.Event - - // mu is used to provide a threadsafe interface to interceptors. - mu struct { - syncutil.Mutex - - // interceptors can be registered to peek at every event generated by this - // client. - interceptors []func(streamingccl.Event, streamingccl.PartitionAddress) - tableID int - } } var _ streamclient.Client = &mockStreamClient{} -var _ streamclient.InterceptableStreamClient = &mockStreamClient{} // GetTopology implements the Client interface. func (m *mockStreamClient) GetTopology( @@ -73,7 +61,7 @@ func (m *mockStreamClient) GetTopology( // ConsumePartition implements the Client interface. 
func (m *mockStreamClient) ConsumePartition( - ctx context.Context, address streamingccl.PartitionAddress, _ hlc.Timestamp, + _ context.Context, address streamingccl.PartitionAddress, _ hlc.Timestamp, ) (chan streamingccl.Event, chan error, error) { var events []streamingccl.Event var ok bool @@ -81,43 +69,14 @@ func (m *mockStreamClient) ConsumePartition( return nil, nil, errors.Newf("no events found for paritition %s", address) } - eventCh := make(chan streamingccl.Event) - errCh := make(chan error) - - go func() { - defer close(eventCh) - defer close(errCh) - - for _, event := range events { - select { - case eventCh <- event: - case <-ctx.Done(): - errCh <- ctx.Err() - } - - func() { - m.mu.Lock() - defer m.mu.Unlock() - - if len(m.mu.interceptors) > 0 { - for _, interceptor := range m.mu.interceptors { - if interceptor != nil { - interceptor(event, address) - } - } - } - }() - } - }() + eventCh := make(chan streamingccl.Event, len(events)) - return eventCh, errCh, nil -} + for _, event := range events { + eventCh <- event + } + close(eventCh) -// RegisterInterception implements the InterceptableStreamClient interface. -func (m *mockStreamClient) RegisterInterception(fn streamclient.InterceptFn) { - m.mu.Lock() - defer m.mu.Unlock() - m.mu.interceptors = append(m.mu.interceptors, fn) + return eventCh, nil, nil } // errorStreamClient always returns an error when consuming a partition. @@ -212,59 +171,6 @@ func TestStreamIngestionProcessor(t *testing.T) { require.Nil(t, row) testutils.IsError(meta.Err, "this client always returns an error") }) - - t.Run("stream ingestion processor shuts down gracefully on losing client connection", func(t *testing.T) { - events := []streamingccl.Event{streamingccl.MakeGenerationEvent()} - pa := streamingccl.PartitionAddress("partition") - mockClient := &mockStreamClient{ - partitionEvents: map[streamingccl.PartitionAddress][]streamingccl.Event{pa: events}, - } - - startTime := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - partitionAddresses := []streamingccl.PartitionAddress{"partition"} - - interceptCh := make(chan struct{}) - defer close(interceptCh) - sendToInterceptCh := func() { - interceptCh <- struct{}{} - } - interceptGeneration := markGenerationEventReceived(sendToInterceptCh) - sip, out, err := getStreamIngestionProcessor(ctx, t, registry, kvDB, "randomgen://test/", - partitionAddresses, startTime, []streamclient.InterceptFn{interceptGeneration}, mockClient) - require.NoError(t, err) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - sip.Run(ctx) - }() - - // The channel will block on read if the event has not been intercepted yet. - // Once it unblocks, we are guaranteed that the mockClient has sent the - // GenerationEvent and the processor has read it. - <-interceptCh - - // The sip processor has received a GenerationEvent and is thus - // waiting for a cutover signal, so let's send one! - sip.cutoverCh <- struct{}{} - - wg.Wait() - // Ensure that all the outputs are properly closed. - if !out.ProducerClosed() { - t.Fatalf("output RowReceiver not closed") - } - - for { - // No metadata should have been produced since the processor - // should have been moved to draining state with a nil error. 
- row := out.NextNoMeta(t) - if row == nil { - break - } - t.Fatalf("more output rows than expected") - } - }) } func getPartitionSpanToTableID( @@ -473,30 +379,6 @@ func runStreamIngestionProcessor( interceptEvents []streamclient.InterceptFn, mockClient streamclient.Client, ) (*distsqlutils.RowBuffer, error) { - sip, out, err := getStreamIngestionProcessor(ctx, t, registry, kvDB, streamAddr, - partitionAddresses, startTime, interceptEvents, mockClient) - require.NoError(t, err) - - sip.Run(ctx) - - // Ensure that all the outputs are properly closed. - if !out.ProducerClosed() { - t.Fatalf("output RowReceiver not closed") - } - return out, err -} - -func getStreamIngestionProcessor( - ctx context.Context, - t *testing.T, - registry *jobs.Registry, - kvDB *kv.DB, - streamAddr string, - partitionAddresses []streamingccl.PartitionAddress, - startTime hlc.Timestamp, - interceptEvents []streamclient.InterceptFn, - mockClient streamclient.Client, -) (*streamIngestionProcessor, *distsqlutils.RowBuffer, error) { st := cluster.MakeTestingClusterSettings() evalCtx := tree.MakeTestingEvalContext(st) @@ -541,7 +423,14 @@ func getStreamIngestionProcessor( interceptable.RegisterInterception(interceptor) } } - return sip, out, err + + sip.Run(ctx) + + // Ensure that all the outputs are properly closed. + if !out.ProducerClosed() { + t.Fatalf("output RowReceiver not closed") + } + return out, err } func registerValidatorWithClient( @@ -587,15 +476,3 @@ func makeCheckpointEventCounter( } } } - -// markGenerationEventReceived runs f after seeing a GenerationEvent. -func markGenerationEventReceived( - f func(), -) func(event streamingccl.Event, pa streamingccl.PartitionAddress) { - return func(event streamingccl.Event, pa streamingccl.PartitionAddress) { - switch event.Type() { - case streamingccl.GenerationEvent: - f() - } - } -} diff --git a/pkg/ccl/streamingccl/streamingtest/replication_helpers.go b/pkg/ccl/streamingccl/streamingtest/replication_helpers.go index 8dcb6960944f..a221b9be4dba 100644 --- a/pkg/ccl/streamingccl/streamingtest/replication_helpers.go +++ b/pkg/ccl/streamingccl/streamingtest/replication_helpers.go @@ -53,14 +53,6 @@ func ResolvedAtLeast(lo hlc.Timestamp) FeedPredicate { } } -// ReceivedNewGeneration makes a FeedPredicate that matches when a GenerationEvent has -// been received. -func ReceivedNewGeneration() FeedPredicate { - return func(msg streamingccl.Event) bool { - return msg.Type() == streamingccl.GenerationEvent - } -} - // FeedSource is a source of events for a ReplicationFeed. type FeedSource interface { // Next returns the next event, and a flag indicating if there are more events @@ -100,12 +92,6 @@ func (rf *ReplicationFeed) ObserveResolved(lo hlc.Timestamp) hlc.Timestamp { return *rf.msg.GetResolved() } -// ObserveGeneration consumes the feed until we received a GenerationEvent. Returns true. -func (rf *ReplicationFeed) ObserveGeneration() bool { - require.NoError(rf.t, rf.consumeUntil(ReceivedNewGeneration())) - return true -} - // Close cleans up any resources. func (rf *ReplicationFeed) Close() { rf.f.Close() From a0e8f2c883138ee6e3aa3eb72ac98dc1f65de235 Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Mon, 16 Aug 2021 14:27:54 -0400 Subject: [PATCH 5/5] backupccl: skip TestBackupRestoreSystemJobProgress under stressrace The test times out under stressrace. 
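The fix is a single guard at the top of the test, roughly of this form (the
actual change is in the diff below); skip.UnderStressRace skips only when the
test runs under stress with the race detector enabled, so regular runs and
plain stress runs still exercise it:

    skip.UnderStressRace(t, "test takes too long to run under stressrace")
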
Release note: None --- pkg/ccl/backupccl/backup_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 9b3aacabcb46..3e458b1a6391 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -1632,6 +1632,8 @@ func TestBackupRestoreSystemJobsProgress(t *testing.T) { defer log.Scope(t).Close(t) defer jobs.TestingSetProgressThresholds()() + skip.UnderStressRace(t, "test takes too long to run under stressrace") + checkFraction := func(ctx context.Context, ip inProgressState) error { jobID, err := ip.latestJobID() if err != nil {