diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index cd8c62e5885b..3d715a060df0 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -13,6 +13,7 @@ ALL_TESTS = [ "//pkg/ccl/backupccl:backupccl_test", "//pkg/ccl/baseccl:baseccl_test", "//pkg/ccl/benchccl/rttanalysisccl:rttanalysisccl_test", + "//pkg/ccl/changefeedccl/cdceval:cdceval_test", "//pkg/ccl/changefeedccl/cdcevent:cdcevent_test", "//pkg/ccl/changefeedccl/cdctest:cdctest_test", "//pkg/ccl/changefeedccl/cdcutils:cdcutils_test", diff --git a/pkg/ccl/changefeedccl/avro_test.go b/pkg/ccl/changefeedccl/avro_test.go index c25c4a54cb21..953d992d62c6 100644 --- a/pkg/ccl/changefeedccl/avro_test.go +++ b/pkg/ccl/changefeedccl/avro_test.go @@ -165,7 +165,7 @@ func parseAvroSchema(t *testing.T, j string) (*avroDataRecord, error) { } return tableToAvroSchema( cdcevent.TestingMakeEventRow( - tabledesc.NewBuilder(&tableDesc).BuildImmutableTable(), nil, false, + tabledesc.NewBuilder(&tableDesc).BuildImmutableTable(), 0, nil, false, ), "", string(changefeedbase.OptVirtualColumnsOmitted)) } @@ -390,7 +390,7 @@ func TestAvroSchema(t *testing.T) { fmt.Sprintf(`CREATE TABLE "%s" %s`, test.name, test.schema)) require.NoError(t, err) origSchema, err := tableToAvroSchema( - cdcevent.TestingMakeEventRow(tableDesc, nil, false), + cdcevent.TestingMakeEventRow(tableDesc, 0, nil, false), avroSchemaNoSuffix, "") require.NoError(t, err) jsonSchema := origSchema.codec.Schema() @@ -404,7 +404,7 @@ func TestAvroSchema(t *testing.T) { require.NoError(t, err) for _, encDatums := range rows { - row := cdcevent.TestingMakeEventRow(tableDesc, encDatums, false) + row := cdcevent.TestingMakeEventRow(tableDesc, 0, encDatums, false) evalCtx := &eval.Context{ SessionDataStack: sessiondata.NewStack(&sessiondata.SessionData{}), } @@ -429,7 +429,7 @@ func TestAvroSchema(t *testing.T) { tableDesc, err := parseTableDesc(`CREATE TABLE "☃" (🍦 INT PRIMARY KEY)`) require.NoError(t, err) tableSchema, err := tableToAvroSchema( - cdcevent.TestingMakeEventRow(tableDesc, nil, false), avroSchemaNoSuffix, "") + cdcevent.TestingMakeEventRow(tableDesc, 0, nil, false), avroSchemaNoSuffix, "") require.NoError(t, err) require.Equal(t, `{"type":"record","name":"_u2603_","fields":[`+ @@ -437,7 +437,7 @@ func TestAvroSchema(t *testing.T) { `"__crdb__":"🍦 INT8 NOT NULL"}]}`, tableSchema.codec.Schema()) indexSchema, err := primaryIndexToAvroSchema( - cdcevent.TestingMakeEventRow(tableDesc, nil, false), tableDesc.GetName(), "") + cdcevent.TestingMakeEventRow(tableDesc, 0, nil, false), tableDesc.GetName(), "") require.NoError(t, err) require.Equal(t, `{"type":"record","name":"_u2603_","fields":[`+ @@ -660,7 +660,7 @@ func TestAvroSchema(t *testing.T) { encDatums, err := parseValues(tableDesc, `VALUES (1, `+test.sql+`)`) require.NoError(t, err) - row := cdcevent.TestingMakeEventRow(tableDesc, encDatums[0], false) + row := cdcevent.TestingMakeEventRow(tableDesc, 0, encDatums[0], false) schema, err := tableToAvroSchema( row, avroSchemaNoSuffix, "") require.NoError(t, err) @@ -718,7 +718,7 @@ func TestAvroSchema(t *testing.T) { encDatums, err := parseValues(tableDesc, `VALUES (1, `+test.sql+`)`) require.NoError(t, err) - row := cdcevent.TestingMakeEventRow(tableDesc, encDatums[0], false) + row := cdcevent.TestingMakeEventRow(tableDesc, 0, encDatums[0], false) schema, err := tableToAvroSchema(row, avroSchemaNoSuffix, "") require.NoError(t, err) textual, err := schema.textualFromRow(row) @@ -823,13 +823,13 @@ func TestAvroMigration(t *testing.T) { fmt.Sprintf(`CREATE TABLE "%s" %s`, test.name, 
test.writerSchema)) require.NoError(t, err) writerSchema, err := tableToAvroSchema( - cdcevent.TestingMakeEventRow(writerDesc, nil, false), avroSchemaNoSuffix, "") + cdcevent.TestingMakeEventRow(writerDesc, 0, nil, false), avroSchemaNoSuffix, "") require.NoError(t, err) readerDesc, err := parseTableDesc( fmt.Sprintf(`CREATE TABLE "%s" %s`, test.name, test.readerSchema)) require.NoError(t, err) readerSchema, err := tableToAvroSchema( - cdcevent.TestingMakeEventRow(readerDesc, nil, false), avroSchemaNoSuffix, "") + cdcevent.TestingMakeEventRow(readerDesc, 0, nil, false), avroSchemaNoSuffix, "") require.NoError(t, err) writerRows, err := parseValues(writerDesc, `VALUES `+test.writerValues) @@ -838,7 +838,7 @@ func TestAvroMigration(t *testing.T) { require.NoError(t, err) for i := range writerRows { - writerEvent := cdcevent.TestingMakeEventRow(writerDesc, writerRows[i], false) + writerEvent := cdcevent.TestingMakeEventRow(writerDesc, 0, writerRows[i], false) encoded, err := writerSchema.BinaryFromRow(nil, writerEvent.ForEachColumn()) require.NoError(t, err) row, err := rowFromBinaryEvolved(encoded, writerSchema, readerSchema) @@ -906,7 +906,7 @@ func benchmarkEncodeType(b *testing.B, typ *types.T, encRow rowenc.EncDatumRow) tableDesc, err := parseTableDesc( fmt.Sprintf(`CREATE TABLE bench_table (bench_field %s)`, typ.SQLString())) require.NoError(b, err) - row := cdcevent.TestingMakeEventRow(tableDesc, encRow, false) + row := cdcevent.TestingMakeEventRow(tableDesc, 0, encRow, false) schema, err := tableToAvroSchema(row, "suffix", "namespace") require.NoError(b, err) diff --git a/pkg/ccl/changefeedccl/cdceval/BUILD.bazel b/pkg/ccl/changefeedccl/cdceval/BUILD.bazel new file mode 100644 index 000000000000..6f7308153aa9 --- /dev/null +++ b/pkg/ccl/changefeedccl/cdceval/BUILD.bazel @@ -0,0 +1,71 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "cdceval", + srcs = [ + "doc.go", + "expr_eval.go", + "functions.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval", + visibility = ["//visibility:public"], + deps = [ + "//pkg/ccl/changefeedccl/cdcevent", + "//pkg/sql/catalog/colinfo", + "//pkg/sql/catalog/schemaexpr", + "//pkg/sql/pgwire/pgcode", + "//pkg/sql/pgwire/pgerror", + "//pkg/sql/sem/builtins", + "//pkg/sql/sem/eval", + "//pkg/sql/sem/normalize", + "//pkg/sql/sem/tree", + "//pkg/sql/sem/volatility", + "//pkg/sql/sessiondata", + "//pkg/sql/sessiondatapb", + "//pkg/sql/types", + "//pkg/util/hlc", + "//pkg/util/json", + "//pkg/util/log", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "cdceval_test", + srcs = [ + "expr_eval_test.go", + "functions_test.go", + "main_test.go", + ], + embed = [":cdceval"], + deps = [ + "//pkg/base", + "//pkg/ccl/changefeedccl/cdcevent", + "//pkg/ccl/changefeedccl/cdctest", + "//pkg/ccl/utilccl", + "//pkg/jobs/jobspb", + "//pkg/roachpb", + "//pkg/security/securityassets", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/settings/cluster", + "//pkg/sql/catalog", + "//pkg/sql/catalog/descpb", + "//pkg/sql/distsql", + "//pkg/sql/parser", + "//pkg/sql/randgen", + "//pkg/sql/rowenc", + "//pkg/sql/sem/eval", + "//pkg/sql/sem/tree", + "//pkg/sql/sessiondatapb", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/hlc", + "//pkg/util/json", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/randutil", + "@com_github_stretchr_testify//require", + ], +) diff --git 
a/pkg/ccl/changefeedccl/cdceval/doc.go b/pkg/ccl/changefeedccl/cdceval/doc.go new file mode 100644 index 000000000000..cca6e6ea7b74 --- /dev/null +++ b/pkg/ccl/changefeedccl/cdceval/doc.go @@ -0,0 +1,107 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdceval + +/*** + +cdceval package is a library for evaluating various expressions in CDC. +Namely, this package concerns itself with 3 things: + * Filter evaluation -- aka predicates: does the event match the boolean expression. + * Projection evaluation: given the set of projection expressions, evaluate them. + * (Soon to be added) Evaluation of computed virtual columns. + +Evaluator is the gateway into the evaluation logic; it has 3 methods matching +the above use cases. Before filtering and projection can be used, Evaluator must +be configured with the appropriate projection and filter expressions via ConfigureProjection and ConfigureFilter. + +If the Evaluator is not configured with ConfigureProjection, then each event is assumed +to match the filter by default, and the projection operation is an identity operation returning the input +row. + +Evaluator constructs a helper structure (exprEval) to perform the actual evaluation. +One exprEval exists per cdcevent.EventDescriptor (currently, a new exprEval is created +whenever the event descriptor changes; we might have to add some sort of caching if needed). + +Evaluation of projections and filter expressions is identical. + +First, we have a "compilation" phase: + 1. The expression is "walked" to resolve the names and replace those names with tree.IndexedVar expressions. + The "index" part of the indexed var refers to the "index" of the datum in the row. + (note however: the row is abstracted under the cdcevent package). IndexedVar allows the value of that + variable to be bound later once it is known; it also associates the type information + with that variable. + 2. The expression is type checked to ensure that it is of the appropriate type. + * Projection expressions can be of tree.Any type, while filters must be tree.DBool. + 3. The expression is then normalized (constants folded, etc). + +It is an error to have a filter expression which evaluates to "false" -- in this case, Evaluator +will return a "contradiction" error. + +After expressions are "compiled", they can be evaluated; and again, both projections and filters use the same +logic (the evalExpr() function); basically, all IndexedVars are bound to the Datums in the updated row, and the +expression is evaluated to the appropriate target type. + +Expressions can contain functions. We restrict the set of functions that can be used by CDC. +Volatile functions, window functions, and aggregate functions are disallowed. +Certain stable functions (such as now(), current_timestamp(), etc.) are allowed -- they will always +return the MVCC timestamp of the event. +We also provide custom, CDC-specific functions, such as cdc_prev(), which returns the previous row as +a JSONB record. See functions.go for more details. + +***/ + +// TODO(yevgeniy): Various notes/questions/issues and todos. +// 1. Options issues: +// * key_in_value: makes no sense; just "select *" +// * key_only: currently unsupported by this flavor; would be nice to support it though +// i.e. you only want the key, but you need "where" clause filtering.
Not clear how to express in sql.y +// * VirtualColumnVisibility: null or omit -- both can be accomplished +// * null: currently emitting null, but you can choose to emit null via "select ... null as vcolumn" +// * omit: well, just don't select. +// * On the other hand, we can also support "emit" mode, where we can compute the vcolumn expression. +// * updated and mvcc_timestamp options -- both supported via select +// * Wrapped option -- does it make sense here? +// 3. Probably need to add more custom functions. +// * Determine what to do with stable function overrides (now()) vs cdc_mvcc_timestamp. Keep both? Drop one? +// 4. How to surface previous row -- it's an open question. +// * Option 1: provide cdc_prev() builtin which returns a JSON encoding of the previous row. +// One of the negatives is that we are adding an additional JSONB encoding cost, though this may not +// be that horrible. One interesting thing we could do with this approach is to also have a function +// cdc_delta which reduces JSONB to contain only modified columns (cdc_delta(cdc_prev())). +// Of course you could do something like this with a "prev" table, but you'd have to "(case ...)" select +// for each table column. +// And since composition is so cool, you could use cdc_delta to determine if an update is not actually +// an update, but an upsert event. +// * Option 2: provide "prev" table. Also has negatives. Name resolution becomes more complex. You could +// legitimately have a "prev" table, so you'd always need to alias the "real prev" table. The prev table +// is not specified via sql.y, so that's confusing. +// * Regardless of the option, keep in mind that sometimes prev is not set -- e.g. w/out diff option +// (here, we can return an error), but also not set during initial scan. So, the query must handle +// nulls in prev value. Just something to keep in mind. +// 5. We must be able to return permanent errors from this code that cause the changefeed to fail. +// If filtering fails for a row (e.g. "select ... where col_a/col_b > 0" results in divide by 0), +// this will fail forever, and so we must be able to return a permanent error. +// 6. Related to 5, we must have poison message handling so we won't kill the feed in cases like that. +// 7. Schema changes could cause permanent failures. +// 8. Multiple *'s are allowed. But should they? +// 9. It is interesting to consider what access to prev does when we then send that data to the encoder. +// Right now, we hard code before/after datums; with predicates, we should probably change how things are encoded. +// I.e. no "before"/"after" fields in json/avro -- instead, you select what you want to select. +// 10. Multi-family support -- sort of breaks down because you get datums only for 1 family at a time. Any expressions +// comparing columns across families will fail. +// 11. Span constraints -- arguably the "holy grail" -- something that depends on the optimizer, but perhaps we +// can find a way of using that w/out significant refactor to expose the entirety of the changefeed to opt. +// Basically, given the set of predicates over primary key span, try to narrow the span(s) to those that can +// satisfy predicates. +// 12. UI/Usability: Simple contradictions are detected -- but not all. Even w/out contradiction, the user +// may want to know which events match/not match, and what the data looks like. We might need a mode +// where the data is always emitted, but it is marked somehow, indicating how the data will be handled. +// 13.
We should walk expressions to determine if we need to turn on an option. E.g. if we know the user wants to filter +// out deletes, we could push this predicate down to KV (once kv supports filtering). +// Another idea is to potentially detect if cdc_prev() is used and, if so, turn on the diff option. diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval.go b/pkg/ccl/changefeedccl/cdceval/expr_eval.go new file mode 100644 index 000000000000..4cad87b03a26 --- /dev/null +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval.go @@ -0,0 +1,674 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdceval + +import ( + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/sql/sem/normalize" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sem/volatility" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" +) + +// Evaluator is responsible for evaluating expressions in CDC. +type Evaluator struct { + selectors []tree.SelectExpr + where tree.Expr + + evalCtx *eval.Context + // Current evaluator. Re-initialized whenever the event descriptor + // version changes. + evaluator *exprEval +} + +// NewEvaluator returns a new evaluator instance. +func NewEvaluator(evalCtx *eval.Context) Evaluator { + return Evaluator{evalCtx: evalCtx.Copy()} +} + +// ConfigureProjection configures this evaluator to evaluate projections. +func (e *Evaluator) ConfigureProjection(exprs tree.SelectExprs) error { + if len(exprs) == 0 { + return pgerror.New(pgcode.InvalidParameterValue, "expected at least 1 projection") + } + e.selectors = exprs + for _, se := range e.selectors { + expr, err := validateExpressionForCDC(se.Expr) + if err != nil { + return err + } + se.Expr = expr + } + return nil +} + +// ConfigureFilter configures this evaluator to match rows against the filter expression. +func (e *Evaluator) ConfigureFilter(filter tree.Expr) error { + if filter == nil { + return nil + } + expr, err := validateExpressionForCDC(filter) + if err != nil { + return err + } + e.where = expr + return nil +} + +// ComputeVirtualColumns updates the row with computed values for all virtual columns. +func (e *Evaluator) ComputeVirtualColumns(ctx context.Context, row *cdcevent.Row) error { + return errors.AssertionFailedf("unimplemented yet") +} + +// MatchesFilter returns true if the row matches the evaluator's filter expression.
+func (e *Evaluator) MatchesFilter( + ctx context.Context, updatedRow cdcevent.Row, mvccTS hlc.Timestamp, prevRow cdcevent.Row, +) (bool, error) { + if e.where == nil { + return true, nil + } + + if err := e.initEval(ctx, updatedRow.EventDescriptor); err != nil { + return false, err + } + + return e.evaluator.matchesFilter(ctx, updatedRow, mvccTS, prevRow) +} + +// Projection performs evalProjection operation on the updated row. +// mvccTS is an mvcc timestamp of updated row, and prevRow may optionally contain +// the value of the previous row. +// Returns cdcevent.Row representing evalProjection. +func (e *Evaluator) Projection( + ctx context.Context, updatedRow cdcevent.Row, mvccTS hlc.Timestamp, prevRow cdcevent.Row, +) (cdcevent.Row, error) { + if len(e.selectors) == 0 { + return updatedRow, nil + } + + if err := e.initEval(ctx, updatedRow.EventDescriptor); err != nil { + return cdcevent.Row{}, err + } + + return e.evaluator.evalProjection(ctx, updatedRow, mvccTS, prevRow) +} + +// initEval initializes evaluator for the specified event descriptor. +func (e *Evaluator) initEval(ctx context.Context, d *cdcevent.EventDescriptor) error { + if e.evaluator != nil && d.Equals(e.evaluator.EventDescriptor) { + return nil + } + + evaluator := newExprEval(e.evalCtx, d) + for _, selector := range e.selectors { + if err := evaluator.addSelector(ctx, selector, len(e.selectors)); err != nil { + return err + } + } + + if err := evaluator.addFilter(ctx, e.where); err != nil { + return err + } + + e.evaluator = evaluator + return nil +} + +type exprEval struct { + *cdcevent.EventDescriptor + semaCtx tree.SemaContext + evalCtx *eval.Context + + evalHelper *rowContainer // evalHelper is a container tree.IndexedVarContainer. + iVarHelper tree.IndexedVarHelper // iVarHelper helps create indexed variables bound to evalHelper. + resolver cdcNameResolver // resolver responsible for performing function name resolution. + + starProjection bool + selectors []tree.TypedExpr // set of expressions to evaluate when performing evalProjection. + projection cdcevent.Projection + filter tree.TypedExpr // where clause filter + + // keep track of number of times particular column name was used + // in selectors. Since the data produced by CDC gets converted + // to the formats (JSON, avro, etc.) that may not like having multiple + // fields named the same way, this map helps us unique-ify those columns. + nameUseCount map[string]int + + // rowEvalCtx contains state necessary to evaluate expressions. + // updated for each row. + rowEvalCtx rowEvalContext +} + +func newExprEval(evalCtx *eval.Context, ed *cdcevent.EventDescriptor) *exprEval { + cols := ed.ResultColumns() + e := &exprEval{ + EventDescriptor: ed, + semaCtx: tree.MakeSemaContext(), + evalCtx: evalCtx.Copy(), + evalHelper: &rowContainer{cols: cols}, + projection: cdcevent.MakeProjection(ed), + nameUseCount: make(map[string]int), + } + + evalCtx = nil // From this point, only e.evalCtx should be used. + + // Configure semantic context. + e.semaCtx.SearchPath = &cdcCustomFunctionResolver{SearchPath: sessiondata.DefaultSearchPath} + e.semaCtx.Properties.Require("cdc", + tree.RejectAggregates|tree.RejectGenerators|tree.RejectWindowApplications|tree.RejectNestedGenerators, + ) + e.semaCtx.Annotations = tree.MakeAnnotations(cdcAnnotationAddr) + e.semaCtx.IVarContainer = e.evalHelper + + // Configure evaluation context. 
+ e.evalCtx.Annotations = &e.semaCtx.Annotations + e.evalCtx.Annotations.Set(cdcAnnotationAddr, &e.rowEvalCtx) + e.evalCtx.IVarContainer = e.evalHelper + + // Extract colinfo.ResultColumn from cdcevent.ResultColumn + nakedResultColumns := func() (rc []colinfo.ResultColumn) { + rc = make([]colinfo.ResultColumn, len(cols)) + for i := 0; i < len(cols); i++ { + rc[i] = cols[i].ResultColumn + } + return rc + } + + e.iVarHelper = tree.MakeIndexedVarHelper(e.evalHelper, len(cols)) + e.resolver = cdcNameResolver{ + EventDescriptor: ed, + NameResolutionVisitor: schemaexpr.MakeNameResolutionVisitor( + colinfo.NewSourceInfoForSingleTable( + tree.MakeUnqualifiedTableName(tree.Name(ed.TableName)), + nakedResultColumns(), + ), + e.iVarHelper, + ), + } + + return e +} + +// rowEvalContext represents the context needed to evaluate row expressions. +type rowEvalContext struct { + mvccTS hlc.Timestamp + updatedRow cdcevent.Row + prevRow cdcevent.Row + memo struct { + prevJSON tree.Datum + } +} + +// setupContext configures evaluation context with the provided row information. +func (e *exprEval) setupContext( + updatedRow cdcevent.Row, mvccTS hlc.Timestamp, prevRow cdcevent.Row, +) { + e.rowEvalCtx.updatedRow = updatedRow + e.rowEvalCtx.prevRow = prevRow + e.rowEvalCtx.mvccTS = mvccTS + e.evalCtx.TxnTimestamp = mvccTS.GoTime() + + // Clear out all memo records + e.rowEvalCtx.memo.prevJSON = nil +} + +// evalProjection responsible for evaluating projection expression. +// Returns new projection Row. +func (e *exprEval) evalProjection( + ctx context.Context, updatedRow cdcevent.Row, mvccTS hlc.Timestamp, prevRow cdcevent.Row, +) (cdcevent.Row, error) { + if e.starProjection { + return updatedRow, nil + } + + e.setupContext(updatedRow, mvccTS, prevRow) + + for i, expr := range e.selectors { + d, err := e.evalExpr(ctx, expr, types.Any) + if err != nil { + return cdcevent.Row{}, err + } + if err := e.projection.SetValueDatumAt(i, d); err != nil { + return cdcevent.Row{}, err + } + } + + return e.projection.Project(updatedRow) +} + +// matchesFilter returns true if row matches configured filter. +func (e *exprEval) matchesFilter( + ctx context.Context, updatedRow cdcevent.Row, mvccTS hlc.Timestamp, prevRow cdcevent.Row, +) (bool, error) { + if e.filter == nil { + return true, nil + } + + e.setupContext(updatedRow, mvccTS, prevRow) + d, err := e.evalExpr(ctx, e.filter, types.Bool) + if err != nil { + return false, err + } + return d == tree.DBoolTrue, nil +} + +// computeRenderColumnName returns render name for a selector, adjusted for CDC use case. +func (e *exprEval) computeRenderColumnName(selector tree.SelectExpr) (string, error) { + as, err := func() (string, error) { + if selector.As != "" { + return string(selector.As), nil + } + // We use ComputeColNameInternal instead of GetRenderName because the latter, if it can't + // figure out the name, returns "?column?" as the name; but we want to name things slightly + // different in that case. + _, s, err := tree.ComputeColNameInternal(e.semaCtx.SearchPath, selector.Expr) + return s, err + }() + if err != nil { + return "", err + } + + if as == "" { + as = fmt.Sprintf("column_%d", 1+len(e.selectors)) + } + return e.makeUniqueName(as), nil +} + +// makeUniqueName returns a unique name for the specified name. +// We do this because seeing same named fields in JSON might be confusing. +func (e *exprEval) makeUniqueName(as string) string { + useCount := e.nameUseCount[as] + e.nameUseCount[as]++ + if useCount > 0 { + // Unique-ify evalProjection name. 
+ as = fmt.Sprintf("%s_%d", as, useCount) + } + return as +} + +// addSelector adds specified select expression to evalProjection set. +func (e *exprEval) addSelector( + ctx context.Context, selector tree.SelectExpr, numSelectors int, +) error { + as, err := e.computeRenderColumnName(selector) + if err != nil { + return err + } + + typedExpr, err := e.typeCheck(ctx, selector.Expr, types.Any) + if err != nil { + return err + } + + // Expand "*". We walked expression during type check above, so we only expect to + // see UnqualifiedStar. + if _, isStar := typedExpr.(tree.UnqualifiedStar); isStar { + if numSelectors == 1 { + // Single star gets special treatment. + e.starProjection = true + } else { + for ord, col := range e.ResultColumns() { + e.addProjection(e.iVarHelper.IndexedVar(ord), e.makeUniqueName(col.Name)) + } + } + } else { + e.addProjection(typedExpr, as) + } + + return nil +} + +// addFilter adds where clause filter. +func (e *exprEval) addFilter(ctx context.Context, where tree.Expr) error { + if where == nil { + return nil + } + typedExpr, err := e.typeCheck(ctx, where, types.Bool) + if err != nil { + return err + } + + if typedExpr == tree.DBoolTrue { + if log.V(1) { + log.Infof(ctx, "ignoring tautological filter %q", where) + } + return nil + } + + if typedExpr == tree.DBoolFalse { + return errors.Newf("filter %q is a contradiction", where) + } + + e.filter = typedExpr + return nil +} + +// addProjection adds expression to be returned by evalProjection. +func (e *exprEval) addProjection(expr tree.TypedExpr, as string) { + e.selectors = append(e.selectors, expr) + e.projection.AddValueColumn(as, expr.ResolvedType()) +} + +// typeCheck converts expression to the expression of specified target type. +func (e *exprEval) typeCheck( + ctx context.Context, expr tree.Expr, targetType *types.T, +) (tree.TypedExpr, error) { + // If we have variable free immutable expressions, then we can just evaluate it right away. + typedExpr, err := schemaexpr.SanitizeVarFreeExpr( + ctx, expr, targetType, "cdc", &e.semaCtx, volatility.Immutable) + if err == nil { + d, err := eval.Expr(e.evalCtx, typedExpr) + if err != nil { + return nil, err + } + return d, nil + } + + // We must work harder. Bind variables and resolve names. + expr, _ = tree.WalkExpr(&e.resolver, expr) + if e.resolver.err != nil { + return nil, e.resolver.err + } + + if star, isStar := expr.(tree.UnqualifiedStar); isStar { + // Can't type check star -- we'll handle it later during eval. + return star, nil + } + + // Run type check & normalize. + typedExpr, err = expr.TypeCheck(ctx, &e.semaCtx, targetType) + if err != nil { + return nil, err + } + return normalize.Expr(e.evalCtx, typedExpr) +} + +// evalExpr evaluates typed expression and returns resulting datum. +// must be called after setupContext has been called. 
+func (e *exprEval) evalExpr( + ctx context.Context, expr tree.TypedExpr, targetType *types.T, +) (tree.Datum, error) { + switch t := expr.(type) { + case tree.Datum: + return t, nil + case *tree.IndexedVar: + d, err := e.rowEvalCtx.updatedRow.DatumAt(t.Idx) + if err != nil { + return nil, err + } + return d, nil + default: + v := replaceIndexVarVisitor{row: e.rowEvalCtx.updatedRow} + newExpr, _ := tree.WalkExpr(&v, expr) + if v.err != nil { + return nil, v.err + } + + typedExpr, err := tree.TypeCheck(ctx, newExpr, &e.semaCtx, targetType) + if err != nil { + return nil, err + } + d, err := eval.Expr(e.evalCtx, typedExpr) + if err != nil { + return nil, err + } + return d, nil + } +} + +// cdcExprVisitor is a visitor responsible for analyzing expression to determine +// if it consists of expressions supported by CDC. +// This visitor is used early to sanity check expression. +type cdcExprVisitor struct { + err error +} + +var _ tree.Visitor = (*cdcExprVisitor)(nil) + +// validateExpressionForCDC runs quick checks to make sure that expr is valid for +// CDC use case. This doesn't catch all the invalid cases, but is a place to pick up +// obviously wrong expressions. +func validateExpressionForCDC(expr tree.Expr) (tree.Expr, error) { + var v cdcExprVisitor + expr, _ = tree.WalkExpr(&v, expr) + if v.err != nil { + return nil, v.err + } + return expr, nil +} + +// VisitPre implements tree.Visitor interface. +func (v *cdcExprVisitor) VisitPre(expr tree.Expr) (bool, tree.Expr) { + return v.err == nil, expr +} + +// VisitPost implements tree.Visitor interface. +func (v *cdcExprVisitor) VisitPost(expr tree.Expr) tree.Expr { + switch t := expr.(type) { + case *tree.FuncExpr: + fn, err := checkFunctionSupported(t) + if err != nil { + v.err = err + return expr + } + return fn + case *tree.Subquery: + v.err = pgerror.New(pgcode.FeatureNotSupported, "subquery expressions not supported by CDC") + return expr + default: + return expr + } +} + +// cdcNameResolver is a visitor that resolves names in the expression +// and associates them with the EventDescriptor columns. +type cdcNameResolver struct { + schemaexpr.NameResolutionVisitor + *cdcevent.EventDescriptor + err error +} + +// tag errors generated by cdcNameResolver. +type cdcResolverError struct { + error +} + +func (v *cdcNameResolver) wrapError() func() { + // NameResolutionVisitor returns "column X does not exist" error if expression references + // column that was not configured. This is a bit confusing for CDC since a column + // may exist in the table, but not be available for a particular family. So, annotate + // the error to make it more obvious. + // We only want to do this for errors returned by NameResolutionVisitor, and not errors + // that we generate ourselves. + if v.err == nil { + return func() { + if v.NameResolutionVisitor.Err() != nil && v.err == nil { + v.err = errors.WithHintf(v.Err(), + "object does not exist in table %q, family %q", v.TableName, v.FamilyName) + } + } + } + return func() {} +} + +// VisitPre implements tree.Visitor interface. +func (v *cdcNameResolver) VisitPre(expr tree.Expr) (recurse bool, newExpr tree.Expr) { + defer v.wrapError()() + recurse, newExpr = v.NameResolutionVisitor.VisitPre(expr) + return v.err == nil, newExpr +} + +// VisitPost implements tree.Visitor interface. 
+func (v *cdcNameResolver) VisitPost(expr tree.Expr) tree.Expr { + defer v.wrapError()() + expr = v.NameResolutionVisitor.VisitPost(expr) + + switch t := expr.(type) { + case *tree.AllColumnsSelector: + // AllColumnsSelector occurs when "x.*" is used. We have a simple 1 table support, + // so make sure table names match. + if t.TableName.String() != v.TableName { + v.err = &cdcResolverError{ + error: pgerror.Newf(pgcode.UndefinedTable, "no data source matches pattern: %s", t.String()), + } + return t + } + // Now that we know table names match, turn this into unqualified star. + return tree.UnqualifiedStar{} + default: + return expr + } +} + +func resolveCustomCDCFunction(name string, fnCall *tree.FuncExpr) *tree.FuncExpr { + fn, exists := cdcFunctions[name] + if !exists { + return nil + } + return &tree.FuncExpr{ + Func: tree.ResolvableFunctionReference{FunctionReference: fn}, + Type: fnCall.Type, + Exprs: fnCall.Exprs, + } +} + +func checkFunctionSupported(fnCall *tree.FuncExpr) (*tree.FuncExpr, error) { + var fnName string + var fnClass tree.FunctionClass + var fnVolatility volatility.V + + unsupportedFunctionErr := func() error { + if fnName == "" { + fnName = fnCall.Func.String() + } + return &cdcResolverError{ + error: pgerror.Newf(pgcode.UndefinedFunction, "function %q unsupported by CDC", fnName), + } + } + + switch fn := fnCall.Func.FunctionReference.(type) { + case *tree.UnresolvedName: + // We may not have function definition yet if function takes arguments, + // or it's one of the custom cdc functions. + fnName = fn.String() + props, overloads := builtins.GetBuiltinProperties(fn.String()) + if props == nil { + if custom := resolveCustomCDCFunction(fnName, fnCall); custom != nil { + return custom, nil + } + return nil, unsupportedFunctionErr() + } + fnClass = props.Class + // Pick highest volatility overload. + for _, o := range overloads { + if o.Volatility > fnVolatility { + fnVolatility = o.Volatility + } + } + case *tree.FunctionDefinition: + fnName, fnClass = fn.Name, fn.Class + if fnCall.ResolvedOverload() != nil { + if _, isCDC := cdcFunctions[fnName]; isCDC { + return fnCall, nil + } + fnVolatility = fnCall.ResolvedOverload().Volatility + } else { + // Pick highest volatility overload. + for _, o := range fn.Definition { + overload := o.(*tree.Overload) + if overload.Volatility > fnVolatility { + fnVolatility = overload.Volatility + } + } + } + default: + return nil, errors.AssertionFailedf("unexpected function expression of type %T", fn) + } + + // Aggregates, generators and window functions are not supported. + switch fnClass { + case tree.AggregateClass, tree.GeneratorClass, tree.WindowClass: + return nil, unsupportedFunctionErr() + } + + if fnVolatility <= volatility.Immutable { + // Remaining immutable functions are safe. + return fnCall, nil + } + + // We have a non-immutable function -- make sure it is supported. + _, isSafe := supportedVolatileBuiltinFunctions[fnName] + if !isSafe { + return nil, unsupportedFunctionErr() + } + return fnCall, nil +} + +// rowContainer is a structure to assist with evaluation of CDC expressions. 
+type rowContainer struct { + cols []cdcevent.ResultColumn +} + +var _ tree.IndexedVarContainer = (*rowContainer)(nil) + +// IndexedVarResolvedType implements tree.IndexedVarContainer +func (c *rowContainer) IndexedVarResolvedType(idx int) *types.T { + return c.cols[idx].Typ +} + +// IndexedVarNodeFormatter implements tree.IndexedVarContainer +func (c *rowContainer) IndexedVarNodeFormatter(idx int) tree.NodeFormatter { + return nil +} + +type replaceIndexVarVisitor struct { + row cdcevent.Row + err error +} + +var _ tree.Visitor = (*replaceIndexVarVisitor)(nil) + +// VisitPre implements tree.Visitor interface. +func (v *replaceIndexVarVisitor) VisitPre(expr tree.Expr) (recurse bool, newExpr tree.Expr) { + if iVar, ok := expr.(*tree.IndexedVar); ok { + datum, err := v.row.DatumAt(iVar.Idx) + if err != nil { + v.err = pgerror.Wrapf(err, pgcode.NumericValueOutOfRange, "variable @%d out of bounds", iVar.Idx) + return false, expr + } + return true, datum + } + return true, expr +} + +// VisitPost implements tree.Visitor interface. +func (v *replaceIndexVarVisitor) VisitPost(expr tree.Expr) (newNode tree.Expr) { + return expr +} + +// cdcAnnotationAddr is the address used to store relevant information +// in the Annotation field of evalCtx when evaluating expressions. +const cdcAnnotationAddr tree.AnnotationIdx = iota + 1 + +func rowEvalContextFromEvalContext(evalCtx *eval.Context) *rowEvalContext { + return evalCtx.Annotations.Get(cdcAnnotationAddr).(*rowEvalContext) +} diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go b/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go new file mode 100644 index 000000000000..605783c8c1df --- /dev/null +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go @@ -0,0 +1,750 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdceval + +import ( + "context" + "fmt" + "sort" + "strconv" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/distsql" + "github.com/cockroachdb/cockroach/pkg/sql/parser" + "github.com/cockroachdb/cockroach/pkg/sql/randgen" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/stretchr/testify/require" +) + +func TestNoopPredicate(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, + "CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, d INT, FAMILY most (a,b,c), FAMILY only_d (d))") + desc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") + + serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig + ctx := context.Background() + decoder := cdcevent.NewEventDecoder(ctx, &serverCfg, + jobspb.ChangefeedDetails{ + TargetSpecifications: []jobspb.ChangefeedTargetSpecification{ + { + Type: jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY, + TableID: desc.GetID(), + FamilyName: "most", + }, + }, + }) + + popRow, cleanup := cdctest.MakeRangeFeedValueReader(t, s.ExecutorConfig(), desc) + defer cleanup() + sqlDB.Exec(t, "INSERT INTO foo (a, b, d) VALUES (1, 'one', -1)") + testRow := decodeRow(t, decoder, popRow(t), false) + + e, err := makeEvaluator(t, s.ClusterSettings(), "") + require.NoError(t, err) + + matches, err := e.MatchesFilter(ctx, testRow, hlc.Timestamp{}, testRow) + require.NoError(t, err) + require.True(t, matches) + + projection, err := e.Projection(ctx, testRow, hlc.Timestamp{}, testRow) + require.NoError(t, err) + require.Equal(t, testRow.EventDescriptor, projection.EventDescriptor) +} + +// readSortedRangeFeedValues reads n values, and sorts them based on key order. 
+func readSortedRangeFeedValues( + t *testing.T, n int, row func(t *testing.T) *roachpb.RangeFeedValue, +) (res []roachpb.RangeFeedValue) { + t.Helper() + for i := 0; i < n; i++ { + v := row(t) + res = append(res, *v) + } + sort.Slice(res, func(i, j int) bool { + return res[i].Key.Compare(res[j].Key) < 0 + }) + return res +} + +func TestEvaluator(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `CREATE TYPE status AS ENUM ('open', 'closed', 'inactive')`) + sqlDB.Exec(t, ` +CREATE TABLE foo ( + a INT, + b STRING, + c STRING, + d STRING AS (concat(b, c)) VIRTUAL, + e status DEFAULT 'inactive', + PRIMARY KEY (b, a), + FAMILY main (a, b, e), + FAMILY only_c (c) +)`) + desc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") + + type decodeExpectation struct { + expectUnwatchedErr bool + projectionErr string + + // current value expectations. + expectFiltered bool + keyValues []string + allValues map[string]string + } + + repeatExpectation := func(e decodeExpectation, n int) (repeated []decodeExpectation) { + for i := 0; i < n; i++ { + repeated = append(repeated, e) + } + return + } + + // popExpectation removes the first expectation from the provided expectation list and returns it. + popExpectation := func(t *testing.T, expectations []decodeExpectation) (decodeExpectation, []decodeExpectation) { + t.Helper() + require.Less(t, 0, len(expectations)) + return expectations[0], expectations[1:] + } + + for _, tc := range []struct { + testName string + familyName string // Must be set if targetType ChangefeedTargetSpecification_COLUMN_FAMILY + setupActions []string // SQL statements to execute before starting rangefeed. + actions []string // SQL statements to execute after starting rangefeed. 
+ predicate string + predicateErr string // Expect to get an error when configuring predicates + + expectMainFamily []decodeExpectation + expectOnlyCFamily []decodeExpectation + }{ + { + testName: "main/star", + familyName: "main", + actions: []string{"INSERT INTO foo (a, b) VALUES (1, '1st test')"}, + predicate: "SELECT * FROM _", + expectMainFamily: []decodeExpectation{ + { + keyValues: []string{"1st test", "1"}, + allValues: map[string]string{"a": "1", "b": "1st test", "e": "inactive"}, + }, + }, + }, + { + testName: "main/qualified_star", + familyName: "main", + actions: []string{"INSERT INTO foo (a, b) VALUES (1, 'qualified')"}, + predicate: "SELECT foo.* FROM _", + expectMainFamily: []decodeExpectation{ + { + keyValues: []string{"qualified", "1"}, + allValues: map[string]string{"a": "1", "b": "qualified", "e": "inactive"}, + }, + }, + }, + { + testName: "main/star_delete", + familyName: "main", + actions: []string{ + "INSERT INTO foo (a, b) VALUES (2, '2nd test')", + "DELETE FROM foo WHERE a=2 AND b='2nd test'", + }, + predicate: "SELECT *, cdc_is_delete() FROM _", + expectMainFamily: []decodeExpectation{ + { + keyValues: []string{"2nd test", "2"}, + allValues: map[string]string{"a": "2", "b": "2nd test", "e": "inactive", "cdc_is_delete": "false"}, + }, + { + keyValues: []string{"2nd test", "2"}, + allValues: map[string]string{"a": "2", "b": "2nd test", "e": "NULL", "cdc_is_delete": "true"}, + }, + }, + }, + { + testName: "main/projection", + familyName: "main", + actions: []string{"INSERT INTO foo (a, b) VALUES (3, '3rd test')"}, + predicate: "SELECT e, a FROM _", + expectMainFamily: []decodeExpectation{ + { + keyValues: []string{"3rd test", "3"}, + allValues: map[string]string{"a": "3", "e": "inactive"}, + }, + }, + }, + { + testName: "main/not_closed", + familyName: "main", + actions: []string{ + "INSERT INTO foo (a, b, e) VALUES (1, '4th test', 'closed')", + "INSERT INTO foo (a, b, e) VALUES (2, '4th test', 'open')", + "INSERT INTO foo (a, b, e) VALUES (3, '4th test', 'closed')", + "INSERT INTO foo (a, b, e) VALUES (4, '4th test', 'closed')", + "INSERT INTO foo (a, b, e) VALUES (5, '4th test', 'inactive')", + }, + predicate: "SELECT a FROM _ WHERE e IN ('open', 'inactive')", + expectMainFamily: []decodeExpectation{ + { + expectFiltered: true, + keyValues: []string{"4th test", "1"}, + }, + { + keyValues: []string{"4th test", "2"}, + allValues: map[string]string{"a": "2"}, + }, + { + expectFiltered: true, + keyValues: []string{"4th test", "3"}, + }, + { + expectFiltered: true, + keyValues: []string{"4th test", "4"}, + }, + { + keyValues: []string{"4th test", "5"}, + allValues: map[string]string{"a": "5"}, + }, + }, + }, + { + testName: "main/same_column_many_times", + familyName: "main", + actions: []string{"INSERT INTO foo (a, b) VALUES (1, '5th test')"}, + predicate: "SELECT *, a, a as one_more, a FROM _", + expectMainFamily: []decodeExpectation{ + { + keyValues: []string{"5th test", "1"}, + allValues: map[string]string{ + "a": "1", "b": "5th test", "e": "inactive", + "a_1": "1", "one_more": "1", "a_2": "1", + }, + }, + }, + }, + { + testName: "main/no_col_c", + familyName: "main", + actions: []string{"INSERT INTO foo (a, b) VALUES (1, 'no_c')"}, + predicate: "SELECT *, c FROM _", + expectMainFamily: []decodeExpectation{ + { + projectionErr: `column "c" does not exist`, + keyValues: []string{"no_c", "1"}, + }, + }, + }, + { + testName: "main/non_primary_family_with_var_free", + familyName: "only_c", + actions: []string{"INSERT INTO foo (a, b, c) VALUES (42, '6th test', 'c 
value')"}, + predicate: "SELECT sin(pi()/2) AS var_free, c, b ", + expectMainFamily: []decodeExpectation{ + { + expectUnwatchedErr: true, + }, + }, + expectOnlyCFamily: []decodeExpectation{ + { + keyValues: []string{"6th test", "42"}, + allValues: map[string]string{"b": "6th test", "c": "c value", "var_free": "1.0"}, + }, + }, + }, + { + testName: "main/cdc_prev_select", + familyName: "only_c", + actions: []string{ + "INSERT INTO foo (a, b, c) VALUES (42, 'prev_select', 'c value old')", + "UPSERT INTO foo (a, b, c) VALUES (42, 'prev_select', 'c value updated')", + }, + predicate: "SELECT a, b, c, (CASE WHEN cdc_prev()->>'c' IS NULL THEN 'not there' ELSE cdc_prev()->>'c' END) AS old_c", + expectMainFamily: []decodeExpectation{ + { + expectUnwatchedErr: true, + }, + }, + expectOnlyCFamily: []decodeExpectation{ + { + keyValues: []string{"prev_select", "42"}, + allValues: map[string]string{"a": "42", "b": "prev_select", "c": "c value old", "old_c": "not there"}, + }, + { + keyValues: []string{"prev_select", "42"}, + allValues: map[string]string{"a": "42", "b": "prev_select", "c": "c value updated", "old_c": "c value old"}, + }, + }, + }, + { + testName: "main/select_if", + familyName: "main", + actions: []string{ + "INSERT INTO foo (a, b) VALUES (123, 'select_if')", + "DELETE FROM foo where a=123", + }, + predicate: "SELECT IF(cdc_is_delete(),'deleted',a::string) AS conditional FROM _", + expectMainFamily: []decodeExpectation{ + { + keyValues: []string{"select_if", "123"}, + allValues: map[string]string{"conditional": "123"}, + }, + { + keyValues: []string{"select_if", "123"}, + allValues: map[string]string{"conditional": "deleted"}, + }, + }, + }, + { + testName: "main/btrim", + familyName: "main", + actions: []string{ + "INSERT INTO foo (a, b) VALUES (1, ' spaced out ')", + }, + predicate: "SELECT btrim(b), parse_timetz('1:00-0') AS past FROM _", + expectMainFamily: []decodeExpectation{ + { + keyValues: []string{" spaced out ", "1"}, + allValues: map[string]string{"btrim": "spaced out", "past": "01:00:00+00:00:00"}, + }, + }, + }, + { + testName: "main/btrim_wrong_type", + familyName: "main", + actions: []string{ + "INSERT INTO foo (a, b) VALUES (1, ' spaced out ')", + }, + predicate: "SELECT btrim(a) FROM _", + expectMainFamily: []decodeExpectation{ + { + keyValues: []string{" spaced out ", "1"}, + projectionErr: "unknown signature: btrim\\(int\\)", + }, + }, + }, + { + testName: "main/contradiction", + familyName: "main", + actions: []string{"INSERT INTO foo (a, b) VALUES (1, 'contradiction')"}, + predicate: "SELECT * FROM _ WHERE 1 > 2", + expectMainFamily: []decodeExpectation{ + { + projectionErr: `filter .* is a contradiction`, + keyValues: []string{"contradiction", "1"}, + }, + }, + }, + { + testName: "main/no_sleep", + familyName: "main", + predicate: "SELECT *, pg_sleep(86400) AS wake_up FROM _", + predicateErr: `function "pg_sleep" unsupported by CDC`, + }, + { + testName: "main/no_subselect", + familyName: "main", + predicate: "SELECT cdc_prev(), cdc_is_delete(123), (select column1 from (values (1,2,3))) FROM _", + predicateErr: `subquery expressions not supported by CDC`, + }, + { + testName: "main/no_subselect_in_where", + familyName: "main", + predicate: "SELECT cdc_prev() FROM _ WHERE a = 2 AND (select 3) = 3", + predicateErr: `subquery expressions not supported by CDC`, + }, + { + testName: "main/filter_many", + familyName: "only_c", + actions: []string{ + "INSERT INTO foo (a, b, c) WITH s AS " + + "(SELECT generate_series as x FROM generate_series(1, 100)) " + + "SELECT x, 
'filter_many', x::string FROM s", + }, + predicate: "SELECT * FROM _ WHERE a % 33 = 0", + expectMainFamily: repeatExpectation(decodeExpectation{expectUnwatchedErr: true}, 100), + expectOnlyCFamily: func() (expectations []decodeExpectation) { + for i := 1; i <= 100; i++ { + iStr := strconv.FormatInt(int64(i), 10) + e := decodeExpectation{ + keyValues: []string{"filter_many", iStr}, + } + if i%33 == 0 { + e.allValues = map[string]string{"c": iStr} + } else { + e.expectFiltered = true + } + expectations = append(expectations, e) + } + return expectations + }(), + }, + { + testName: "main/only_some_deleted_values", + familyName: "only_c", + setupActions: []string{ + "INSERT INTO foo (a, b, c) WITH s AS " + + "(SELECT generate_series as x FROM generate_series(1, 100)) " + + "SELECT x, 'only_some_deleted_values', x::string FROM s", + }, + actions: []string{"DELETE FROM foo WHERE b='only_some_deleted_values'"}, + predicate: `SELECT * FROM _ WHERE cdc_is_delete() AND cast(cdc_prev()->>'a' as int) % 33 = 0`, + expectMainFamily: repeatExpectation(decodeExpectation{expectUnwatchedErr: true}, 100), + expectOnlyCFamily: func() (expectations []decodeExpectation) { + for i := 1; i <= 100; i++ { + e := decodeExpectation{ + keyValues: []string{"only_some_deleted_values", strconv.FormatInt(int64(i), 10)}, + expectFiltered: i%33 != 0, + allValues: map[string]string{"c": "NULL"}, + } + expectations = append(expectations, e) + } + return expectations + }(), + }, + } { + t.Run(tc.testName, func(t *testing.T) { + sqlDB.Exec(t, "DELETE FROM foo WHERE true") + + // Setup evaluator. + evaluator, err := makeEvaluator(t, s.ClusterSettings(), tc.predicate) + if tc.predicateErr != "" { + require.Regexp(t, tc.predicateErr, err) + return + } + require.NoError(t, err) + + targetType := jobspb.ChangefeedTargetSpecification_EACH_FAMILY + if tc.familyName != "" { + targetType = jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY + } + + details := jobspb.ChangefeedDetails{ + TargetSpecifications: []jobspb.ChangefeedTargetSpecification{ + { + Type: targetType, + TableID: desc.GetID(), + FamilyName: tc.familyName, + }, + }, + } + serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig + ctx := context.Background() + decoder := cdcevent.NewEventDecoder(ctx, &serverCfg, details) + + for _, action := range tc.setupActions { + sqlDB.Exec(t, action) + } + + popRow, cleanup := cdctest.MakeRangeFeedValueReader(t, s.ExecutorConfig(), desc) + defer cleanup() + + for _, action := range tc.actions { + sqlDB.Exec(t, action) + } + + expectedEvents := len(tc.expectMainFamily) + len(tc.expectOnlyCFamily) + vals := readSortedRangeFeedValues(t, expectedEvents, popRow) + for _, v := range vals { + eventFamilyID, err := cdcevent.TestingGetFamilyIDFromKey(decoder, v.Key, v.Timestamp()) + require.NoError(t, err) + + var expect decodeExpectation + if eventFamilyID == 0 { + expect, tc.expectMainFamily = popExpectation(t, tc.expectMainFamily) + } else { + expect, tc.expectOnlyCFamily = popExpectation(t, tc.expectOnlyCFamily) + } + + updatedRow, err := decodeRowErr(decoder, &v, false) + if expect.expectUnwatchedErr { + require.ErrorIs(t, err, cdcevent.ErrUnwatchedFamily) + continue + } + + require.NoError(t, err) + require.True(t, updatedRow.IsInitialized()) + prevRow := decodeRow(t, decoder, &v, true) + require.NoError(t, err) + + if expect.expectFiltered { + require.Equal(t, expect.keyValues, slurpKeys(t, updatedRow), "isDelete=%t fid=%d", updatedRow.IsDeleted(), eventFamilyID) + matches, err := evaluator.MatchesFilter(ctx, updatedRow, 
v.Timestamp(), prevRow) + require.NoError(t, err) + require.False(t, matches, "keys: %v", slurpKeys(t, updatedRow)) + continue + } + + projection, err := evaluator.Projection(ctx, updatedRow, v.Timestamp(), prevRow) + if expect.projectionErr != "" { + require.Regexp(t, expect.projectionErr, err) + // Sanity check we get error for the row we expected to get an error for. + require.Equal(t, expect.keyValues, slurpKeys(t, updatedRow)) + } else { + require.NoError(t, err) + require.Equal(t, expect.keyValues, slurpKeys(t, projection)) + require.Equal(t, expect.allValues, slurpValues(t, projection)) + } + } + }) + } +} + +// Tests that use of volatile functions, without CDC specific override, +// results in an error. +func TestUnsupportedCDCFunctions(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, "CREATE TABLE foo (a INT PRIMARY KEY)") + + for fnCall, errFn := range map[string]string{ + // Some volatile functions. + "version()": "version", + "crdb_internal.trace_id()": "crdb_internal.trace_id", + "crdb_internal.locality_value('blah')": "crdb_internal.locality_value", + "1 + crdb_internal.trace_id()": "crdb_internal.trace_id", + "current_user()": "current_user", + "nextval('seq')": "nextval", + + // Special form of CURRENT_USER() is SESSION_USER (no parens). + "SESSION_USER": "session_user", + + // Aggregator functions + "generate_series(1, 10)": "generate_series", + + // Unsupported functions that take arguments from foo. + "generate_series(1, a)": "generate_series", + "crdb_internal.read_file(b)": "crdb_internal.read_file", + "crdb_internal.get_namespace_id()": "crdb_internal.get_namespace_id", + } { + t.Run(fmt.Sprintf("select/%s", errFn), func(t *testing.T) { + _, err := makeEvaluator(t, s.ClusterSettings(), fmt.Sprintf("SELECT %s", fnCall)) + require.Regexp(t, fmt.Sprintf(`function "%s" unsupported by CDC`, errFn), err) + }) + + // Same thing, but with the WHERE clause + t.Run(fmt.Sprintf("where/%s", errFn), func(t *testing.T) { + _, err := makeEvaluator(t, s.ClusterSettings(), + fmt.Sprintf("SELECT 1 WHERE %s IS NOT NULL", fnCall)) + require.Regexp(t, fmt.Sprintf(`function "%s" unsupported by CDC`, errFn), err) + }) + } +} + +func TestEvaluatesProjection(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, ""+ + "CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, d INT, "+ + "FAMILY most (a,b,c), FAMILY only_d (d))") + desc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") + testRow := cdcevent.TestingMakeEventRow(desc, 0, randEncDatumRow(t, desc, 0), false) + + verifyConstantsFolded := func(p *exprEval) { + for _, expr := range p.selectors { + _ = expr.(tree.Datum) + } + } + + for _, tc := range []struct { + name string + predicate string + input rowenc.EncDatumRow + expectErr string + expectation map[string]string + verifyFold bool + }{ + { + name: "constants", + predicate: "SELECT 1, 2, 3", + expectation: map[string]string{"column_1": "1", "column_2": "2", "column_3": "3"}, + verifyFold: true, + }, + { + name: "constants_functions_and_aliases", + predicate: "SELECT 0 as zero, abs(-2) two, 42", + expectation: map[string]string{"zero": "0", "two": "2", "column_3": "42"}, 
+ verifyFold: true, + }, + { + name: "trig_fun", + predicate: "SELECT cos(0), sin(pi()/2) as sin_90, 39 + pi()::int", + expectation: map[string]string{"cos": "1.0", "sin_90": "1.0", "column_3": "42"}, + verifyFold: true, + }, + { + name: "div_by_zero", + predicate: "SELECT 3 / sin(pi() - pi()) as result", + expectErr: "division by zero", + }, + { + name: "projection_with_bound_vars", + predicate: "SELECT sqrt(a::float) + sin(pi()/2) as result, foo.*", + input: makeEncDatumRow(tree.NewDInt(4), tree.DNull, tree.DNull), + expectation: map[string]string{"result": "3.0", "a": "4", "b": "NULL", "c": "NULL"}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + e, err := makeExprEval(t, s.ClusterSettings(), testRow.EventDescriptor, tc.predicate) + if tc.expectErr != "" { + require.Regexp(t, tc.expectErr, err) + return + } + + require.NoError(t, err) + if tc.verifyFold { + verifyConstantsFolded(e) + } + row := testRow + if tc.input != nil { + row = cdcevent.TestingMakeEventRow(desc, 0, tc.input, false) + } + + p, err := e.evalProjection(context.Background(), row, hlc.Timestamp{}, row) + require.NoError(t, err) + require.Equal(t, tc.expectation, slurpValues(t, p)) + }) + } +} + +// makeEvaluator creates Evaluator and configures it with specified +// select statement predicate. +func makeEvaluator(t *testing.T, st *cluster.Settings, selectStr string) (Evaluator, error) { + t.Helper() + evalCtx := eval.MakeTestingEvalContext(st) + e := NewEvaluator(&evalCtx) + if selectStr == "" { + return e, nil + } + s, err := parser.ParseOne(selectStr) + require.NoError(t, err) + slct := s.AST.(*tree.Select).Select.(*tree.SelectClause) + if err := e.ConfigureProjection(slct.Exprs); err != nil { + return Evaluator{}, err + } + + if slct.Where != nil { + if err := e.ConfigureFilter(slct.Where.Expr); err != nil { + return Evaluator{}, err + } + } + return e, nil +} + +func makeExprEval( + t *testing.T, st *cluster.Settings, ed *cdcevent.EventDescriptor, selectStr string, +) (*exprEval, error) { + t.Helper() + e, err := makeEvaluator(t, st, selectStr) + require.NoError(t, err) + + if err := e.initEval(context.Background(), ed); err != nil { + return nil, err + } + return e.evaluator, nil +} + +func decodeRowErr( + decoder cdcevent.Decoder, v *roachpb.RangeFeedValue, prev bool, +) (cdcevent.Row, error) { + kv := roachpb.KeyValue{Key: v.Key} + if prev { + kv.Value = v.PrevValue + } else { + kv.Value = v.Value + } + return decoder.DecodeKV(context.Background(), kv, v.Timestamp()) +} + +func decodeRow( + t *testing.T, decoder cdcevent.Decoder, v *roachpb.RangeFeedValue, prev bool, +) cdcevent.Row { + r, err := decodeRowErr(decoder, v, prev) + require.NoError(t, err) + return r +} + +func slurpKeys(t *testing.T, r cdcevent.Row) (keys []string) { + t.Helper() + require.NoError(t, r.ForEachKeyColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error { + keys = append(keys, tree.AsStringWithFlags(d, tree.FmtExport)) + return nil + })) + return keys +} + +func slurpValues(t *testing.T, r cdcevent.Row) map[string]string { + t.Helper() + res := make(map[string]string) + require.NoError(t, r.ForEachColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error { + res[col.Name] = tree.AsStringWithFlags(d, tree.FmtExport) + return nil + })) + return res +} + +func randEncDatumRow( + t *testing.T, desc catalog.TableDescriptor, familyID descpb.FamilyID, +) (row rowenc.EncDatumRow) { + t.Helper() + rng, _ := randutil.NewTestRand() + + family, err := desc.FindFamilyByID(familyID) + require.NoError(t, err) + for _, colID := 
range family.ColumnIDs { + col, err := desc.FindColumnWithID(colID) + require.NoError(t, err) + row = append(row, rowenc.EncDatum{Datum: randgen.RandDatum(rng, col.GetType(), col.IsNullable())}) + } + return row +} + +func makeEncDatumRow(datums ...tree.Datum) (row rowenc.EncDatumRow) { + for _, d := range datums { + row = append(row, rowenc.EncDatum{Datum: d}) + } + return row +} diff --git a/pkg/ccl/changefeedccl/cdceval/functions.go b/pkg/ccl/changefeedccl/cdceval/functions.go new file mode 100644 index 000000000000..f781f5fd24ca --- /dev/null +++ b/pkg/ccl/changefeedccl/cdceval/functions.go @@ -0,0 +1,182 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdceval + +import ( + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sem/volatility" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + jsonb "github.com/cockroachdb/cockroach/pkg/util/json" +) + +// In general, we want to only support functions that produce the same +// value given the same data -- i.e. immutable functions. +// However, we can provide reasonable overrides to a small set of stable +// functions that make sense in the context of CDC. +var supportedVolatileBuiltinFunctions = makeStringSet( + // These functions can be supported given that we set the statement + // and transaction timestamp to be equal to MVCC timestamp of the event. + // TODO(yevgeniy): We also define cdc specific functions, s.a. cdc_mvcc_timestamp + // Maybe delete cdc_ overrides; or.... maybe disallow these builtins in favor of cdc_ specific overrides? + "current_date", + "current_timestamp", + "localtimestamp", + "now", + "statement_timestamp", + "transaction_timestamp", +) + +// CDC Specific functions. +// TODO(yevgeniy): Finalize function naming: e.g. 
cdc.is_delete() vs cdc_is_delete() +var cdcFunctions = map[string]*tree.FunctionDefinition{ + "cdc_is_delete": makeCDCBuiltIn( + "cdc_is_delete", + tree.Overload{ + Types: tree.ArgTypes{}, + ReturnType: tree.FixedReturnType(types.Bool), + Fn: func(evalCtx *eval.Context, datums tree.Datums) (tree.Datum, error) { + rowEvalCtx := rowEvalContextFromEvalContext(evalCtx) + if rowEvalCtx.updatedRow.IsDeleted() { + return tree.DBoolTrue, nil + } + return tree.DBoolFalse, nil + }, + Info: "Returns true if the event is a deletion", + Volatility: volatility.Stable, + }), + "cdc_mvcc_timestamp": cdcTimestampBuiltin( + "cdc_mvcc_timestamp", + "Returns event MVCC HLC timestamp", + func(rowEvalCtx *rowEvalContext) hlc.Timestamp { + return rowEvalCtx.mvccTS + }, + ), + "cdc_updated_timestamp": cdcTimestampBuiltin( + "cdc_updated_timestamp", + "Returns event updated HLC timestamp", + func(rowEvalCtx *rowEvalContext) hlc.Timestamp { + return rowEvalCtx.updatedRow.SchemaTS + }, + ), + "cdc_prev": makeCDCBuiltIn( + "cdc_prev", + tree.Overload{ + Types: tree.ArgTypes{}, + ReturnType: tree.FixedReturnType(types.Jsonb), + Fn: prevRowAsJSON, + Info: "Returns previous value of a row as JSONB", + Volatility: volatility.Stable, + }), +} + +// TODO(yevgeniy): Implement additional functions (some ideas, not all should be implemented): +// * cdc_is_delete is easy; what about update? does update include new events? +// * cdc_is_new -- true if event is a new row +// * tuple overload (or cdc_prev_tuple) to return previous value as a tuple +// * cdc_key -- effectively key_in_value where key columns returned as either a tuple or a json. +// * cdc_key_cols -- return key column names; +// * this can come in handy when working with jsonb; for example, emit previous JSONB excluding +// key columns can be done with `SELECT cdc_prev() - cdc_key_cols() +// * cdc_event_family_is(fam): return true if cdc event family is specified family; overload both for +// family ID and family name. 
+// function can be used to write complex conditionals when dealing with multi-family table(s) + +var cdcFnProps = &tree.FunctionProperties{ + Category: "CDC builtin", +} + +func makeCDCBuiltIn(fnName string, overloads ...tree.Overload) *tree.FunctionDefinition { + return tree.NewFunctionDefinition(fnName, cdcFnProps, overloads) +} + +func cdcTimestampBuiltin( + fnName string, doc string, tsFn func(rowEvalCtx *rowEvalContext) hlc.Timestamp, +) *tree.FunctionDefinition { + return tree.NewFunctionDefinition( + fnName, + cdcFnProps, + []tree.Overload{ + { + Types: tree.ArgTypes{}, + ReturnType: tree.FixedReturnType(types.Decimal), + Fn: func(evalCtx *eval.Context, datums tree.Datums) (tree.Datum, error) { + rowEvalCtx := rowEvalContextFromEvalContext(evalCtx) + return eval.TimestampToDecimalDatum(tsFn(rowEvalCtx)), nil + }, + Info: doc, + Volatility: volatility.Stable, + }, + }, + ) +} + +func prevRowAsJSON(evalCtx *eval.Context, _ tree.Datums) (tree.Datum, error) { + rec := rowEvalContextFromEvalContext(evalCtx) + if rec.memo.prevJSON != nil { + return rec.memo.prevJSON, nil + } + + var prevJSON *tree.DJSON + if rec.prevRow.IsInitialized() { + b := jsonb.NewObjectBuilder(0) + if err := rec.prevRow.ForEachColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error { + j, err := tree.AsJSON(d, sessiondatapb.DataConversionConfig{}, time.UTC) + if err != nil { + return err + } + b.Add(col.Name, j) + return nil + }); err != nil { + return nil, err + } + prevJSON = tree.NewDJSON(b.Build()) + } else { + prevJSON = tree.NewDJSON(jsonb.NullJSONValue) + } + + rec.memo.prevJSON = prevJSON + return prevJSON, nil +} + +type cdcCustomFunctionResolver struct { + sessiondata.SearchPath +} + +// Resolve implements tree.CustomFunctionDefinitionResolver +func (cdcCustomFunctionResolver) Resolve(name string) *tree.FunctionDefinition { + fn, found := cdcFunctions[name] + if found { + return fn + } + fn, found = cdcFunctions[strings.ToLower(name)] + if found { + return fn + } + fn, found = tree.FunDefs[name] + if found { + return fn + } + return nil +} + +func makeStringSet(vals ...string) map[string]struct{} { + m := make(map[string]struct{}, len(vals)) + for _, v := range vals { + m[v] = struct{}{} + } + return m +} diff --git a/pkg/ccl/changefeedccl/cdceval/functions_test.go b/pkg/ccl/changefeedccl/cdceval/functions_test.go new file mode 100644 index 000000000000..2eb31d7c74cd --- /dev/null +++ b/pkg/ccl/changefeedccl/cdceval/functions_test.go @@ -0,0 +1,127 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdceval + +import ( + "context" + "fmt" + "strconv" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + jsonb "github.com/cockroachdb/cockroach/pkg/util/json" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +func TestEvaluatesCDCFunctionOverloads(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, ""+ + "CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, d INT, "+ + "FAMILY most (a,b,c), FAMILY only_d (d))") + desc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") + + ctx := context.Background() + + t.Run("current_timestamp", func(t *testing.T) { + testRow := cdcevent.TestingMakeEventRow(desc, 0, randEncDatumRow(t, desc, 0), false) + e, err := makeExprEval(t, s.ClusterSettings(), testRow.EventDescriptor, + "SELECT current_timestamp::int") + require.NoError(t, err) + futureTS := s.Clock().Now().Add(int64(60*time.Minute), 0) + p, err := e.evalProjection(ctx, testRow, futureTS, testRow) + require.NoError(t, err) + require.Equal(t, + map[string]string{"current_timestamp": strconv.FormatInt(futureTS.GoTime().Unix(), 10)}, + slurpValues(t, p)) + }) + + t.Run("cdc_is_delete", func(t *testing.T) { + for _, expectDelete := range []bool{true, false} { + testRow := cdcevent.TestingMakeEventRow(desc, 0, randEncDatumRow(t, desc, 0), expectDelete) + e, err := makeExprEval(t, s.ClusterSettings(), testRow.EventDescriptor, + "SELECT cdc_is_delete()") + require.NoError(t, err) + + p, err := e.evalProjection(ctx, testRow, s.Clock().Now(), testRow) + require.NoError(t, err) + require.Equal(t, + map[string]string{"cdc_is_delete": fmt.Sprintf("%t", expectDelete)}, + slurpValues(t, p)) + } + }) + + t.Run("cdc_prev", func(t *testing.T) { + rowDatums := randEncDatumRow(t, desc, 0) + testRow := cdcevent.TestingMakeEventRow(desc, 0, rowDatums, false) + e, err := makeExprEval(t, s.ClusterSettings(), testRow.EventDescriptor, + "SELECT cdc_prev()") + require.NoError(t, err) + + // When previous row is not set -- i.e. if running without diff, cdc_prev returns + // null json. + p, err := e.evalProjection(ctx, testRow, s.Clock().Now(), cdcevent.Row{}) + require.NoError(t, err) + require.Equal(t, map[string]string{"cdc_prev": jsonb.NullJSONValue.String()}, slurpValues(t, p)) + + // Otherwise, expect to get JSONB. 
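+		// Build the JSON object we expect cdc_prev() to return: the projection call below passes testRow itself as the previous row, so the expected object is built from the same datums.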
+ b := jsonb.NewObjectBuilder(len(rowDatums)) + for i, d := range rowDatums { + j, err := tree.AsJSON(d.Datum, sessiondatapb.DataConversionConfig{}, time.UTC) + if err != nil { + require.NoError(t, err) + } + b.Add(desc.PublicColumns()[i].GetName(), j) + } + + expectedJSON := b.Build() + p, err = e.evalProjection(ctx, testRow, s.Clock().Now(), testRow) + require.NoError(t, err) + require.Equal(t, map[string]string{"cdc_prev": expectedJSON.String()}, slurpValues(t, p)) + }) + + for _, cast := range []string{"", "::decimal", "::string"} { + t.Run(fmt.Sprintf("cdc_{mvcc,updated}_timestamp()%s", cast), func(t *testing.T) { + schemaTS := s.Clock().Now().Add(int64(60*time.Minute), 0) + mvccTS := schemaTS.Add(int64(30*time.Minute), 0) + testRow := cdcevent.TestingMakeEventRow(desc, 0, randEncDatumRow(t, desc, 0), false) + testRow.EventDescriptor.SchemaTS = schemaTS + + e, err := makeExprEval(t, s.ClusterSettings(), testRow.EventDescriptor, + fmt.Sprintf( + "SELECT cdc_mvcc_timestamp()%[1]s as mvcc, cdc_updated_timestamp()%[1]s as updated", cast, + )) + require.NoError(t, err) + + p, err := e.evalProjection(ctx, testRow, mvccTS, testRow) + require.NoError(t, err) + require.Equal(t, + map[string]string{ + "mvcc": eval.TimestampToDecimalDatum(mvccTS).String(), + "updated": eval.TimestampToDecimalDatum(schemaTS).String(), + }, + slurpValues(t, p)) + }) + } +} diff --git a/pkg/ccl/changefeedccl/cdceval/main_test.go b/pkg/ccl/changefeedccl/cdceval/main_test.go new file mode 100644 index 000000000000..35321d8f87af --- /dev/null +++ b/pkg/ccl/changefeedccl/cdceval/main_test.go @@ -0,0 +1,33 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdceval + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +func TestMain(m *testing.M) { + defer utilccl.TestingEnableEnterprise()() + securityassets.SetLoader(securitytest.EmbeddedAssets) + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel b/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel index 14c108663f47..3baddb22699e 100644 --- a/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "doc.go", "event.go", + "projection.go", "rowfetcher_cache.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent", @@ -24,6 +25,7 @@ go_library( "//pkg/sql/row", "//pkg/sql/rowenc", "//pkg/sql/sem/tree", + "//pkg/sql/types", "//pkg/util/cache", "//pkg/util/encoding", "//pkg/util/hlc", @@ -38,27 +40,26 @@ go_test( srcs = [ "event_test.go", "main_test.go", + "projection_test.go", ], embed = [":cdcevent"], deps = [ "//pkg/base", + "//pkg/ccl/changefeedccl/cdctest", "//pkg/ccl/changefeedccl/changefeedbase", "//pkg/ccl/utilccl", "//pkg/jobs/jobspb", - "//pkg/keys", - "//pkg/kv", - "//pkg/kv/kvclient/rangefeed", "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/server", - "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/colinfo", "//pkg/sql/catalog/descpb", - "//pkg/sql/catalog/desctestutils", "//pkg/sql/distsql", + "//pkg/sql/rowenc", "//pkg/sql/sem/tree", + "//pkg/sql/types", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", diff --git a/pkg/ccl/changefeedccl/cdcevent/event.go b/pkg/ccl/changefeedccl/cdcevent/event.go index a8628bf54e72..1ca6bdae6d0d 100644 --- a/pkg/ccl/changefeedccl/cdcevent/event.go +++ b/pkg/ccl/changefeedccl/cdcevent/event.go @@ -38,6 +38,7 @@ type Metadata struct { FamilyID descpb.FamilyID // Column family ID. FamilyName string // Column family name. HasOtherFamilies bool // True if the table multiple families. + SchemaTS hlc.Timestamp // Schema timestamp for table descriptor. } // Decoder is an interface for decoding KVs into cdc event row. @@ -48,7 +49,7 @@ type Decoder interface { // Row holds a row corresponding to an event. type Row struct { - *eventDescriptor + *EventDescriptor // datums is the new value of a changed table row. datums rowenc.EncDatumRow @@ -84,7 +85,7 @@ func (r Row) ForEachKeyColumn() Iterator { // ForEachColumn returns Iterator for each column. func (r Row) ForEachColumn() Iterator { - return iter{r: r, cols: r.familyCols} + return iter{r: r, cols: r.valueCols} } // ForEachUDTColumn returns Datum iterator for each column containing user defined types. @@ -92,6 +93,19 @@ func (r Row) ForEachUDTColumn() Iterator { return iter{r: r, cols: r.udtCols} } +// DatumAt returns Datum at specified position. 
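+// The position indexes this row's ResultColumns(); the underlying EncDatum is decoded lazily, so, for example, row.DatumAt(0) returns the datum for the first result column.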
+func (r Row) DatumAt(at int) (tree.Datum, error) { + if at >= len(r.cols) { + return nil, errors.AssertionFailedf("column at %d out of bounds", at) + } + col := r.cols[at] + encDatum := r.datums[col.ord] + if err := encDatum.EnsureDecoded(col.Typ, r.alloc); err != nil { + return nil, errors.Wrapf(err, "error decoding column %q as type %s", col.Name, col.Typ.String()) + } + return encDatum.Datum, nil +} + // IsDeleted returns true if event corresponds to a deletion event. func (r Row) IsDeleted() bool { return r.deleted @@ -99,7 +113,7 @@ func (r Row) IsDeleted() bool { // IsInitialized returns true if event row is initialized. func (r Row) IsInitialized() bool { - return r.eventDescriptor != nil + return r.EventDescriptor != nil } // HasValues returns true if event row has values to decode. @@ -160,12 +174,9 @@ func (c ResultColumn) Ordinal() int { return c.ord } -// eventDescriptor implements Metadata interface, and associates -// table/family descriptor with the information needed to decode events. -type eventDescriptor struct { +// EventDescriptor is a cdc event descriptor: collection of information describing Row. +type EventDescriptor struct { Metadata - desc catalog.TableDescriptor - family *descpb.ColumnFamilyDescriptor // List of result columns produced by this descriptor. // This may be different from the table descriptors public columns @@ -173,15 +184,18 @@ type eventDescriptor struct { cols []ResultColumn // Precomputed index lists into cols. - keyCols []int // Primary key columns. - familyCols []int // All column family columns. - udtCols []int // Columns containing UDTs. + keyCols []int // Primary key columns. + valueCols []int // All column family columns. + udtCols []int // Columns containing UDTs. } func newEventDescriptor( - desc catalog.TableDescriptor, family *descpb.ColumnFamilyDescriptor, includeVirtualColumns bool, -) (*eventDescriptor, error) { - sd := eventDescriptor{ + desc catalog.TableDescriptor, + family *descpb.ColumnFamilyDescriptor, + includeVirtualColumns bool, + schemaTS hlc.Timestamp, +) (*EventDescriptor, error) { + sd := EventDescriptor{ Metadata: Metadata{ TableID: desc.GetID(), TableName: desc.GetName(), @@ -189,13 +203,12 @@ func newEventDescriptor( FamilyID: family.ID, FamilyName: family.Name, HasOtherFamilies: desc.NumFamilies() > 1, + SchemaTS: schemaTS, }, - family: family, - desc: desc, } // addColumn is a helper to add a column to this descriptor. - addColumn := func(col catalog.Column, ord int, colIdxSlice *[]int) { + addColumn := func(col catalog.Column, ord int) int { resultColumn := ResultColumn{ ResultColumn: colinfo.ResultColumn{ Name: col.GetName(), @@ -209,23 +222,26 @@ func newEventDescriptor( colIdx := len(sd.cols) sd.cols = append(sd.cols, resultColumn) - *colIdxSlice = append(*colIdxSlice, colIdx) if col.GetType().UserDefined() { sd.udtCols = append(sd.udtCols, colIdx) } + return colIdx } // Primary key columns must be added in the same order they // appear in the primary key index. 
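+	// To keep that ordering while still visiting columns in public-column order below, record each key column's ordinal within the primary index here, and slot it into keyCols as the columns are added.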
primaryIdx := desc.GetPrimaryIndex() colOrd := catalog.ColumnIDToOrdinalMap(desc.PublicColumns()) + sd.keyCols = make([]int, primaryIdx.NumKeyColumns()) + var primaryKeyOrdinal catalog.TableColMap + for i := 0; i < primaryIdx.NumKeyColumns(); i++ { ord, ok := colOrd.Get(primaryIdx.GetKeyColumnID(i)) if !ok { return nil, errors.AssertionFailedf("expected to find column %d", ord) } - addColumn(desc.PublicColumns()[ord], ord, &sd.keyCols) + primaryKeyOrdinal.Set(desc.PublicColumns()[ord].GetID(), i) } // Remaining columns go in same order as public columns. @@ -233,8 +249,17 @@ func newEventDescriptor( for ord, col := range desc.PublicColumns() { isInFamily := inFamily.Contains(col.GetID()) virtual := col.IsVirtual() && includeVirtualColumns - if isInFamily || virtual { - addColumn(col, ord, &sd.familyCols) + isValueCol := isInFamily || virtual + pKeyOrd, isPKey := primaryKeyOrdinal.Get(col.GetID()) + if isValueCol || isPKey { + colIdx := addColumn(col, ord) + if isValueCol { + sd.valueCols = append(sd.valueCols, colIdx) + } + + if isPKey { + sd.keyCols[pKeyOrd] = colIdx + } } } @@ -242,20 +267,31 @@ func newEventDescriptor( } // DebugString returns event descriptor debug information. -func (d *eventDescriptor) DebugString() string { - return fmt.Sprintf("eventDescriptor{table: %q(%d) family: %q(%d) pkCols=%v valCols=%v", - d.TableName, d.TableID, d.FamilyName, d.FamilyID, d.keyCols, d.familyCols) +func (d *EventDescriptor) DebugString() string { + return fmt.Sprintf("EventDescriptor{table: %q(%d) family: %q(%d) pkCols=%v valCols=%v", + d.TableName, d.TableID, d.FamilyName, d.FamilyID, d.keyCols, d.valueCols) } // SafeFormat implements SafeFormatter interface. -func (d *eventDescriptor) SafeFormat(p redact.SafePrinter, _ rune) { +func (d *EventDescriptor) SafeFormat(p redact.SafePrinter, _ rune) { p.Print(d.DebugString()) } +// ResultColumns returns all results columns in this descriptor. +func (d *EventDescriptor) ResultColumns() []ResultColumn { + return d.cols +} + +// Equals returns true if this descriptor equals other. +func (d *EventDescriptor) Equals(other *EventDescriptor) bool { + return other != nil && d.TableID == other.TableID && d.Version == other.Version && d.FamilyID == other.FamilyID +} + type eventDescriptorFactory func( desc catalog.TableDescriptor, family *descpb.ColumnFamilyDescriptor, -) (*eventDescriptor, error) + schemaTS hlc.Timestamp, +) (*EventDescriptor, error) type eventDecoder struct { // Cached allocations for *row.Fetcher @@ -281,12 +317,13 @@ func getEventDescriptorCached( desc catalog.TableDescriptor, family *descpb.ColumnFamilyDescriptor, includeVirtual bool, + schemaTS hlc.Timestamp, cache *cache.UnorderedCache, -) (*eventDescriptor, error) { +) (*EventDescriptor, error) { idVer := idVersion{id: desc.GetID(), version: desc.GetVersion(), family: family.ID} if v, ok := cache.Get(idVer); ok { - ed := v.(*eventDescriptor) + ed := v.(*EventDescriptor) // Normally, this is a no-op since majority of changefeeds do not use UDTs. 
// However, in case we do, we must update cached UDT information based on this @@ -299,7 +336,7 @@ func getEventDescriptorCached( return ed, nil } - ed, err := newEventDescriptor(desc, family, includeVirtual) + ed, err := newEventDescriptor(desc, family, includeVirtual, schemaTS) if err != nil { return nil, err } @@ -317,7 +354,7 @@ func NewEventDecoder( cfg.LeaseManager.(*lease.Manager), cfg.CollectionFactory, cfg.DB, - details, + details.TargetSpecifications, ) includeVirtual := details.Opts[changefeedbase.OptVirtualColumns] == string(changefeedbase.OptVirtualColumnsNull) @@ -325,8 +362,9 @@ func NewEventDecoder( getEventDescriptor := func( desc catalog.TableDescriptor, family *descpb.ColumnFamilyDescriptor, - ) (*eventDescriptor, error) { - return getEventDescriptorCached(desc, family, includeVirtual, eventDescriptorCache) + schemaTS hlc.Timestamp, + ) (*EventDescriptor, error) { + return getEventDescriptorCached(desc, family, includeVirtual, schemaTS, eventDescriptorCache) } return &eventDecoder{ @@ -354,13 +392,13 @@ func (d *eventDecoder) DecodeKV( return Row{}, err } - ed, err := d.getEventDescriptor(d.desc, d.family) + ed, err := d.getEventDescriptor(d.desc, d.family, schemaTS) if err != nil { return Row{}, err } return Row{ - eventDescriptor: ed, + EventDescriptor: ed, datums: datums, deleted: isDeleted, alloc: &d.alloc, @@ -441,22 +479,31 @@ func (it iter) Col(fn ColumnFn) error { // TestingMakeEventRow initializes Row with provided arguments. // Exposed for unit tests. func TestingMakeEventRow( - desc catalog.TableDescriptor, encRow rowenc.EncDatumRow, deleted bool, + desc catalog.TableDescriptor, familyID descpb.FamilyID, encRow rowenc.EncDatumRow, deleted bool, ) Row { - family, err := desc.FindFamilyByID(0) + family, err := desc.FindFamilyByID(familyID) if err != nil { panic(err) // primary column family always exists. } const includeVirtual = false - ed, err := newEventDescriptor(desc, family, includeVirtual) + ed, err := newEventDescriptor(desc, family, includeVirtual, hlc.Timestamp{}) if err != nil { panic(err) } var alloc tree.DatumAlloc return Row{ - eventDescriptor: ed, + EventDescriptor: ed, datums: encRow, deleted: deleted, alloc: &alloc, } } + +// TestingGetFamilyIDFromKey returns family ID encoded in the specified roachpb.Key. +// Exposed for testing. 
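+// It resolves the key through the decoder's internal row fetcher cache (rfCache.tableDescForKey).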
+func TestingGetFamilyIDFromKey( + decoder Decoder, key roachpb.Key, ts hlc.Timestamp, +) (descpb.FamilyID, error) { + _, familyID, err := decoder.(*eventDecoder).rfCache.tableDescForKey(context.Background(), key, ts) + return familyID, err +} diff --git a/pkg/ccl/changefeedccl/cdcevent/event_test.go b/pkg/ccl/changefeedccl/cdcevent/event_test.go index 513d53f7bf24..75ed3f4e78a1 100644 --- a/pkg/ccl/changefeedccl/cdcevent/event_test.go +++ b/pkg/ccl/changefeedccl/cdcevent/event_test.go @@ -12,20 +12,15 @@ import ( "context" "fmt" "testing" - "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/distsql" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -56,7 +51,7 @@ CREATE TABLE foo ( FAMILY only_c (c) )`) - tableDesc := getHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") + tableDesc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") mainFamily := mustGetFamily(t, tableDesc, 0) cFamily := mustGetFamily(t, tableDesc, 1) @@ -95,7 +90,7 @@ CREATE TABLE foo ( }, } { t.Run(fmt.Sprintf("%s/includeVirtual=%t", tc.family.Name, tc.includeVirtual), func(t *testing.T) { - ed, err := newEventDescriptor(tableDesc, tc.family, tc.includeVirtual) + ed, err := newEventDescriptor(tableDesc, tc.family, tc.includeVirtual, s.Clock().Now()) require.NoError(t, err) // Verify Metadata information for event descriptor. @@ -105,7 +100,7 @@ CREATE TABLE foo ( require.True(t, ed.HasOtherFamilies) // Verify primary key and family columns are as expected. - r := Row{eventDescriptor: ed} + r := Row{EventDescriptor: ed} require.Equal(t, tc.expectedKeyCols, slurpColumns(t, r.ForEachKeyColumn())) require.Equal(t, tc.expectedColumns, slurpColumns(t, r.ForEachColumn())) require.Equal(t, tc.expectedUDTCols, slurpColumns(t, r.ForEachUDTColumn())) @@ -134,37 +129,9 @@ CREATE TABLE foo ( FAMILY only_c (c) )`) - tableDesc := getHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") - - // We'll use rangefeed to pluck out updated rows from the table. - rf := s.ExecutorConfig().(sql.ExecutorConfig).RangeFeedFactory - ctx := context.Background() - rows := make(chan *roachpb.RangeFeedValue) - _, err := rf.RangeFeed(ctx, "foo-feed", - []roachpb.Span{tableDesc.PrimaryIndexSpan(keys.SystemSQLCodec)}, - s.Clock().Now(), - func(ctx context.Context, value *roachpb.RangeFeedValue) { - select { - case <-ctx.Done(): - case rows <- value: - } - }, - rangefeed.WithDiff(true), - ) - require.NoError(t, err) - - // Helper to read next rangefeed value. 
- popRow := func(t *testing.T) *roachpb.RangeFeedValue { - t.Helper() - select { - case r := <-rows: - log.Infof(ctx, "Got Row: %s", roachpb.PrettyPrintKey(nil, r.Key)) - return r - case <-time.After(5 * time.Second): - t.Fatal("timeout reading row") - return nil - } - } + tableDesc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") + popRow, cleanup := cdctest.MakeRangeFeedValueReader(t, s.ExecutorConfig(), tableDesc) + defer cleanup() type decodeExpectation struct { expectUnwatchedErr bool @@ -337,12 +304,13 @@ CREATE TABLE foo ( } serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig + ctx := context.Background() decoder := NewEventDecoder(ctx, &serverCfg, details) expectedEvents := len(tc.expectMainFamily) + len(tc.expectOnlyCFamily) for i := 0; i < expectedEvents; i++ { v := popRow(t) - _, eventFamilyID, err := decoder.(*eventDecoder).rfCache.tableDescForKey(ctx, v.Key, v.Timestamp()) + eventFamilyID, err := TestingGetFamilyIDFromKey(decoder, v.Key, v.Timestamp()) require.NoError(t, err) var expect decodeExpectation @@ -435,22 +403,3 @@ func slurpDatums(t *testing.T, it Iterator) (res []string) { })) return res } - -func getHydratedTableDescriptor( - t *testing.T, execCfgI interface{}, kvDB *kv.DB, tableName string, -) catalog.TableDescriptor { - t.Helper() - desc := desctestutils.TestingGetPublicTableDescriptor( - kvDB, keys.SystemSQLCodec, "defaultdb", tableName) - if !desc.ContainsUserDefinedTypes() { - return desc - } - - ctx := context.Background() - execCfg := execCfgI.(sql.ExecutorConfig) - collection := execCfg.CollectionFactory.NewCollection(ctx, nil) - var err error - desc, err = refreshUDT(context.Background(), desc.GetID(), kvDB, collection, execCfg.Clock.Now()) - require.NoError(t, err) - return desc -} diff --git a/pkg/ccl/changefeedccl/cdcevent/projection.go b/pkg/ccl/changefeedccl/cdcevent/projection.go new file mode 100644 index 000000000000..cd57e4f8ccc0 --- /dev/null +++ b/pkg/ccl/changefeedccl/cdcevent/projection.go @@ -0,0 +1,101 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdcevent + +import ( + "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/errors" +) + +// Projection is a helper to facilitate construction of "projection" rows. +// Projection is constructed given the underlying event descriptor. Only the key +// columns from the descriptor are initialized upon construction. All other +// value columns returned by projection need to be configured separated via AddValueColumn, +// and the value for that column must later be set via SetValueDatumAt. +// All columns added to this projection are in the ordinal order. +type Projection Row + +// MakeProjection returns Projection builder given underlying descriptor. +func MakeProjection(d *EventDescriptor) Projection { + p := Projection{ + EventDescriptor: &EventDescriptor{Metadata: d.Metadata}, + } + + // Add all primary key columns. 
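+	// Key columns are copied from the source descriptor with their original names, types, and SQL strings; value columns must be added separately via AddValueColumn.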
+ for _, colIdx := range d.keyCols { + col := d.cols[colIdx] + p.addColumn(col.Name, col.Typ, col.sqlString, &p.keyCols) + } + return p +} + +func (p *Projection) addColumn(name string, typ *types.T, sqlString string, colIdxSlice *[]int) { + ord := len(p.cols) + p.cols = append(p.cols, ResultColumn{ + ResultColumn: colinfo.ResultColumn{ + Name: name, + Typ: typ, + }, + ord: ord, + sqlString: sqlString, + }) + + p.datums = append(p.datums, rowenc.EncDatum{}) + *colIdxSlice = append(*colIdxSlice, ord) + if typ.UserDefined() { + p.udtCols = append(p.udtCols, ord) + } +} + +// AddValueColumn adds a value column to this projection builder. +func (p *Projection) AddValueColumn(name string, typ *types.T) { + p.addColumn(name, typ, "", &p.valueCols) +} + +// SetValueDatumAt sets value datum at specified position. +func (p *Projection) SetValueDatumAt(pos int, d tree.Datum) error { + pos += len(p.keyCols) + if pos >= len(p.datums) { + return errors.AssertionFailedf("%d out of bounds", pos) + } + + // A bit of a sanity check -- types must match or d must be DNULL. + col := p.cols[pos] + if !(d == tree.DNull || col.Typ.Equal(d.ResolvedType())) { + return errors.AssertionFailedf("expected type %s for column %s@%d, found %s", + col.Typ, col.Name, pos, d.ResolvedType()) + } + + p.datums[pos].Datum = d + return nil +} + +// Project returns row projection. +func (p *Projection) Project(r Row) (Row, error) { + p.deleted = r.IsDeleted() + + // Copy key datums. + idx := 0 + if err := r.ForEachKeyColumn().Datum(func(d tree.Datum, col ResultColumn) error { + if idx >= len(p.keyCols) || idx >= len(p.datums) { + return errors.AssertionFailedf("%d out of bounds when projecting key column %s", idx, col.Name) + } + + p.datums[idx].Datum = d + idx++ + return nil + }); err != nil { + return Row{}, err + } + + return Row(*p), nil +} diff --git a/pkg/ccl/changefeedccl/cdcevent/projection_test.go b/pkg/ccl/changefeedccl/cdcevent/projection_test.go new file mode 100644 index 000000000000..c54c11c2617d --- /dev/null +++ b/pkg/ccl/changefeedccl/cdcevent/projection_test.go @@ -0,0 +1,109 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdcevent + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +func TestProjection(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `CREATE TYPE status AS ENUM ('open', 'closed', 'inactive')`) + sqlDB.Exec(t, ` +CREATE TABLE foo ( + a INT, + b STRING, + c STRING, + PRIMARY KEY (b, a) +)`) + + desc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") + encDatums := makeEncDatumRow(tree.NewDInt(1), tree.NewDString("one"), tree.DNull) + + t.Run("row_was_deleted", func(t *testing.T) { + input := TestingMakeEventRow(desc, 0, encDatums, true) + p := MakeProjection(input.EventDescriptor) + pr, err := p.Project(input) + require.NoError(t, err) + require.Equal(t, []string{"one", "1"}, slurpDatums(t, pr.ForEachKeyColumn())) + require.Equal(t, []string(nil), slurpDatums(t, pr.ForEachColumn())) + }) + + t.Run("identity", func(t *testing.T) { + input := TestingMakeEventRow(desc, 0, encDatums, false) + p := MakeProjection(input.EventDescriptor) + idx := 0 + require.NoError(t, input.ForEachColumn().Datum(func(d tree.Datum, col ResultColumn) error { + p.AddValueColumn(col.Name, col.Typ) + err := p.SetValueDatumAt(idx, d) + idx++ + return err + })) + + pr, err := p.Project(input) + require.NoError(t, err) + require.Equal(t, slurpDatums(t, input.ForEachKeyColumn()), slurpDatums(t, pr.ForEachKeyColumn())) + require.Equal(t, slurpDatums(t, input.ForEachColumn()), slurpDatums(t, pr.ForEachColumn())) + }) + + t.Run("must_be_correct_type", func(t *testing.T) { + input := TestingMakeEventRow(desc, 0, encDatums, false) + p := MakeProjection(input.EventDescriptor) + p.AddValueColumn("wrong_type", types.Int) + require.Regexp(t, "expected type int", p.SetValueDatumAt(0, tree.NewDString("fail"))) + // But we allow NULL. 
+ require.NoError(t, p.SetValueDatumAt(0, tree.DNull)) + }) + + t.Run("project_extra_column", func(t *testing.T) { + input := TestingMakeEventRow(desc, 0, encDatums, false) + p := MakeProjection(input.EventDescriptor) + idx := 0 + require.NoError(t, input.ForEachColumn().Datum(func(d tree.Datum, col ResultColumn) error { + p.AddValueColumn(col.Name, col.Typ) + err := p.SetValueDatumAt(idx, d) + idx++ + return err + })) + p.AddValueColumn("test", types.Int) + require.NoError(t, p.SetValueDatumAt(idx, tree.NewDInt(5))) + + pr, err := p.Project(input) + require.NoError(t, err) + require.Equal(t, slurpDatums(t, input.ForEachKeyColumn()), slurpDatums(t, pr.ForEachKeyColumn())) + expectValues := slurpDatums(t, input.ForEachColumn()) + expectValues = append(expectValues, "5") + require.Equal(t, expectValues, slurpDatums(t, pr.ForEachColumn())) + }) +} + +func makeEncDatumRow(datums ...tree.Datum) (row rowenc.EncDatumRow) { + for _, d := range datums { + row = append(row, rowenc.EncDatum{Datum: d}) + } + return row +} diff --git a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go index b4ad1e03a00e..8435ae987f44 100644 --- a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go @@ -79,9 +79,8 @@ func newRowFetcherCache( leaseMgr *lease.Manager, cf *descs.CollectionFactory, db *kv.DB, - details jobspb.ChangefeedDetails, + specs []jobspb.ChangefeedTargetSpecification, ) *rowFetcherCache { - specs := details.TargetSpecifications watchedFamilies := make(map[watchedFamily]struct{}, len(specs)) for _, s := range specs { watchedFamilies[watchedFamily{tableID: s.TableID, familyName: s.FamilyName}] = struct{}{} diff --git a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel index 20d5839f9ca2..0a866a6aaee9 100644 --- a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "mock_webhook_sink.go", "nemeses.go", + "row.go", "schema_registry.go", "testfeed.go", "tls_util.go", @@ -16,8 +17,16 @@ go_library( "//pkg/ccl/changefeedccl/changefeedbase", "//pkg/jobs", "//pkg/jobs/jobspb", + "//pkg/keys", + "//pkg/kv", + "//pkg/kv/kvclient/rangefeed", "//pkg/roachpb", + "//pkg/sql", + "//pkg/sql/catalog", + "//pkg/sql/catalog/descs", + "//pkg/sql/sem/tree", "//pkg/testutils/serverutils", + "//pkg/util", "//pkg/util/fsm", "//pkg/util/hlc", "//pkg/util/log", @@ -26,6 +35,7 @@ go_library( "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@com_github_linkedin_goavro_v2//:goavro", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/ccl/changefeedccl/cdctest/row.go b/pkg/ccl/changefeedccl/cdctest/row.go new file mode 100644 index 000000000000..d6ab247aea84 --- /dev/null +++ b/pkg/ccl/changefeedccl/cdctest/row.go @@ -0,0 +1,105 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdctest + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +// MakeRangeFeedValueReader starts a rangefeed on the specified table and returns a function +// that returns the next *roachpb.RangeFeedValue from the table. +// This function is intended to be used in tests that wish to read low-level roachpb.KeyValue(s). +// Instead of trying to generate KVs ourselves (subject to encoding restrictions, etc), it is +// simpler to just "INSERT ..." into the table, and then use this function to read the next value. +func MakeRangeFeedValueReader( + t *testing.T, execCfgI interface{}, desc catalog.TableDescriptor, +) (func(t *testing.T) *roachpb.RangeFeedValue, func()) { + t.Helper() + execCfg := execCfgI.(sql.ExecutorConfig) + rows := make(chan *roachpb.RangeFeedValue) + ctx, cleanup := context.WithCancel(context.Background()) + + _, err := execCfg.RangeFeedFactory.RangeFeed(ctx, "feed-"+desc.GetName(), + []roachpb.Span{desc.PrimaryIndexSpan(keys.SystemSQLCodec)}, + execCfg.Clock.Now(), + func(ctx context.Context, value *roachpb.RangeFeedValue) { + select { + case <-ctx.Done(): + case rows <- value: + } + }, + rangefeed.WithDiff(true), + ) + require.NoError(t, err) + + var timeout = 5 * time.Second + if util.RaceEnabled { + timeout = 3 * timeout + } + + // Helper to read next rangefeed value. + dups := make(map[string]struct{}) + return func(t *testing.T) *roachpb.RangeFeedValue { + t.Helper() + for { + select { + case r := <-rows: + rowKey := r.Key.String() + r.Value.String() + if _, isDup := dups[rowKey]; isDup { + log.Infof(context.Background(), "Skip duplicate %s", roachpb.PrettyPrintKey(nil, r.Key)) + continue + } + log.Infof(context.Background(), "Read row %s", roachpb.PrettyPrintKey(nil, r.Key)) + dups[rowKey] = struct{}{} + return r + case <-time.After(timeout): + t.Fatal("timeout reading row") + return nil + } + } + }, cleanup +} + +// GetHydratedTableDescriptor returns a table descriptor for the specified +// table. The descriptor is "hydrated" if it has user defined data types.
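+// The lookup goes through a descs.Collection with AvoidLeased set, so any user defined types referenced by the table are resolved as part of the read.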
+func GetHydratedTableDescriptor( + t *testing.T, execCfgI interface{}, kvDB *kv.DB, tableName tree.Name, +) (td catalog.TableDescriptor) { + t.Helper() + execCfg := execCfgI.(sql.ExecutorConfig) + var found bool + require.NoError(t, sql.DescsTxn(context.Background(), &execCfg, + func(ctx context.Context, txn *kv.Txn, col *descs.Collection) (err error) { + found, td, err = col.GetImmutableTableByName(ctx, txn, + tree.NewTableNameWithSchema("defaultdb", "public", tableName), + tree.ObjectLookupFlags{ + CommonLookupFlags: tree.CommonLookupFlags{ + Required: true, + AvoidLeased: true, + }, + }) + return err + })) + require.True(t, found) + return td +} diff --git a/pkg/ccl/changefeedccl/encoder_test.go b/pkg/ccl/changefeedccl/encoder_test.go index cbb9d4b84a43..bac4d2fdc329 100644 --- a/pkg/ccl/changefeedccl/encoder_test.go +++ b/pkg/ccl/changefeedccl/encoder_test.go @@ -224,8 +224,8 @@ func TestEncoders(t *testing.T) { } require.NoError(t, err) - rowInsert := cdcevent.TestingMakeEventRow(tableDesc, row, false) - prevRow := cdcevent.TestingMakeEventRow(tableDesc, nil, false) + rowInsert := cdcevent.TestingMakeEventRow(tableDesc, 0, row, false) + prevRow := cdcevent.TestingMakeEventRow(tableDesc, 0, nil, false) evCtx := eventContext{updated: ts} keyInsert, err := e.EncodeKey(context.Background(), rowInsert) @@ -235,8 +235,8 @@ func TestEncoders(t *testing.T) { require.NoError(t, err) require.Equal(t, expected.insert, rowStringFn(keyInsert, valueInsert)) - rowDelete := cdcevent.TestingMakeEventRow(tableDesc, row, true) - prevRow = cdcevent.TestingMakeEventRow(tableDesc, row, false) + rowDelete := cdcevent.TestingMakeEventRow(tableDesc, 0, row, true) + prevRow = cdcevent.TestingMakeEventRow(tableDesc, 0, row, false) keyDelete, err := e.EncodeKey(context.Background(), rowDelete) require.NoError(t, err) @@ -364,7 +364,7 @@ func TestAvroEncoderWithTLS(t *testing.T) { e, err := getEncoder(opts, targets) require.NoError(t, err) - rowInsert := cdcevent.TestingMakeEventRow(tableDesc, row, false) + rowInsert := cdcevent.TestingMakeEventRow(tableDesc, 0, row, false) var prevRow cdcevent.Row evCtx := eventContext{updated: ts} keyInsert, err := e.EncodeKey(context.Background(), rowInsert) @@ -374,8 +374,8 @@ func TestAvroEncoderWithTLS(t *testing.T) { require.NoError(t, err) require.Equal(t, expected.insert, rowStringFn(keyInsert, valueInsert)) - rowDelete := cdcevent.TestingMakeEventRow(tableDesc, row, true) - prevRow = cdcevent.TestingMakeEventRow(tableDesc, row, false) + rowDelete := cdcevent.TestingMakeEventRow(tableDesc, 0, row, true) + prevRow = cdcevent.TestingMakeEventRow(tableDesc, 0, row, false) keyDelete, err := e.EncodeKey(context.Background(), rowDelete) require.NoError(t, err) diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go index 1d6bc70f73cb..f67b6ea43d64 100644 --- a/pkg/ccl/changefeedccl/event_processing.go +++ b/pkg/ccl/changefeedccl/event_processing.go @@ -56,7 +56,6 @@ func newKVEventToRowConsumer( knobs TestingKnobs, topicNamer *TopicNamer, ) *kvEventToRowConsumer { - return &kvEventToRowConsumer{ frontier: frontier, encoder: encoder,