From 8c50ab9094a98252c970d48f9719cfbaa4a163c7 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Mon, 23 May 2022 14:26:29 -0400 Subject: [PATCH] changefeedccl: Predicates and projections in CDC. Introduce `cdceval` package -- a library for expression evaluation for CDC. Changefeed users have long requested the ability to emit only a subset of columns. They have also requested the ability to filter out unwanted events (for example, filter out deletions). This library aims to accomplish those goals. However, instead of focusing on narrow use cases, which would usually be addressed via the addition of a new `WITH` option (as done in https://github.com/cockroachdb/cockroach/pull/80499), this library aims to provide support for general expression evaluation. `cdceval` library provides the following functionality: * Ability to evaluate predicates (filters) so that events may be filtered. * Ability to evaluate projection expressions (`select *`, `select a, b, c`, or even `select a + b - c as math_column`) * Ability to evaluate virtual computed columns (currently not implemented in this PR). `cdceval` library reuses existing parsing and evaluation libraries, but adapts them for the CDC use case. CDC events are row level events, and as such, CDC expressions only make sense in the context of a single row/single table. In addition, because CDC events have at-least-once semantics, the emitted events must not depend on volatile state. In summary, any expression is supported except: * Volatile functions -- not supported * Stable functions, such as `now()`, `current_timestamp()`, etc are modified so that they return stable values -- namely the event's MVCC timestamp. * Multi row functions (aggregates, windowing functions) are disallowed. `cdceval` also defines a few custom, CDC specific functions, such as: * `cdc_prev()`: Returns the previous row values as a JSONB object. * `cdc_is_delete()`: Returns true if the row was deleted. 
* Others -- see `functions.go` Follow-up PRs will add a "front end" to this library to enable creation and management of predicated changefeeds. Release Notes: None --- pkg/BUILD.bazel | 1 + pkg/ccl/changefeedccl/avro_test.go | 22 +- pkg/ccl/changefeedccl/cdceval/BUILD.bazel | 71 ++ pkg/ccl/changefeedccl/cdceval/doc.go | 104 +++ pkg/ccl/changefeedccl/cdceval/expr_eval.go | 635 ++++++++++++++++++ .../changefeedccl/cdceval/expr_eval_test.go | 628 +++++++++++++++++ pkg/ccl/changefeedccl/cdceval/functions.go | 183 +++++ .../changefeedccl/cdceval/functions_test.go | 127 ++++ pkg/ccl/changefeedccl/cdceval/main_test.go | 33 + pkg/ccl/changefeedccl/cdcevent/BUILD.bazel | 11 +- pkg/ccl/changefeedccl/cdcevent/event.go | 123 ++-- pkg/ccl/changefeedccl/cdcevent/event_test.go | 68 +- pkg/ccl/changefeedccl/cdcevent/projection.go | 101 +++ .../changefeedccl/cdcevent/projection_test.go | 109 +++ .../cdcevent/rowfetcher_cache.go | 3 +- pkg/ccl/changefeedccl/cdctest/BUILD.bazel | 9 + pkg/ccl/changefeedccl/cdctest/row.go | 99 +++ pkg/ccl/changefeedccl/encoder_test.go | 14 +- pkg/ccl/changefeedccl/event_processing.go | 1 - 19 files changed, 2218 insertions(+), 124 deletions(-) create mode 100644 pkg/ccl/changefeedccl/cdceval/BUILD.bazel create mode 100644 pkg/ccl/changefeedccl/cdceval/doc.go create mode 100644 pkg/ccl/changefeedccl/cdceval/expr_eval.go create mode 100644 pkg/ccl/changefeedccl/cdceval/expr_eval_test.go create mode 100644 pkg/ccl/changefeedccl/cdceval/functions.go create mode 100644 pkg/ccl/changefeedccl/cdceval/functions_test.go create mode 100644 pkg/ccl/changefeedccl/cdceval/main_test.go create mode 100644 pkg/ccl/changefeedccl/cdcevent/projection.go create mode 100644 pkg/ccl/changefeedccl/cdcevent/projection_test.go create mode 100644 pkg/ccl/changefeedccl/cdctest/row.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 4e612aa83d21..2958b53201a5 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -13,6 +13,7 @@ ALL_TESTS = [ 
"//pkg/ccl/backupccl:backupccl_test", "//pkg/ccl/baseccl:baseccl_test", "//pkg/ccl/benchccl/rttanalysisccl:rttanalysisccl_test", + "//pkg/ccl/changefeedccl/cdceval:cdceval_test", "//pkg/ccl/changefeedccl/cdcevent:cdcevent_test", "//pkg/ccl/changefeedccl/cdctest:cdctest_test", "//pkg/ccl/changefeedccl/cdcutils:cdcutils_test", diff --git a/pkg/ccl/changefeedccl/avro_test.go b/pkg/ccl/changefeedccl/avro_test.go index b7ba4e26301e..5f27ce17f24e 100644 --- a/pkg/ccl/changefeedccl/avro_test.go +++ b/pkg/ccl/changefeedccl/avro_test.go @@ -166,7 +166,7 @@ func parseAvroSchema(t *testing.T, j string) (*avroDataRecord, error) { } return tableToAvroSchema( cdcevent.TestingMakeEventRow( - tabledesc.NewBuilder(&tableDesc).BuildImmutableTable(), nil, false, + tabledesc.NewBuilder(&tableDesc).BuildImmutableTable(), 0, nil, false, ), "", string(changefeedbase.OptVirtualColumnsOmitted)) } @@ -391,7 +391,7 @@ func TestAvroSchema(t *testing.T) { fmt.Sprintf(`CREATE TABLE "%s" %s`, test.name, test.schema)) require.NoError(t, err) origSchema, err := tableToAvroSchema( - cdcevent.TestingMakeEventRow(tableDesc, nil, false), + cdcevent.TestingMakeEventRow(tableDesc, 0, nil, false), avroSchemaNoSuffix, "") require.NoError(t, err) jsonSchema := origSchema.codec.Schema() @@ -405,7 +405,7 @@ func TestAvroSchema(t *testing.T) { require.NoError(t, err) for _, encDatums := range rows { - row := cdcevent.TestingMakeEventRow(tableDesc, encDatums, false) + row := cdcevent.TestingMakeEventRow(tableDesc, 0, encDatums, false) evalCtx := &eval.Context{ SessionDataStack: sessiondata.NewStack(&sessiondata.SessionData{}), } @@ -430,7 +430,7 @@ func TestAvroSchema(t *testing.T) { tableDesc, err := parseTableDesc(`CREATE TABLE "☃" (🍦 INT PRIMARY KEY)`) require.NoError(t, err) tableSchema, err := tableToAvroSchema( - cdcevent.TestingMakeEventRow(tableDesc, nil, false), avroSchemaNoSuffix, "") + cdcevent.TestingMakeEventRow(tableDesc, 0, nil, false), avroSchemaNoSuffix, "") require.NoError(t, err) 
require.Equal(t, `{"type":"record","name":"_u2603_","fields":[`+ @@ -438,7 +438,7 @@ func TestAvroSchema(t *testing.T) { `"__crdb__":"🍦 INT8 NOT NULL"}]}`, tableSchema.codec.Schema()) indexSchema, err := primaryIndexToAvroSchema( - cdcevent.TestingMakeEventRow(tableDesc, nil, false), tableDesc.GetName(), "") + cdcevent.TestingMakeEventRow(tableDesc, 0, nil, false), tableDesc.GetName(), "") require.NoError(t, err) require.Equal(t, `{"type":"record","name":"_u2603_","fields":[`+ @@ -661,7 +661,7 @@ func TestAvroSchema(t *testing.T) { encDatums, err := parseValues(tableDesc, `VALUES (1, `+test.sql+`)`) require.NoError(t, err) - row := cdcevent.TestingMakeEventRow(tableDesc, encDatums[0], false) + row := cdcevent.TestingMakeEventRow(tableDesc, 0, encDatums[0], false) schema, err := tableToAvroSchema( row, avroSchemaNoSuffix, "") require.NoError(t, err) @@ -719,7 +719,7 @@ func TestAvroSchema(t *testing.T) { encDatums, err := parseValues(tableDesc, `VALUES (1, `+test.sql+`)`) require.NoError(t, err) - row := cdcevent.TestingMakeEventRow(tableDesc, encDatums[0], false) + row := cdcevent.TestingMakeEventRow(tableDesc, 0, encDatums[0], false) schema, err := tableToAvroSchema(row, avroSchemaNoSuffix, "") require.NoError(t, err) textual, err := schema.textualFromRow(row) @@ -824,13 +824,13 @@ func TestAvroMigration(t *testing.T) { fmt.Sprintf(`CREATE TABLE "%s" %s`, test.name, test.writerSchema)) require.NoError(t, err) writerSchema, err := tableToAvroSchema( - cdcevent.TestingMakeEventRow(writerDesc, nil, false), avroSchemaNoSuffix, "") + cdcevent.TestingMakeEventRow(writerDesc, 0, nil, false), avroSchemaNoSuffix, "") require.NoError(t, err) readerDesc, err := parseTableDesc( fmt.Sprintf(`CREATE TABLE "%s" %s`, test.name, test.readerSchema)) require.NoError(t, err) readerSchema, err := tableToAvroSchema( - cdcevent.TestingMakeEventRow(readerDesc, nil, false), avroSchemaNoSuffix, "") + cdcevent.TestingMakeEventRow(readerDesc, 0, nil, false), avroSchemaNoSuffix, "") 
require.NoError(t, err) writerRows, err := parseValues(writerDesc, `VALUES `+test.writerValues) @@ -839,7 +839,7 @@ func TestAvroMigration(t *testing.T) { require.NoError(t, err) for i := range writerRows { - writerEvent := cdcevent.TestingMakeEventRow(writerDesc, writerRows[i], false) + writerEvent := cdcevent.TestingMakeEventRow(writerDesc, 0, writerRows[i], false) encoded, err := writerSchema.BinaryFromRow(nil, writerEvent.ForEachColumn()) require.NoError(t, err) row, err := rowFromBinaryEvolved(encoded, writerSchema, readerSchema) @@ -907,7 +907,7 @@ func benchmarkEncodeType(b *testing.B, typ *types.T, encRow rowenc.EncDatumRow) tableDesc, err := parseTableDesc( fmt.Sprintf(`CREATE TABLE bench_table (bench_field %s)`, typ.SQLString())) require.NoError(b, err) - row := cdcevent.TestingMakeEventRow(tableDesc, encRow, false) + row := cdcevent.TestingMakeEventRow(tableDesc, 0, encRow, false) schema, err := tableToAvroSchema(row, "suffix", "namespace") require.NoError(b, err) diff --git a/pkg/ccl/changefeedccl/cdceval/BUILD.bazel b/pkg/ccl/changefeedccl/cdceval/BUILD.bazel new file mode 100644 index 000000000000..f7128ca9504b --- /dev/null +++ b/pkg/ccl/changefeedccl/cdceval/BUILD.bazel @@ -0,0 +1,71 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "cdceval", + srcs = [ + "doc.go", + "expr_eval.go", + "functions.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval", + visibility = ["//visibility:public"], + deps = [ + "//pkg/ccl/changefeedccl/cdcevent", + "//pkg/sql/catalog/colinfo", + "//pkg/sql/catalog/schemaexpr", + "//pkg/sql/pgwire/pgcode", + "//pkg/sql/pgwire/pgerror", + "//pkg/sql/sem/builtins", + "//pkg/sql/sem/eval", + "//pkg/sql/sem/normalize", + "//pkg/sql/sem/tree", + "//pkg/sql/sem/volatility", + "//pkg/sql/sessiondata", + "//pkg/sql/sessiondatapb", + "//pkg/sql/types", + "//pkg/util/hlc", + "//pkg/util/json", + "//pkg/util/log", + "@com_github_cockroachdb_errors//:errors", + 
], +) + +go_test( + name = "cdceval_test", + srcs = [ + "expr_eval_test.go", + "functions_test.go", + "main_test.go", + ], + embed = [":cdceval"], + deps = [ + "//pkg/base", + "//pkg/ccl/changefeedccl/cdcevent", + "//pkg/ccl/changefeedccl/cdctest", + "//pkg/ccl/utilccl", + "//pkg/jobs/jobspb", + "//pkg/roachpb", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/settings/cluster", + "//pkg/sql/catalog", + "//pkg/sql/catalog/descpb", + "//pkg/sql/distsql", + "//pkg/sql/parser", + "//pkg/sql/randgen", + "//pkg/sql/rowenc", + "//pkg/sql/sem/eval", + "//pkg/sql/sem/tree", + "//pkg/sql/sessiondatapb", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/hlc", + "//pkg/util/json", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/randutil", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/ccl/changefeedccl/cdceval/doc.go b/pkg/ccl/changefeedccl/cdceval/doc.go new file mode 100644 index 000000000000..c044bcc720e9 --- /dev/null +++ b/pkg/ccl/changefeedccl/cdceval/doc.go @@ -0,0 +1,104 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdceval + +/*** + +cdceval package is a library for evaluating various expressions in CDC. +Namely, this package concerns itself with 3 things: + * Filter evaluation -- aka predicates: does the event match boolean expression. + * Projection evaluation: given the set of projection expressions, evaluate them. + * (Soon to be added) Evaluation of computed virtual columns. + +Evaluator is the gateway into the evaluation logic; it has 3 methods matching +the above use cases. 
Before filtering and projection can be used, Evaluator must +be configured with appropriate projection and filter expressions via ConfigurePredicates. + +If the Evaluator is not configured with ConfigurePredicates, then each event is assumed +to match filter by default, and projection operation is an identity operation returning input +row. + +Evaluator constructs a helper structure (exprEval) to perform actual evaluation. +One exprEval exists per cdcevent.EventDescriptor (currently, a new exprEval is created +whenever event descriptor changes; we might have to add some sort of caching if needed). + +Evaluation of projections and filter expressions is identical. + +First, we have "compilation" phase: + 1. Expression is "walked" to resolve the names and replace those names with tree.IndexedVar expressions. + The "index" part of the indexed var refers to the "index" of the datum in the row. + (note however: the row is abstracted under cdcevent package). IndexedVar allows the value of that + variable to be bound later once it is known; it also associates the type information + with that variable. + 2. Expression is type checked to ensure that it is of the appropriate type. + * Projection expressions can be of types.Any type, while filters must be types.Bool. + 3. Expression is then normalized (constants folded, etc). + +It is an error to have a filter expression which evaluates to "false" -- in this case, Evaluator +will return a "contradiction" error. + +After expressions are "compiled", they can be evaluated; and again, both projections and filters use the same +logic (evalExpr() function); basically, all IndexedVars are bound to the Datums in the updated row, and the +expression is evaluated to the appropriate target type. + +Expressions can contain functions. We restrict the set of functions that can be used by CDC. +Volatile functions, window functions, aggregate functions are disallowed. +Certain stable functions (s.a. 
now(), current_timestamp(), etc) are allowed -- they will always +return the MVCC timestamp of the event. +We also provide custom, CDC specific functions, such as cdc_prev() which returns the previous row as +a JSONB record. See functions.go for more details. + +***/ + +// TODO(yevgeniy): Various notes/questions/issues and todos. +// 1. Options issues: +// * key_in_value: makes no sense; just "select *" +// * key_only: currently unsupported by this flavor; would be nice to support it though +// i.e. you only want the key, but you need "where" clause filtering. Not clear how to express in sql.y +// * VirtualColumnVisibility: null or omit -- both can be accomplished +// * null: currently emitting null, but you can choose to emit null via "select ... null as vcolumn" +// * omit: well, just don't select. +// * On the other hand, we can also support "emit" mode, where we can compute vcolumn expression. +// * updated and mvcc_timestamp options -- both supported via select +// * Wrapped option -- does it make sense here? +// 3. Probably need to add more custom functions. +// * Determine what to do with stable function overrides (now()) vs cdc_mvcc_timestamp. Keep both? drop one? +// 4. How to surface previous row -- it's an open question. +// * Option 1: provide cdc_prev() builtin which returns JSON encoding of previous row. +// One of the negatives is that we are adding an additional JSONB encoding cost, though, this may not +// be that horrible. One interesting thing we could do with this approach is to also have a function +// cdc_delta which reduces JSONB to contain only modified columns (cdc_delta(cdc_prev())). +// Of course you could do something like this with "prev" table, but you'd have to "(case ...)" select +// for each table column. +// And since composition is so cool, you could use cdc_delta to determine if an update is not actually +// an update, but an upsert event. +// * Option 2: provide "prev" table. Also has negatives. Name resolution becomes more complex. 
You could +// legitimately have "prev" table, so you'd always need to alias the "real prev" table. The prev table +// is not specified via sql.y, so that's confusing. +// * Regardless of the option, keep in mind that sometimes prev is not set -- e.g. w/out diff option +// (here, we can return an error), but also not set during initial scan. So, the query must handle +// nulls in prev value. Just something to keep in mind. +// 5. We must be able to return permanent errors from this code that cause changefeed to fail. +// If filtering fails for a row (e.g. "select ... where col_a/col_b > 0" results in divide by 0), +// this will fail forever, and so we must be able to return permanent error. +// 6. Related to 5, we must have poison message handling so we won't kill feed in cases like that. +// 7. Schema changes could cause permanent failures. +// 8. Multiple *'s are allowed. But should they be? +// 9. It is interesting to consider what access to prev does when we then send that data to encoder. +// Right now, we hard code before/after datums; with predicates, we should probably change how things are encoded. +// I.e. no "before"/"after" fields in json/avro -- instead, you select what you want to select. +// 10. Multi family support -- sort of breaks down because you get datums only for 1 family at a time. Any expressions +// comparing columns across families will fail. +// 11. Span constraints -- arguably the "holy grail" -- something that depends on the optimizer, but perhaps we +// can find a way of using that w/out significant refactor to expose entirety of changefeed to opt. +// Basically, given the set of predicates over primary key span, try to narrow the span(s) to those that can +// satisfy predicates. +// 12. UI/Usability: Simple contradictions are detected -- but not all. Even w/out contradiction, the user +// may want to know which events match/not match, and what the data looks like. 
We might need a mode +// where the data is always emitted, but it is marked somehow, indicating how the data will be handled. diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval.go b/pkg/ccl/changefeedccl/cdceval/expr_eval.go new file mode 100644 index 000000000000..9d8596b38e84 --- /dev/null +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval.go @@ -0,0 +1,635 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdceval + +import ( + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/sql/sem/normalize" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sem/volatility" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" +) + +// Evaluator is responsible for evaluating expressions in CDC. +type Evaluator struct { + selectors []tree.SelectExpr + where tree.Expr + + evalCtx *eval.Context + // Current evaluator. Re-initialized whenever event descriptor + // version changes. + evaluator *exprEval +} + +// NewEvaluator returns a new Evaluator instance. 
+func NewEvaluator(evalCtx *eval.Context) Evaluator { + return Evaluator{evalCtx: evalCtx.Copy()} +} + +// ConfigurePredicates configures this evaluator to evaluate selectors and predicates +// as specified by the select statement. +func (e *Evaluator) ConfigurePredicates(selectStmt *tree.Select) error { + sc, ok := selectStmt.Select.(*tree.SelectClause) + if !ok { + return pgerror.Newf(pgcode.InvalidParameterValue, + "expected tree.SelectClause, found %T", selectStmt.Select) + } + + if len(sc.Exprs) == 0 { + return pgerror.Newf(pgcode.InvalidParameterValue, + "expected at least 1 select expression in select statement %s", selectStmt) + } + + if sc.Where != nil && sc.Where.Type != tree.AstWhere { + return pgerror.Newf(pgcode.InvalidParameterValue, + "expected WHERE clause, found HAVING in %s", selectStmt) + } + + e.selectors = sc.Exprs + if sc.Where != nil { + e.where = sc.Where.Expr + } + return nil +} + +// ComputeVirtualColumns updates row with computed values for all virtual columns. +func (e *Evaluator) ComputeVirtualColumns(ctx context.Context, row *cdcevent.Row) error { + return errors.AssertionFailedf("unimplemented yet") +} + +// MatchesFilter returns true if row matches evaluator filter expression. +func (e *Evaluator) MatchesFilter( + ctx context.Context, updatedRow cdcevent.Row, mvccTS hlc.Timestamp, prevRow cdcevent.Row, +) (bool, error) { + if e.where == nil { + return true, nil + } + + if err := e.initEval(ctx, updatedRow.EventDescriptor); err != nil { + return false, err + } + + return e.evaluator.matchesFilter(ctx, updatedRow, mvccTS, prevRow) +} + +// Projection performs evalProjection operation on the updated row. +// mvccTS is an mvcc timestamp of updated row, and prevRow may optionally contain +// the value of the previous row. +// Returns cdcevent.Row representing evalProjection. 
+func (e *Evaluator) Projection( + ctx context.Context, updatedRow cdcevent.Row, mvccTS hlc.Timestamp, prevRow cdcevent.Row, +) (cdcevent.Row, error) { + if len(e.selectors) == 0 { + return updatedRow, nil + } + + if err := e.initEval(ctx, updatedRow.EventDescriptor); err != nil { + return cdcevent.Row{}, err + } + + return e.evaluator.evalProjection(ctx, updatedRow, mvccTS, prevRow) +} + +// initEval initializes evaluator for the specified event descriptor. +func (e *Evaluator) initEval(ctx context.Context, d *cdcevent.EventDescriptor) error { + if e.evaluator != nil && d.Equals(e.evaluator.EventDescriptor) { + return nil + } + + evaluator := newExprEval(e.evalCtx, d) + for _, selector := range e.selectors { + if err := evaluator.addSelector(ctx, selector, len(e.selectors)); err != nil { + return err + } + } + + if err := evaluator.addFilter(ctx, e.where); err != nil { + return err + } + + e.evaluator = evaluator + return nil +} + +type exprEval struct { + *cdcevent.EventDescriptor + semaCtx tree.SemaContext + evalCtx *eval.Context + + evalHelper *rowContainer // evalHelper is a container tree.IndexedVarContainer. + iVarHelper tree.IndexedVarHelper // iVarHelper helps create indexed variables bound to evalHelper. + resolver cdcNameResolver // resolver responsible for performing function name resolution. + + starProjection bool + selectors []tree.TypedExpr // set of expressions to evaluate when performing evalProjection. + projection cdcevent.Projection + filter tree.TypedExpr // where clause filter + + // keep track of number of times particular column name was used + // in selectors. Since the data produced by CDC gets converted + // to the formats (JSON, avro, etc.) that may not like having multiple + // fields named the same way, this map helps us unique-ify those columns. + nameUseCount map[string]int + + // rowEvalCtx contains state necessary to evaluate expressions. + // updated for each row. 
+ rowEvalCtx rowEvalContext +} + +func newExprEval(evalCtx *eval.Context, ed *cdcevent.EventDescriptor) *exprEval { + cols := ed.ResultColumns() + e := &exprEval{ + EventDescriptor: ed, + semaCtx: tree.MakeSemaContext(), + evalCtx: evalCtx.Copy(), + evalHelper: &rowContainer{cols: cols}, + projection: cdcevent.MakeProjection(ed), + nameUseCount: make(map[string]int), + } + + evalCtx = nil // From this point, only e.evalCtx should be used. + + // Configure semantic context. + e.semaCtx.SearchPath = &cdcCustomFunctionResolver{SearchPath: sessiondata.DefaultSearchPath} + e.semaCtx.Properties.Require("cdc", + tree.RejectAggregates|tree.RejectGenerators|tree.RejectWindowApplications|tree.RejectNestedGenerators, + ) + e.semaCtx.Annotations = tree.MakeAnnotations(cdcAnnotationAddr) + e.semaCtx.IVarContainer = e.evalHelper + + // Configure evaluation context. + e.evalCtx.Annotations = &e.semaCtx.Annotations + e.evalCtx.Annotations.Set(cdcAnnotationAddr, &e.rowEvalCtx) + e.evalCtx.IVarContainer = e.evalHelper + + // Extract colinfo.ResultColumn from cdcevent.ResultColumn + nakedResultColumns := func() (rc []colinfo.ResultColumn) { + rc = make([]colinfo.ResultColumn, len(cols)) + for i := 0; i < len(cols); i++ { + rc[i] = cols[i].ResultColumn + } + return rc + } + + e.iVarHelper = tree.MakeIndexedVarHelper(e.evalHelper, len(cols)) + e.resolver = cdcNameResolver{ + EventDescriptor: ed, + NameResolutionVisitor: schemaexpr.MakeNameResolutionVisitor( + colinfo.NewSourceInfoForSingleTable( + tree.MakeUnqualifiedTableName(tree.Name(ed.TableName)), + nakedResultColumns(), + ), + e.iVarHelper, + ), + } + + return e +} + +// rowEvalContext represents the context needed to evaluate row expressions. +type rowEvalContext struct { + mvccTS hlc.Timestamp + updatedRow cdcevent.Row + prevRow cdcevent.Row + memo struct { + prevJSON tree.Datum + } +} + +// setupContext configures evaluation context with the provided row information. 
+func (e *exprEval) setupContext( + updatedRow cdcevent.Row, mvccTS hlc.Timestamp, prevRow cdcevent.Row, +) { + e.rowEvalCtx.updatedRow = updatedRow + e.rowEvalCtx.prevRow = prevRow + e.rowEvalCtx.mvccTS = mvccTS + e.evalCtx.TxnTimestamp = mvccTS.GoTime() + + // Clear out all memo records + e.rowEvalCtx.memo.prevJSON = nil +} + +// evalProjection responsible for evaluating projection expression. +// Returns new projection Row. +func (e *exprEval) evalProjection( + ctx context.Context, updatedRow cdcevent.Row, mvccTS hlc.Timestamp, prevRow cdcevent.Row, +) (cdcevent.Row, error) { + if e.starProjection { + return updatedRow, nil + } + + e.setupContext(updatedRow, mvccTS, prevRow) + + for i, expr := range e.selectors { + d, err := e.evalExpr(ctx, expr, types.Any) + if err != nil { + return cdcevent.Row{}, err + } + log.Infof(ctx, "selector %d: %s evaluated to %s", i, expr, d) + if err := e.projection.SetValueDatumAt(i, d); err != nil { + return cdcevent.Row{}, err + } + } + + return e.projection.Project(updatedRow) +} + +// matchesFilter returns true if row matches configured filter. +func (e *exprEval) matchesFilter( + ctx context.Context, updatedRow cdcevent.Row, mvccTS hlc.Timestamp, prevRow cdcevent.Row, +) (bool, error) { + if e.filter == nil { + return true, nil + } + + e.setupContext(updatedRow, mvccTS, prevRow) + d, err := e.evalExpr(ctx, e.filter, types.Bool) + if err != nil { + return false, err + } + return d == tree.DBoolTrue, nil +} + +// computeRenderColumnName returns render name for a selector, adjusted for CDC use case. +func (e *exprEval) computeRenderColumnName(selector tree.SelectExpr) (string, error) { + as, err := func() (string, error) { + if selector.As != "" { + return string(selector.As), nil + } + // We use ComputeColNameInternal instead of GetRenderName because the latter, if it can't + // figure out the name, returns "?column?" as the name; but we want to name things slightly + // different in that case. 
+ _, s, err := tree.ComputeColNameInternal(e.semaCtx.SearchPath, selector.Expr) + return s, err + }() + if err != nil { + return "", err + } + + if as == "" { + as = fmt.Sprintf("column_%d", 1+len(e.selectors)) + } + return e.makeUniqueName(as), nil +} + +// makeUniqueName returns a unique name for the specified name. +// We do this because seeing same named fields in JSON might be confusing. +func (e *exprEval) makeUniqueName(as string) string { + useCount := e.nameUseCount[as] + e.nameUseCount[as]++ + if useCount > 0 { + // Unique-ify evalProjection name. + as = fmt.Sprintf("%s_%d", as, useCount) + } + return as +} + +// addSelector adds specified select expression to evalProjection set. +func (e *exprEval) addSelector( + ctx context.Context, selector tree.SelectExpr, numSelectors int, +) error { + as, err := e.computeRenderColumnName(selector) + if err != nil { + return err + } + + typedExpr, err := e.typeCheck(ctx, selector.Expr, types.Any) + if err != nil { + return err + } + + // Expand "*". We walked expression during type check above, so we only expect to + // see UnqualifiedStar. + if _, isStar := typedExpr.(tree.UnqualifiedStar); isStar { + if numSelectors == 1 { + // Single star gets special treatment. + e.starProjection = true + } else { + for ord, col := range e.ResultColumns() { + e.addProjection(e.iVarHelper.IndexedVar(ord), e.makeUniqueName(col.Name)) + } + } + } else { + e.addProjection(typedExpr, as) + } + + return nil +} + +// addFilter adds where clause filter. 
+func (e *exprEval) addFilter(ctx context.Context, where tree.Expr) error { + if where == nil { + return nil + } + typedExpr, err := e.typeCheck(ctx, where, types.Bool) + if err != nil { + return err + } + + if typedExpr == tree.DBoolTrue { + if log.V(1) { + log.Infof(ctx, "ignoring tautological filter %q", where) + } + return nil + } + + if typedExpr == tree.DBoolFalse { + return errors.Newf("filter %q is a contradiction", where) + } + + e.filter = typedExpr + return nil +} + +// addProjection adds expression to be returned by evalProjection. +func (e *exprEval) addProjection(expr tree.TypedExpr, as string) { + e.selectors = append(e.selectors, expr) + e.projection.AddValueColumn(as, expr.ResolvedType()) +} + +// typeCheck converts expression to the expression of specified target type. +func (e *exprEval) typeCheck( + ctx context.Context, expr tree.Expr, targetType *types.T, +) (tree.TypedExpr, error) { + // If we have variable free immutable expressions, then we can just evaluate it right away. + typedExpr, err := schemaexpr.SanitizeVarFreeExpr( + ctx, expr, targetType, "cdc", &e.semaCtx, volatility.Immutable) + if err == nil { + d, err := eval.Expr(e.evalCtx, typedExpr) + if err != nil { + return nil, err + } + return d, nil + } + + // We must work harder. Bind variables and resolve names. + expr, _ = tree.WalkExpr(&e.resolver, expr) + if e.resolver.err != nil { + return nil, e.resolver.err + } + + if star, isStar := expr.(tree.UnqualifiedStar); isStar { + // Can't type check star -- we'll handle it later during eval. + return star, nil + } + + // Run type check & normalize. + typedExpr, err = expr.TypeCheck(ctx, &e.semaCtx, targetType) + if err != nil { + return nil, err + } + return normalize.Expr(e.evalCtx, typedExpr) +} + +// evalExpr evaluates typed expression and returns resulting datum. +// must be called after setupContext has been called. 
+func (e *exprEval) evalExpr( + ctx context.Context, expr tree.TypedExpr, targetType *types.T, +) (tree.Datum, error) { + switch t := expr.(type) { + case tree.Datum: + return t, nil + case *tree.IndexedVar: + d, err := e.rowEvalCtx.updatedRow.DatumAt(t.Idx) + if err != nil { + return nil, err + } + return d, nil + default: + v := replaceIndexVarVisitor{row: e.rowEvalCtx.updatedRow} + newExpr, _ := tree.WalkExpr(&v, expr) + if v.err != nil { + return nil, v.err + } + + typedExpr, err := tree.TypeCheck(ctx, newExpr, &e.semaCtx, targetType) + if err != nil { + return nil, err + } + d, err := eval.Expr(e.evalCtx, typedExpr) + if err != nil { + return nil, err + } + return d, nil + } +} + +type cdcNameResolver struct { + schemaexpr.NameResolutionVisitor + *cdcevent.EventDescriptor + err error +} + +// tag errors generated by cdcNameResolver. +type cdcResolverError struct { + error +} + +func (v *cdcNameResolver) wrapError() func() { + // NameResolutionVisitor returns "column X does not exist" error if expression references + // column that was not configured. This is a bit confusing for CDC since a column + // may exist in the table, but not be available for a particular family. So, annotate + // the error to make it more obvious. + // We only want to do this for errors returned by NameResolutionVisitor, and not errors + // that we generate ourselves. + if v.err == nil { + return func() { + if v.NameResolutionVisitor.Err() != nil && v.err == nil { + v.err = errors.WithHintf(v.Err(), + "object does not exist in table %q, family %q", v.TableName, v.FamilyName) + } + } + } + return func() {} +} + +// VisitPre implements tree.Visitor interface. +func (v *cdcNameResolver) VisitPre(expr tree.Expr) (recurse bool, newExpr tree.Expr) { + defer v.wrapError()() + recurse, newExpr = v.NameResolutionVisitor.VisitPre(expr) + return v.err == nil, newExpr +} + +// VisitPost implements tree.Visitor interface. 
+func (v *cdcNameResolver) VisitPost(expr tree.Expr) (newExpr tree.Expr) { + defer v.wrapError()() + v.NameResolutionVisitor.VisitPost(expr) + + switch t := expr.(type) { + case *tree.AllColumnsSelector: + // AllColumnsSelector occurs when "x.*" is used. We have a simple 1 table support, + // so make sure table names match. + if t.TableName.String() != v.TableName { + v.err = &cdcResolverError{ + error: pgerror.Newf(pgcode.UndefinedTable, "no data source matches pattern: %s", t.String()), + } + return t + } + // Now that we know table names match, turn this into unqualified star. + return tree.UnqualifiedStar{} + case *tree.FuncExpr: + fn, err := v.checkFunctionSupported(t) + if err != nil { + v.err = err + return expr + } + return fn + default: + return expr + } +} + +func (v *cdcNameResolver) resolveCustomCDCFunction( + name string, fnCall *tree.FuncExpr, +) *tree.FuncExpr { + fn, exists := cdcFunctions[name] + if !exists { + return nil + } + return &tree.FuncExpr{ + Func: tree.ResolvableFunctionReference{FunctionReference: fn}, + Type: fnCall.Type, + Exprs: fnCall.Exprs, + } +} + +func (v *cdcNameResolver) checkFunctionSupported(fnCall *tree.FuncExpr) (*tree.FuncExpr, error) { + var fnName string + var fnClass tree.FunctionClass + var fnVolatility volatility.V + + unsupportedFunctionErr := func() error { + if fnName == "" { + fnName = fnCall.Func.String() + } + return &cdcResolverError{ + error: pgerror.Newf(pgcode.UndefinedFunction, "function %q unsupported by CDC", fnName), + } + } + + switch fn := fnCall.Func.FunctionReference.(type) { + case *tree.UnresolvedName: + // We may not have function definition yet if function takes arguments, + // or it's one of the custom cdc functions. 
+ fnName = fn.String() + props, overloads := builtins.GetBuiltinProperties(fn.String()) + if props == nil { + if custom := v.resolveCustomCDCFunction(fnName, fnCall); custom != nil { + return custom, nil + } + return nil, unsupportedFunctionErr() + } + fnClass = props.Class + // Pick highest volatility overload. + for _, o := range overloads { + if o.Volatility > fnVolatility { + fnVolatility = o.Volatility + } + } + case *tree.FunctionDefinition: + fnName, fnClass = fn.Name, fn.Class + if fnCall.ResolvedOverload() != nil { + if _, isCDC := cdcFunctions[fnName]; isCDC { + return fnCall, nil + } + fnVolatility = fnCall.ResolvedOverload().Volatility + } else { + // Pick highest volatility overload. + for _, o := range fn.Definition { + overload := o.(*tree.Overload) + if overload.Volatility > fnVolatility { + fnVolatility = overload.Volatility + } + } + } + default: + return nil, errors.AssertionFailedf("unexpected function expression of type %T", fn) + } + + // Aggregates, generators and window functions are not supported. + switch fnClass { + case tree.AggregateClass, tree.GeneratorClass, tree.WindowClass: + return nil, unsupportedFunctionErr() + } + + if fnVolatility <= volatility.Immutable { + // Remaining immutable functions are safe. + return fnCall, nil + } + + // We have a non-immutable function -- make sure it is supported. + _, isSafe := supportedVolatileBuiltinFunctions[fnName] + if !isSafe { + return nil, unsupportedFunctionErr() + } + return fnCall, nil +} + +// rowContainer is a structure to assist with evaluation of CDC expressions. 
+type rowContainer struct { + cols []cdcevent.ResultColumn +} + +var _ tree.IndexedVarContainer = (*rowContainer)(nil) + +// IndexedVarResolvedType implements tree.IndexedVarContainer +func (c *rowContainer) IndexedVarResolvedType(idx int) *types.T { + return c.cols[idx].Typ +} + +// IndexedVarNodeFormatter implements tree.IndexedVarContainer +func (c *rowContainer) IndexedVarNodeFormatter(idx int) tree.NodeFormatter { + return nil +} + +type replaceIndexVarVisitor struct { + row cdcevent.Row + err error +} + +var _ tree.Visitor = (*replaceIndexVarVisitor)(nil) + +// VisitPre implements tree.Visitor interface. +func (v *replaceIndexVarVisitor) VisitPre(expr tree.Expr) (recurse bool, newExpr tree.Expr) { + if iVar, ok := expr.(*tree.IndexedVar); ok { + datum, err := v.row.DatumAt(iVar.Idx) + if err != nil { + v.err = pgerror.Wrapf(err, pgcode.NumericValueOutOfRange, "variable @%d out of bounds", iVar.Idx) + return false, expr + } + return true, datum + } + return true, expr +} + +// VisitPost implements tree.Visitor interface. +func (v *replaceIndexVarVisitor) VisitPost(expr tree.Expr) (newNode tree.Expr) { + return expr +} + +// cdcAnnotationAddr is the address used to store relevant information +// in the Annotation field of evalCtx when evaluating expressions. +const cdcAnnotationAddr tree.AnnotationIdx = iota + 1 + +func rowEvalContextFromEvalContext(evalCtx *eval.Context) *rowEvalContext { + return evalCtx.Annotations.Get(cdcAnnotationAddr).(*rowEvalContext) +} diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go b/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go new file mode 100644 index 000000000000..3024e6adef31 --- /dev/null +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go @@ -0,0 +1,628 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdceval + +import ( + "context" + "fmt" + "strconv" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/distsql" + "github.com/cockroachdb/cockroach/pkg/sql/parser" + "github.com/cockroachdb/cockroach/pkg/sql/randgen" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/stretchr/testify/require" +) + +func TestNoopPredicate(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, + "CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, d INT, FAMILY most (a,b,c), FAMILY only_d (d))") + desc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") + + serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig + ctx := context.Background() + decoder := cdcevent.NewEventDecoder(ctx, &serverCfg, + jobspb.ChangefeedDetails{ + 
TargetSpecifications: []jobspb.ChangefeedTargetSpecification{ + { + Type: jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY, + TableID: desc.GetID(), + FamilyName: "most", + }, + }, + }) + + popRow := cdctest.MakeRangeFeedValueReader(t, s.ExecutorConfig(), desc) + sqlDB.Exec(t, "INSERT INTO foo (a, b, d) VALUES (1, 'one', -1)") + testRow := decodeRow(t, decoder, popRow(t), false) + + e := makeEvaluator(t, s.ClusterSettings(), "") + matches, err := e.MatchesFilter(ctx, testRow, hlc.Timestamp{}, testRow) + require.NoError(t, err) + require.True(t, matches) + + projection, err := e.Projection(ctx, testRow, hlc.Timestamp{}, testRow) + require.NoError(t, err) + require.Equal(t, testRow.EventDescriptor, projection.EventDescriptor) +} + +func TestEvaluator(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `CREATE TYPE status AS ENUM ('open', 'closed', 'inactive')`) + sqlDB.Exec(t, ` +CREATE TABLE foo ( + a INT, + b STRING, + c STRING, + d STRING AS (concat(b, c)) VIRTUAL, + e status DEFAULT 'inactive', + PRIMARY KEY (b, a), + FAMILY main (a, b, e), + FAMILY only_c (c) +)`) + desc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") + popRow := cdctest.MakeRangeFeedValueReader(t, s.ExecutorConfig(), desc) + + type decodeExpectation struct { + expectUnwatchedErr bool + projectionErr string + + // current value expectations. 
+ expectFiltered bool + keyValues []string + allValues map[string]string + } + + repeatExpectation := func(e decodeExpectation, n int) (repeated []decodeExpectation) { + for i := 0; i < n; i++ { + repeated = append(repeated, e) + } + return + } + + for _, tc := range []struct { + testName string + familyName string // Must be set if targetType ChangefeedTargetSpecification_COLUMN_FAMILY + actions []string + predicate string + + expectMainFamily []decodeExpectation + expectOnlyCFamily []decodeExpectation + }{ + { + testName: "main/star", + familyName: "main", + actions: []string{"INSERT INTO foo (a, b) VALUES (1, '1st test')"}, + predicate: "SELECT * FROM _", + expectMainFamily: []decodeExpectation{ + { + keyValues: []string{"1st test", "1"}, + allValues: map[string]string{"a": "1", "b": "1st test", "e": "inactive"}, + }, + }, + }, + { + testName: "main/qualified_star", + familyName: "main", + actions: []string{"INSERT INTO foo (a, b) VALUES (1, 'qualified')"}, + predicate: "SELECT foo.* FROM _", + expectMainFamily: []decodeExpectation{ + { + keyValues: []string{"qualified", "1"}, + allValues: map[string]string{"a": "1", "b": "qualified", "e": "inactive"}, + }, + }, + }, + { + testName: "main/star_delete", + familyName: "main", + actions: []string{ + "INSERT INTO foo (a, b) VALUES (2, '2nd test')", + "DELETE FROM foo WHERE a=2 AND b='2nd test'", + }, + predicate: "SELECT *, cdc_is_delete() FROM _", + expectMainFamily: []decodeExpectation{ + { + keyValues: []string{"2nd test", "2"}, + allValues: map[string]string{"a": "2", "b": "2nd test", "e": "inactive", "cdc_is_delete": "false"}, + }, + { + keyValues: []string{"2nd test", "2"}, + allValues: map[string]string{"a": "2", "b": "2nd test", "e": "NULL", "cdc_is_delete": "true"}, + }, + }, + }, + { + testName: "main/projection", + familyName: "main", + actions: []string{"INSERT INTO foo (a, b) VALUES (3, '3rd test')"}, + predicate: "SELECT e, a FROM _", + expectMainFamily: []decodeExpectation{ + { + keyValues: 
[]string{"3rd test", "3"}, + allValues: map[string]string{"a": "3", "e": "inactive"}, + }, + }, + }, + { + testName: "main/not_closed", + familyName: "main", + actions: []string{ + "INSERT INTO foo (a, b, e) VALUES (1, '4th test', 'closed')", + "INSERT INTO foo (a, b, e) VALUES (2, '4th test', 'open')", + "INSERT INTO foo (a, b, e) VALUES (3, '4th test', 'closed')", + "INSERT INTO foo (a, b, e) VALUES (4, '4th test', 'closed')", + "INSERT INTO foo (a, b, e) VALUES (5, '4th test', 'inactive')", + }, + predicate: "SELECT a FROM _ WHERE e IN ('open', 'inactive')", + expectMainFamily: []decodeExpectation{ + { + expectFiltered: true, + keyValues: []string{"4th test", "1"}, + }, + { + keyValues: []string{"4th test", "2"}, + allValues: map[string]string{"a": "2"}, + }, + { + expectFiltered: true, + keyValues: []string{"4th test", "3"}, + }, + { + expectFiltered: true, + keyValues: []string{"4th test", "4"}, + }, + { + keyValues: []string{"4th test", "5"}, + allValues: map[string]string{"a": "5"}, + }, + }, + }, + { + testName: "main/same_column_many_times", + familyName: "main", + actions: []string{"INSERT INTO foo (a, b) VALUES (1, '5th test')"}, + predicate: "SELECT *, a, a as one_more, a FROM _", + expectMainFamily: []decodeExpectation{ + { + keyValues: []string{"5th test", "1"}, + allValues: map[string]string{ + "a": "1", "b": "5th test", "e": "inactive", + "a_1": "1", "one_more": "1", "a_2": "1", + }, + }, + }, + }, + { + testName: "main/no_col_c", + familyName: "main", + actions: []string{"INSERT INTO foo (a, b) VALUES (1, 'no_c')"}, + predicate: "SELECT *, c FROM _", + expectMainFamily: []decodeExpectation{ + { + projectionErr: `column "c" does not exist`, + keyValues: []string{"no_c", "1"}, + }, + }, + }, + { + testName: "main/non_primary_family_with_var_free", + familyName: "only_c", + actions: []string{"INSERT INTO foo (a, b, c) VALUES (42, '6th test', 'c value')"}, + predicate: "SELECT sin(pi()/2) AS var_free, c, b ", + expectMainFamily: []decodeExpectation{ + 
{ + expectUnwatchedErr: true, + }, + }, + expectOnlyCFamily: []decodeExpectation{ + { + keyValues: []string{"6th test", "42"}, + allValues: map[string]string{"b": "6th test", "c": "c value", "var_free": "1.0"}, + }, + }, + }, + { + testName: "main/contradiction", + familyName: "main", + actions: []string{"INSERT INTO foo (a, b) VALUES (1, 'contradiction')"}, + predicate: "SELECT * FROM _ WHERE 1 > 2", + expectMainFamily: []decodeExpectation{ + { + projectionErr: `filter .* is a contradiction`, + keyValues: []string{"contradiction", "1"}, + }, + }, + }, + { + testName: "main/filter_many", + familyName: "only_c", + actions: []string{ + "INSERT INTO foo (a, b, c) WITH s AS " + + "(SELECT generate_series as x FROM generate_series(1, 100)) " + + "SELECT x, 'filter_many', x::string FROM s", + }, + predicate: "SELECT * FROM _ WHERE a % 33 = 0", + expectMainFamily: repeatExpectation(decodeExpectation{expectUnwatchedErr: true}, 100), + expectOnlyCFamily: func() (expectations []decodeExpectation) { + for i := 1; i <= 100; i++ { + iStr := strconv.FormatInt(int64(i), 10) + e := decodeExpectation{ + keyValues: []string{"filter_many", iStr}, + } + if i%33 == 0 { + e.allValues = map[string]string{"c": iStr} + } else { + e.expectFiltered = true + } + expectations = append(expectations, e) + } + return expectations + }(), + }, + { + testName: "main/only_some_deleted_values", + familyName: "only_c", + actions: []string{ + // Insert + "INSERT INTO foo (a, b, c) WITH s AS " + + "(SELECT generate_series as x FROM generate_series(1, 100)) " + + "SELECT x, 'only_some_deleted_values', x::string FROM s", + "DELETE FROM foo WHERE b='only_some_deleted_values'", + }, + predicate: `SELECT * FROM _ WHERE cdc_is_delete() AND cast(cdc_prev()->>'a' as int) % 33 = 0`, + expectMainFamily: repeatExpectation(decodeExpectation{expectUnwatchedErr: true}, 200), + expectOnlyCFamily: func() (expectations []decodeExpectation) { + var delExpectations []decodeExpectation + for i := 1; i <= 100; i++ { + iStr 
:= strconv.FormatInt(int64(i), 10) + e := decodeExpectation{ + keyValues: []string{"only_some_deleted_values", iStr}, + expectFiltered: true, + } + expectations = append(expectations, e) + e.expectFiltered = i%33 != 0 + e.allValues = map[string]string{"c": "NULL"} + delExpectations = append(delExpectations, e) + } + return append(expectations, delExpectations...) + }(), + }, + } { + t.Run(tc.testName, func(t *testing.T) { + targetType := jobspb.ChangefeedTargetSpecification_EACH_FAMILY + if tc.familyName != "" { + targetType = jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY + } + + details := jobspb.ChangefeedDetails{ + TargetSpecifications: []jobspb.ChangefeedTargetSpecification{ + { + Type: targetType, + TableID: desc.GetID(), + FamilyName: tc.familyName, + }, + }, + } + + for _, action := range tc.actions { + sqlDB.Exec(t, action) + } + + serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig + ctx := context.Background() + decoder := cdcevent.NewEventDecoder(ctx, &serverCfg, details) + expectedEvents := len(tc.expectMainFamily) + len(tc.expectOnlyCFamily) + evaluator := makeEvaluator(t, s.ClusterSettings(), tc.predicate) + + for i := 0; i < expectedEvents; i++ { + v := popRow(t) + + eventFamilyID, err := cdcevent.TestingGetFamilyIDFromKey(decoder, v.Key, v.Timestamp()) + require.NoError(t, err) + + var expect decodeExpectation + if eventFamilyID == 0 { + expect, tc.expectMainFamily = tc.expectMainFamily[0], tc.expectMainFamily[1:] + } else { + expect, tc.expectOnlyCFamily = tc.expectOnlyCFamily[0], tc.expectOnlyCFamily[1:] + } + + updatedRow, err := decodeRowErr(decoder, v, false) + if expect.expectUnwatchedErr { + require.ErrorIs(t, err, cdcevent.ErrUnwatchedFamily) + continue + } + + require.NoError(t, err) + require.True(t, updatedRow.IsInitialized()) + prevRow := decodeRow(t, decoder, v, true) + require.NoError(t, err) + + if expect.expectFiltered { + require.Equal(t, expect.keyValues, slurpKeys(t, updatedRow)) + matches, err := 
evaluator.MatchesFilter(ctx, updatedRow, v.Timestamp(), prevRow) + require.NoError(t, err) + require.False(t, matches, "keys: %v", slurpKeys(t, updatedRow)) + continue + } + + projection, err := evaluator.Projection(ctx, updatedRow, v.Timestamp(), prevRow) + if expect.projectionErr != "" { + require.Regexp(t, expect.projectionErr, err) + // Sanity check we get error for the row we expected to get an error for. + require.Equal(t, expect.keyValues, slurpKeys(t, updatedRow)) + } else { + require.NoError(t, err) + require.Equal(t, expect.keyValues, slurpKeys(t, projection)) + require.Equal(t, expect.allValues, slurpValues(t, projection)) + } + } + }) + } +} + +// Tests that use of volatile functions, without CDC specific override, +// results in an error. +func TestUnsupportedCDCFunctions(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, "CREATE TABLE foo (a INT PRIMARY KEY)") + desc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") + testRow := cdcevent.TestingMakeEventRow(desc, 0, nil, false) + ctx := context.Background() + + for fnCall, errFn := range map[string]string{ + // Some volatile functions. + "version()": "version", + "crdb_internal.trace_id()": "crdb_internal.trace_id", + "crdb_internal.locality_value('blah')": "crdb_internal.locality_value", + "1 + crdb_internal.trace_id()": "crdb_internal.trace_id", + "current_user()": "current_user", + "nextval('seq')": "nextval", + + // Special form of CURRENT_USER() is SESSION_USER (no parens). + "SESSION_USER": "session_user", + + // Aggregator functions + "generate_series(1, 10)": "generate_series", + + // Unsupported functions that take arguments from foo. 
+ "generate_series(1, a)": "generate_series", + "crdb_internal.read_file(b)": "crdb_internal.read_file", + } { + t.Run(fmt.Sprintf("select/%s", errFn), func(t *testing.T) { + evaluator := makeEvaluator(t, s.ClusterSettings(), fmt.Sprintf("SELECT %s", fnCall)) + _, err := evaluator.Projection(ctx, testRow, hlc.Timestamp{}, testRow) + require.Regexp(t, fmt.Sprintf(`function "%s" unsupported by CDC`, errFn), err) + }) + + // Same thing, but with the WHERE clause + t.Run(fmt.Sprintf("where/%s", errFn), func(t *testing.T) { + evaluator := makeEvaluator(t, s.ClusterSettings(), + fmt.Sprintf("SELECT 1 WHERE %s IS NOT NULL", fnCall)) + _, err := evaluator.Projection(ctx, testRow, hlc.Timestamp{}, testRow) + require.Regexp(t, fmt.Sprintf(`function "%s" unsupported by CDC`, errFn), err) + }) + } +} + +func TestEvaluatesProjection(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, ""+ + "CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, d INT, "+ + "FAMILY most (a,b,c), FAMILY only_d (d))") + desc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") + testRow := cdcevent.TestingMakeEventRow(desc, 0, randEncDatumRow(t, desc, 0), false) + + verifyConstantsFolded := func(p *exprEval) { + for _, expr := range p.selectors { + _ = expr.(tree.Datum) + } + } + + for _, tc := range []struct { + name string + predicate string + input rowenc.EncDatumRow + expectErr string + expectation map[string]string + verifyFold bool + }{ + { + name: "constants", + predicate: "SELECT 1, 2, 3", + expectation: map[string]string{"column_1": "1", "column_2": "2", "column_3": "3"}, + verifyFold: true, + }, + { + name: "constants_functions_and_aliases", + predicate: "SELECT 0 as zero, abs(-2) two, 42", + expectation: map[string]string{"zero": "0", "two": "2", "column_3": "42"}, + 
verifyFold: true, + }, + { + name: "trig_fun", + predicate: "SELECT cos(0), sin(pi()/2) as sin_90, 39 + pi()::int", + expectation: map[string]string{"cos": "1.0", "sin_90": "1.0", "column_3": "42"}, + verifyFold: true, + }, + { + name: "div_by_zero", + predicate: "SELECT 3 / sin(pi() - pi()) as result", + expectErr: "division by zero", + }, + { + name: "projection_with_bound_vars", + predicate: "SELECT sqrt(a::float) + sin(pi()/2) as result, foo.*", + input: makeEncDatumRow(tree.NewDInt(4), tree.DNull, tree.DNull), + expectation: map[string]string{"result": "3.0", "a": "4", "b": "NULL", "c": "NULL"}, + }, + + // Sadly, this one requires Projection call since we're not using optimizer. + // Without optimizer, anything with bound variables doesn't get simplified. + //{ + // name: "div_by_zero_with_bound_vars", + // predicate: "SELECT sqrt(a::float) / sin(pi() - pi())", + // expectErr: "division by zera", + //}, + } { + t.Run(tc.name, func(t *testing.T) { + e, err := makeExprEval(t, s.ClusterSettings(), testRow.EventDescriptor, tc.predicate) + if tc.expectErr != "" { + require.Regexp(t, tc.expectErr, err) + return + } + + require.NoError(t, err) + if tc.verifyFold { + verifyConstantsFolded(e) + } + row := testRow + if tc.input != nil { + row = cdcevent.TestingMakeEventRow(desc, 0, tc.input, false) + } + + p, err := e.evalProjection(context.Background(), row, hlc.Timestamp{}, row) + require.NoError(t, err) + require.Equal(t, tc.expectation, slurpValues(t, p)) + }) + } +} + +// makeEvaluator creates Evaluator and configures it with specified +// select statement predicate. 
+func makeEvaluator(t *testing.T, st *cluster.Settings, selectStr string) Evaluator { + t.Helper() + evalCtx := eval.MakeTestingEvalContext(st) + e := NewEvaluator(&evalCtx) + if selectStr == "" { + return e + } + s, err := parser.ParseOne(selectStr) + require.NoError(t, err) + slct, ok := s.AST.(*tree.Select) + require.True(t, ok) + require.NoError(t, e.ConfigurePredicates(slct)) + return e +} + +func makeExprEval( + t *testing.T, st *cluster.Settings, ed *cdcevent.EventDescriptor, selectStr string, +) (*exprEval, error) { + t.Helper() + e := makeEvaluator(t, st, selectStr) + if err := e.initEval(context.Background(), ed); err != nil { + return nil, err + } + return e.evaluator, nil +} + +func decodeRowErr( + decoder cdcevent.Decoder, v *roachpb.RangeFeedValue, prev bool, +) (cdcevent.Row, error) { + kv := roachpb.KeyValue{Key: v.Key} + if prev { + kv.Value = v.PrevValue + } else { + kv.Value = v.Value + } + return decoder.DecodeKV(context.Background(), kv, v.Timestamp()) +} + +func decodeRow( + t *testing.T, decoder cdcevent.Decoder, v *roachpb.RangeFeedValue, prev bool, +) cdcevent.Row { + r, err := decodeRowErr(decoder, v, prev) + require.NoError(t, err) + return r +} + +func slurpKeys(t *testing.T, r cdcevent.Row) (keys []string) { + t.Helper() + require.NoError(t, r.ForEachKeyColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error { + keys = append(keys, tree.AsStringWithFlags(d, tree.FmtExport)) + return nil + })) + return keys +} + +func slurpValues(t *testing.T, r cdcevent.Row) map[string]string { + t.Helper() + res := make(map[string]string) + require.NoError(t, r.ForEachColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error { + res[col.Name] = tree.AsStringWithFlags(d, tree.FmtExport) + return nil + })) + return res +} + +func randEncDatumRow( + t *testing.T, desc catalog.TableDescriptor, familyID descpb.FamilyID, +) (row rowenc.EncDatumRow) { + t.Helper() + rng, _ := randutil.NewTestRand() + + family, err := 
desc.FindFamilyByID(familyID) + require.NoError(t, err) + for _, colID := range family.ColumnIDs { + col, err := desc.FindColumnWithID(colID) + require.NoError(t, err) + row = append(row, rowenc.EncDatum{Datum: randgen.RandDatum(rng, col.GetType(), col.IsNullable())}) + } + return row +} + +func makeEncDatumRow(datums ...tree.Datum) (row rowenc.EncDatumRow) { + for _, d := range datums { + row = append(row, rowenc.EncDatum{Datum: d}) + } + return row +} diff --git a/pkg/ccl/changefeedccl/cdceval/functions.go b/pkg/ccl/changefeedccl/cdceval/functions.go new file mode 100644 index 000000000000..c0822ad8f030 --- /dev/null +++ b/pkg/ccl/changefeedccl/cdceval/functions.go @@ -0,0 +1,183 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdceval + +import ( + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sem/volatility" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + jsonb "github.com/cockroachdb/cockroach/pkg/util/json" +) + +// In general, we want to only support functions that produce the same +// value given the same data -- i.e. immutable functions. +// However, we can provide reasonable overrides to a small set of stable +// functions that make sense in the context of CDC. 
+var supportedVolatileBuiltinFunctions = makeStringSet( + // These functions can be supported given that we set the statement + // and transaction timestamp to be equal to MVCC timestamp of the event. + // TODO(yevgeniy): We also define cdc specific functions, s.a. cdc_mvcc_timestamp + // Maybe delete cdc_ overrides; or.... maybe disallow these builtins in favor of cdc_ specific overrides? + "current_date", + "current_timestamp", + "localtimestamp", + "now", + "statement_timestamp", + "transaction_timestamp", +) + +// CDC Specific functions. +// TODO(yevgeniy): Finalize function naming: e.g. cdc.is_delete() vs cdc_is_delete() +var cdcFunctions = map[string]*tree.FunctionDefinition{ + "cdc_is_delete": makeCDCBuiltIn( + "cdc_is_delete", + tree.Overload{ + Types: tree.ArgTypes{}, + ReturnType: tree.FixedReturnType(types.Bool), + Fn: func(evalCtx *eval.Context, datums tree.Datums) (tree.Datum, error) { + rowEvalCtx := rowEvalContextFromEvalContext(evalCtx) + if rowEvalCtx.updatedRow.IsDeleted() { + return tree.DBoolTrue, nil + } + return tree.DBoolFalse, nil + }, + Info: "Returns true if the event is a deletion", + Volatility: volatility.Stable, + }), + "cdc_mvcc_timestamp": cdcTimestampBuiltin( + "cdc_mvcc_timestamp", + "Returns event MVCC HLC timestamp", + func(rowEvalCtx *rowEvalContext) hlc.Timestamp { + return rowEvalCtx.mvccTS + }, + ), + "cdc_updated_timestamp": cdcTimestampBuiltin( + "cdc_updated_timestamp", + "Returns event updated HLC timestamp", + func(rowEvalCtx *rowEvalContext) hlc.Timestamp { + return rowEvalCtx.updatedRow.SchemaTS + }, + ), + "cdc_prev": makeCDCBuiltIn( + "cdc_prev", + tree.Overload{ + Types: tree.ArgTypes{}, + ReturnType: tree.FixedReturnType(types.Jsonb), + Fn: prevRowAsJSON, + Info: "Returns previous value of a row as JSONB", + Volatility: volatility.Stable, + }), +} + +// TODO(yevgeniy): Implement additional functions (some ideas, not all should be implemented): +// * cdc_is_delete is easy; what about update? 
does update include new events? +// * cdc_is_new -- true if event is a new row +// * tuple overload (or cdc_prev_tuple) to return previous value as a tuple +// * cdc_key -- effectively key_in_value where key columns returned as either a tuple or a json. +// * cdc_key_cols -- return key column names; +// * this can come in handy when working with jsonb; for example, emit previous JSONB excluding +// key columns can be done with `SELECT cdc_prev() - cdc_key_cols() +// * cdc_event_family_is(fam): return true if cdc event family is specified family; overload both for +// family ID and family name. +// function can be used to write complex conditionals when dealing with multi-family table(s) + +var cdcFnProps = &tree.FunctionProperties{ + Category: "CDC builtin", +} + +func makeCDCBuiltIn(fnName string, overloads ...tree.Overload) *tree.FunctionDefinition { + return tree.NewFunctionDefinition(fnName, cdcFnProps, overloads) +} + +func cdcTimestampBuiltin( + fnName string, doc string, tsFn func(rowEvalCtx *rowEvalContext) hlc.Timestamp, +) *tree.FunctionDefinition { + return tree.NewFunctionDefinition( + fnName, + cdcFnProps, + []tree.Overload{ + { + Types: tree.ArgTypes{}, + ReturnType: tree.FixedReturnType(types.Decimal), + Fn: func(evalCtx *eval.Context, datums tree.Datums) (tree.Datum, error) { + rowEvalCtx := rowEvalContextFromEvalContext(evalCtx) + return eval.TimestampToDecimalDatum(tsFn(rowEvalCtx)), nil + }, + Info: doc, + Volatility: volatility.Stable, + }, + }, + ) +} + +func prevRowAsJSON(evalCtx *eval.Context, _ tree.Datums) (tree.Datum, error) { + rec := rowEvalContextFromEvalContext(evalCtx) + if rec.memo.prevJSON != nil { + return rec.memo.prevJSON, nil + } + + var prevJSON *tree.DJSON + if rec.prevRow.IsInitialized() { + b := jsonb.NewObjectBuilder(0) + if err := rec.prevRow.ForEachColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error { + j, err := tree.AsJSON(d, sessiondatapb.DataConversionConfig{}, time.UTC) + if err != nil { + return err + 
} + b.Add(col.Name, j) + return nil + }); err != nil { + return nil, err + } + prevJSON = tree.NewDJSON(b.Build()) + } else { + prevJSON = tree.NewDJSON(jsonb.NullJSONValue) + } + + rec.memo.prevJSON = prevJSON + evalCtx.Annotations.Set(cdcAnnotationAddr, rec) + return prevJSON, nil +} + +type cdcCustomFunctionResolver struct { + sessiondata.SearchPath +} + +// Resolve implements tree.CustomFunctionDefinitionResolver +func (cdcCustomFunctionResolver) Resolve(name string) *tree.FunctionDefinition { + fn, found := cdcFunctions[name] + if found { + return fn + } + fn, found = cdcFunctions[strings.ToLower(name)] + if found { + return fn + } + fn, found = tree.FunDefs[name] + if found { + return fn + } + return nil +} + +func makeStringSet(vals ...string) map[string]struct{} { + m := make(map[string]struct{}, len(vals)) + for _, v := range vals { + m[v] = struct{}{} + } + return m +} diff --git a/pkg/ccl/changefeedccl/cdceval/functions_test.go b/pkg/ccl/changefeedccl/cdceval/functions_test.go new file mode 100644 index 000000000000..2eb31d7c74cd --- /dev/null +++ b/pkg/ccl/changefeedccl/cdceval/functions_test.go @@ -0,0 +1,127 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdceval + +import ( + "context" + "fmt" + "strconv" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + jsonb "github.com/cockroachdb/cockroach/pkg/util/json" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +func TestEvaluatesCDCFunctionOverloads(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, ""+ + "CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, d INT, "+ + "FAMILY most (a,b,c), FAMILY only_d (d))") + desc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") + + ctx := context.Background() + + t.Run("current_timestamp", func(t *testing.T) { + testRow := cdcevent.TestingMakeEventRow(desc, 0, randEncDatumRow(t, desc, 0), false) + e, err := makeExprEval(t, s.ClusterSettings(), testRow.EventDescriptor, + "SELECT current_timestamp::int") + require.NoError(t, err) + futureTS := s.Clock().Now().Add(int64(60*time.Minute), 0) + p, err := e.evalProjection(ctx, testRow, futureTS, testRow) + require.NoError(t, err) + require.Equal(t, + map[string]string{"current_timestamp": strconv.FormatInt(futureTS.GoTime().Unix(), 10)}, + slurpValues(t, p)) + }) + + t.Run("cdc_is_delete", func(t 
*testing.T) { + for _, expectDelete := range []bool{true, false} { + testRow := cdcevent.TestingMakeEventRow(desc, 0, randEncDatumRow(t, desc, 0), expectDelete) + e, err := makeExprEval(t, s.ClusterSettings(), testRow.EventDescriptor, + "SELECT cdc_is_delete()") + require.NoError(t, err) + + p, err := e.evalProjection(ctx, testRow, s.Clock().Now(), testRow) + require.NoError(t, err) + require.Equal(t, + map[string]string{"cdc_is_delete": fmt.Sprintf("%t", expectDelete)}, + slurpValues(t, p)) + } + }) + + t.Run("cdc_prev", func(t *testing.T) { + rowDatums := randEncDatumRow(t, desc, 0) + testRow := cdcevent.TestingMakeEventRow(desc, 0, rowDatums, false) + e, err := makeExprEval(t, s.ClusterSettings(), testRow.EventDescriptor, + "SELECT cdc_prev()") + require.NoError(t, err) + + // When previous row is not set -- i.e. if running without diff, cdc_prev returns + // null json. + p, err := e.evalProjection(ctx, testRow, s.Clock().Now(), cdcevent.Row{}) + require.NoError(t, err) + require.Equal(t, map[string]string{"cdc_prev": jsonb.NullJSONValue.String()}, slurpValues(t, p)) + + // Otherwise, expect to get JSONB. 
+ b := jsonb.NewObjectBuilder(len(rowDatums)) + for i, d := range rowDatums { + j, err := tree.AsJSON(d.Datum, sessiondatapb.DataConversionConfig{}, time.UTC) + if err != nil { + require.NoError(t, err) + } + b.Add(desc.PublicColumns()[i].GetName(), j) + } + + expectedJSON := b.Build() + p, err = e.evalProjection(ctx, testRow, s.Clock().Now(), testRow) + require.NoError(t, err) + require.Equal(t, map[string]string{"cdc_prev": expectedJSON.String()}, slurpValues(t, p)) + }) + + for _, cast := range []string{"", "::decimal", "::string"} { + t.Run(fmt.Sprintf("cdc_{mvcc,updated}_timestamp()%s", cast), func(t *testing.T) { + schemaTS := s.Clock().Now().Add(int64(60*time.Minute), 0) + mvccTS := schemaTS.Add(int64(30*time.Minute), 0) + testRow := cdcevent.TestingMakeEventRow(desc, 0, randEncDatumRow(t, desc, 0), false) + testRow.EventDescriptor.SchemaTS = schemaTS + + e, err := makeExprEval(t, s.ClusterSettings(), testRow.EventDescriptor, + fmt.Sprintf( + "SELECT cdc_mvcc_timestamp()%[1]s as mvcc, cdc_updated_timestamp()%[1]s as updated", cast, + )) + require.NoError(t, err) + + p, err := e.evalProjection(ctx, testRow, mvccTS, testRow) + require.NoError(t, err) + require.Equal(t, + map[string]string{ + "mvcc": eval.TimestampToDecimalDatum(mvccTS).String(), + "updated": eval.TimestampToDecimalDatum(schemaTS).String(), + }, + slurpValues(t, p)) + }) + } +} diff --git a/pkg/ccl/changefeedccl/cdceval/main_test.go b/pkg/ccl/changefeedccl/cdceval/main_test.go new file mode 100644 index 000000000000..05be3e3fa7e7 --- /dev/null +++ b/pkg/ccl/changefeedccl/cdceval/main_test.go @@ -0,0 +1,33 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdceval + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +func TestMain(m *testing.M) { + defer utilccl.TestingEnableEnterprise()() + security.SetAssetLoader(securitytest.EmbeddedAssets) + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel b/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel index 214f1f01b66d..2dce69cad815 100644 --- a/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "doc.go", "event.go", + "projection.go", "rowfetcher_cache.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent", @@ -24,6 +25,7 @@ go_library( "//pkg/sql/row", "//pkg/sql/rowenc", "//pkg/sql/sem/tree", + "//pkg/sql/types", "//pkg/util/cache", "//pkg/util/encoding", "//pkg/util/hlc", @@ -38,27 +40,26 @@ go_test( srcs = [ "event_test.go", "main_test.go", + "projection_test.go", ], embed = [":cdcevent"], deps = [ "//pkg/base", + "//pkg/ccl/changefeedccl/cdctest", "//pkg/ccl/changefeedccl/changefeedbase", "//pkg/ccl/utilccl", "//pkg/jobs/jobspb", - "//pkg/keys", - "//pkg/kv", - "//pkg/kv/kvclient/rangefeed", "//pkg/roachpb", "//pkg/security", "//pkg/security/securitytest", "//pkg/server", - "//pkg/sql", "//pkg/sql/catalog", 
"//pkg/sql/catalog/colinfo", "//pkg/sql/catalog/descpb", - "//pkg/sql/catalog/desctestutils", "//pkg/sql/distsql", + "//pkg/sql/rowenc", "//pkg/sql/sem/tree", + "//pkg/sql/types", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", diff --git a/pkg/ccl/changefeedccl/cdcevent/event.go b/pkg/ccl/changefeedccl/cdcevent/event.go index a8628bf54e72..1ca6bdae6d0d 100644 --- a/pkg/ccl/changefeedccl/cdcevent/event.go +++ b/pkg/ccl/changefeedccl/cdcevent/event.go @@ -38,6 +38,7 @@ type Metadata struct { FamilyID descpb.FamilyID // Column family ID. FamilyName string // Column family name. HasOtherFamilies bool // True if the table multiple families. + SchemaTS hlc.Timestamp // Schema timestamp for table descriptor. } // Decoder is an interface for decoding KVs into cdc event row. @@ -48,7 +49,7 @@ type Decoder interface { // Row holds a row corresponding to an event. type Row struct { - *eventDescriptor + *EventDescriptor // datums is the new value of a changed table row. datums rowenc.EncDatumRow @@ -84,7 +85,7 @@ func (r Row) ForEachKeyColumn() Iterator { // ForEachColumn returns Iterator for each column. func (r Row) ForEachColumn() Iterator { - return iter{r: r, cols: r.familyCols} + return iter{r: r, cols: r.valueCols} } // ForEachUDTColumn returns Datum iterator for each column containing user defined types. @@ -92,6 +93,19 @@ func (r Row) ForEachUDTColumn() Iterator { return iter{r: r, cols: r.udtCols} } +// DatumAt returns Datum at specified position. +func (r Row) DatumAt(at int) (tree.Datum, error) { + if at >= len(r.cols) { + return nil, errors.AssertionFailedf("column at %d out of bounds", at) + } + col := r.cols[at] + encDatum := r.datums[col.ord] + if err := encDatum.EnsureDecoded(col.Typ, r.alloc); err != nil { + return nil, errors.Wrapf(err, "error decoding column %q as type %s", col.Name, col.Typ.String()) + } + return encDatum.Datum, nil +} + // IsDeleted returns true if event corresponds to a deletion event. 
func (r Row) IsDeleted() bool { return r.deleted @@ -99,7 +113,7 @@ func (r Row) IsDeleted() bool { // IsInitialized returns true if event row is initialized. func (r Row) IsInitialized() bool { - return r.eventDescriptor != nil + return r.EventDescriptor != nil } // HasValues returns true if event row has values to decode. @@ -160,12 +174,9 @@ func (c ResultColumn) Ordinal() int { return c.ord } -// eventDescriptor implements Metadata interface, and associates -// table/family descriptor with the information needed to decode events. -type eventDescriptor struct { +// EventDescriptor is a cdc event descriptor: collection of information describing Row. +type EventDescriptor struct { Metadata - desc catalog.TableDescriptor - family *descpb.ColumnFamilyDescriptor // List of result columns produced by this descriptor. // This may be different from the table descriptors public columns @@ -173,15 +184,18 @@ type eventDescriptor struct { cols []ResultColumn // Precomputed index lists into cols. - keyCols []int // Primary key columns. - familyCols []int // All column family columns. - udtCols []int // Columns containing UDTs. + keyCols []int // Primary key columns. + valueCols []int // All column family columns. + udtCols []int // Columns containing UDTs. } func newEventDescriptor( - desc catalog.TableDescriptor, family *descpb.ColumnFamilyDescriptor, includeVirtualColumns bool, -) (*eventDescriptor, error) { - sd := eventDescriptor{ + desc catalog.TableDescriptor, + family *descpb.ColumnFamilyDescriptor, + includeVirtualColumns bool, + schemaTS hlc.Timestamp, +) (*EventDescriptor, error) { + sd := EventDescriptor{ Metadata: Metadata{ TableID: desc.GetID(), TableName: desc.GetName(), @@ -189,13 +203,12 @@ func newEventDescriptor( FamilyID: family.ID, FamilyName: family.Name, HasOtherFamilies: desc.NumFamilies() > 1, + SchemaTS: schemaTS, }, - family: family, - desc: desc, } // addColumn is a helper to add a column to this descriptor. 
- addColumn := func(col catalog.Column, ord int, colIdxSlice *[]int) { + addColumn := func(col catalog.Column, ord int) int { resultColumn := ResultColumn{ ResultColumn: colinfo.ResultColumn{ Name: col.GetName(), @@ -209,23 +222,26 @@ func newEventDescriptor( colIdx := len(sd.cols) sd.cols = append(sd.cols, resultColumn) - *colIdxSlice = append(*colIdxSlice, colIdx) if col.GetType().UserDefined() { sd.udtCols = append(sd.udtCols, colIdx) } + return colIdx } // Primary key columns must be added in the same order they // appear in the primary key index. primaryIdx := desc.GetPrimaryIndex() colOrd := catalog.ColumnIDToOrdinalMap(desc.PublicColumns()) + sd.keyCols = make([]int, primaryIdx.NumKeyColumns()) + var primaryKeyOrdinal catalog.TableColMap + for i := 0; i < primaryIdx.NumKeyColumns(); i++ { ord, ok := colOrd.Get(primaryIdx.GetKeyColumnID(i)) if !ok { return nil, errors.AssertionFailedf("expected to find column %d", ord) } - addColumn(desc.PublicColumns()[ord], ord, &sd.keyCols) + primaryKeyOrdinal.Set(desc.PublicColumns()[ord].GetID(), i) } // Remaining columns go in same order as public columns. @@ -233,8 +249,17 @@ func newEventDescriptor( for ord, col := range desc.PublicColumns() { isInFamily := inFamily.Contains(col.GetID()) virtual := col.IsVirtual() && includeVirtualColumns - if isInFamily || virtual { - addColumn(col, ord, &sd.familyCols) + isValueCol := isInFamily || virtual + pKeyOrd, isPKey := primaryKeyOrdinal.Get(col.GetID()) + if isValueCol || isPKey { + colIdx := addColumn(col, ord) + if isValueCol { + sd.valueCols = append(sd.valueCols, colIdx) + } + + if isPKey { + sd.keyCols[pKeyOrd] = colIdx + } } } @@ -242,20 +267,31 @@ func newEventDescriptor( } // DebugString returns event descriptor debug information. 
-func (d *eventDescriptor) DebugString() string { - return fmt.Sprintf("eventDescriptor{table: %q(%d) family: %q(%d) pkCols=%v valCols=%v", - d.TableName, d.TableID, d.FamilyName, d.FamilyID, d.keyCols, d.familyCols) +func (d *EventDescriptor) DebugString() string { + return fmt.Sprintf("EventDescriptor{table: %q(%d) family: %q(%d) pkCols=%v valCols=%v", + d.TableName, d.TableID, d.FamilyName, d.FamilyID, d.keyCols, d.valueCols) } // SafeFormat implements SafeFormatter interface. -func (d *eventDescriptor) SafeFormat(p redact.SafePrinter, _ rune) { +func (d *EventDescriptor) SafeFormat(p redact.SafePrinter, _ rune) { p.Print(d.DebugString()) } +// ResultColumns returns all results columns in this descriptor. +func (d *EventDescriptor) ResultColumns() []ResultColumn { + return d.cols +} + +// Equals returns true if this descriptor equals other. +func (d *EventDescriptor) Equals(other *EventDescriptor) bool { + return other != nil && d.TableID == other.TableID && d.Version == other.Version && d.FamilyID == other.FamilyID +} + type eventDescriptorFactory func( desc catalog.TableDescriptor, family *descpb.ColumnFamilyDescriptor, -) (*eventDescriptor, error) + schemaTS hlc.Timestamp, +) (*EventDescriptor, error) type eventDecoder struct { // Cached allocations for *row.Fetcher @@ -281,12 +317,13 @@ func getEventDescriptorCached( desc catalog.TableDescriptor, family *descpb.ColumnFamilyDescriptor, includeVirtual bool, + schemaTS hlc.Timestamp, cache *cache.UnorderedCache, -) (*eventDescriptor, error) { +) (*EventDescriptor, error) { idVer := idVersion{id: desc.GetID(), version: desc.GetVersion(), family: family.ID} if v, ok := cache.Get(idVer); ok { - ed := v.(*eventDescriptor) + ed := v.(*EventDescriptor) // Normally, this is a no-op since majority of changefeeds do not use UDTs. 
// However, in case we do, we must update cached UDT information based on this @@ -299,7 +336,7 @@ func getEventDescriptorCached( return ed, nil } - ed, err := newEventDescriptor(desc, family, includeVirtual) + ed, err := newEventDescriptor(desc, family, includeVirtual, schemaTS) if err != nil { return nil, err } @@ -317,7 +354,7 @@ func NewEventDecoder( cfg.LeaseManager.(*lease.Manager), cfg.CollectionFactory, cfg.DB, - details, + details.TargetSpecifications, ) includeVirtual := details.Opts[changefeedbase.OptVirtualColumns] == string(changefeedbase.OptVirtualColumnsNull) @@ -325,8 +362,9 @@ func NewEventDecoder( getEventDescriptor := func( desc catalog.TableDescriptor, family *descpb.ColumnFamilyDescriptor, - ) (*eventDescriptor, error) { - return getEventDescriptorCached(desc, family, includeVirtual, eventDescriptorCache) + schemaTS hlc.Timestamp, + ) (*EventDescriptor, error) { + return getEventDescriptorCached(desc, family, includeVirtual, schemaTS, eventDescriptorCache) } return &eventDecoder{ @@ -354,13 +392,13 @@ func (d *eventDecoder) DecodeKV( return Row{}, err } - ed, err := d.getEventDescriptor(d.desc, d.family) + ed, err := d.getEventDescriptor(d.desc, d.family, schemaTS) if err != nil { return Row{}, err } return Row{ - eventDescriptor: ed, + EventDescriptor: ed, datums: datums, deleted: isDeleted, alloc: &d.alloc, @@ -441,22 +479,31 @@ func (it iter) Col(fn ColumnFn) error { // TestingMakeEventRow initializes Row with provided arguments. // Exposed for unit tests. func TestingMakeEventRow( - desc catalog.TableDescriptor, encRow rowenc.EncDatumRow, deleted bool, + desc catalog.TableDescriptor, familyID descpb.FamilyID, encRow rowenc.EncDatumRow, deleted bool, ) Row { - family, err := desc.FindFamilyByID(0) + family, err := desc.FindFamilyByID(familyID) if err != nil { panic(err) // primary column family always exists. 
} const includeVirtual = false - ed, err := newEventDescriptor(desc, family, includeVirtual) + ed, err := newEventDescriptor(desc, family, includeVirtual, hlc.Timestamp{}) if err != nil { panic(err) } var alloc tree.DatumAlloc return Row{ - eventDescriptor: ed, + EventDescriptor: ed, datums: encRow, deleted: deleted, alloc: &alloc, } } + +// TestingGetFamilyIDFromKey returns family ID encoded in the specified roachpb.Key. +// Exposed for testing. +func TestingGetFamilyIDFromKey( + decoder Decoder, key roachpb.Key, ts hlc.Timestamp, +) (descpb.FamilyID, error) { + _, familyID, err := decoder.(*eventDecoder).rfCache.tableDescForKey(context.Background(), key, ts) + return familyID, err +} diff --git a/pkg/ccl/changefeedccl/cdcevent/event_test.go b/pkg/ccl/changefeedccl/cdcevent/event_test.go index 513d53f7bf24..7c4110da7cd1 100644 --- a/pkg/ccl/changefeedccl/cdcevent/event_test.go +++ b/pkg/ccl/changefeedccl/cdcevent/event_test.go @@ -12,20 +12,15 @@ import ( "context" "fmt" "testing" - "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/distsql" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -56,7 +51,7 @@ CREATE TABLE foo ( FAMILY only_c (c) )`) - tableDesc := getHydratedTableDescriptor(t, 
s.ExecutorConfig(), kvDB, "foo") + tableDesc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") mainFamily := mustGetFamily(t, tableDesc, 0) cFamily := mustGetFamily(t, tableDesc, 1) @@ -95,7 +90,7 @@ CREATE TABLE foo ( }, } { t.Run(fmt.Sprintf("%s/includeVirtual=%t", tc.family.Name, tc.includeVirtual), func(t *testing.T) { - ed, err := newEventDescriptor(tableDesc, tc.family, tc.includeVirtual) + ed, err := newEventDescriptor(tableDesc, tc.family, tc.includeVirtual, s.Clock().Now()) require.NoError(t, err) // Verify Metadata information for event descriptor. @@ -105,7 +100,7 @@ CREATE TABLE foo ( require.True(t, ed.HasOtherFamilies) // Verify primary key and family columns are as expected. - r := Row{eventDescriptor: ed} + r := Row{EventDescriptor: ed} require.Equal(t, tc.expectedKeyCols, slurpColumns(t, r.ForEachKeyColumn())) require.Equal(t, tc.expectedColumns, slurpColumns(t, r.ForEachColumn())) require.Equal(t, tc.expectedUDTCols, slurpColumns(t, r.ForEachUDTColumn())) @@ -134,37 +129,8 @@ CREATE TABLE foo ( FAMILY only_c (c) )`) - tableDesc := getHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") - - // We'll use rangefeed to pluck out updated rows from the table. - rf := s.ExecutorConfig().(sql.ExecutorConfig).RangeFeedFactory - ctx := context.Background() - rows := make(chan *roachpb.RangeFeedValue) - _, err := rf.RangeFeed(ctx, "foo-feed", - []roachpb.Span{tableDesc.PrimaryIndexSpan(keys.SystemSQLCodec)}, - s.Clock().Now(), - func(ctx context.Context, value *roachpb.RangeFeedValue) { - select { - case <-ctx.Done(): - case rows <- value: - } - }, - rangefeed.WithDiff(true), - ) - require.NoError(t, err) - - // Helper to read next rangefeed value. 
- popRow := func(t *testing.T) *roachpb.RangeFeedValue { - t.Helper() - select { - case r := <-rows: - log.Infof(ctx, "Got Row: %s", roachpb.PrettyPrintKey(nil, r.Key)) - return r - case <-time.After(5 * time.Second): - t.Fatal("timeout reading row") - return nil - } - } + tableDesc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") + popRow := cdctest.MakeRangeFeedValueReader(t, s.ExecutorConfig(), tableDesc) type decodeExpectation struct { expectUnwatchedErr bool @@ -337,12 +303,13 @@ CREATE TABLE foo ( } serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig + ctx := context.Background() decoder := NewEventDecoder(ctx, &serverCfg, details) expectedEvents := len(tc.expectMainFamily) + len(tc.expectOnlyCFamily) for i := 0; i < expectedEvents; i++ { v := popRow(t) - _, eventFamilyID, err := decoder.(*eventDecoder).rfCache.tableDescForKey(ctx, v.Key, v.Timestamp()) + eventFamilyID, err := TestingGetFamilyIDFromKey(decoder, v.Key, v.Timestamp()) require.NoError(t, err) var expect decodeExpectation @@ -435,22 +402,3 @@ func slurpDatums(t *testing.T, it Iterator) (res []string) { })) return res } - -func getHydratedTableDescriptor( - t *testing.T, execCfgI interface{}, kvDB *kv.DB, tableName string, -) catalog.TableDescriptor { - t.Helper() - desc := desctestutils.TestingGetPublicTableDescriptor( - kvDB, keys.SystemSQLCodec, "defaultdb", tableName) - if !desc.ContainsUserDefinedTypes() { - return desc - } - - ctx := context.Background() - execCfg := execCfgI.(sql.ExecutorConfig) - collection := execCfg.CollectionFactory.NewCollection(ctx, nil) - var err error - desc, err = refreshUDT(context.Background(), desc.GetID(), kvDB, collection, execCfg.Clock.Now()) - require.NoError(t, err) - return desc -} diff --git a/pkg/ccl/changefeedccl/cdcevent/projection.go b/pkg/ccl/changefeedccl/cdcevent/projection.go new file mode 100644 index 000000000000..cd57e4f8ccc0 --- /dev/null +++ b/pkg/ccl/changefeedccl/cdcevent/projection.go @@ -0,0 +1,101 
@@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdcevent + +import ( + "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/errors" +) + +// Projection is a helper to facilitate construction of "projection" rows. +// Projection is constructed given the underlying event descriptor. Only the key +// columns from the descriptor are initialized upon construction. All other +// value columns returned by projection need to be configured separately via AddValueColumn, +// and the value for that column must later be set via SetValueDatumAt. +// All columns added to this projection are in the ordinal order. +type Projection Row + +// MakeProjection returns Projection builder given underlying descriptor. +func MakeProjection(d *EventDescriptor) Projection { + p := Projection{ + EventDescriptor: &EventDescriptor{Metadata: d.Metadata}, + } + + // Add all primary key columns. 
+ for _, colIdx := range d.keyCols { + col := d.cols[colIdx] + p.addColumn(col.Name, col.Typ, col.sqlString, &p.keyCols) + } + return p +} + +func (p *Projection) addColumn(name string, typ *types.T, sqlString string, colIdxSlice *[]int) { + ord := len(p.cols) + p.cols = append(p.cols, ResultColumn{ + ResultColumn: colinfo.ResultColumn{ + Name: name, + Typ: typ, + }, + ord: ord, + sqlString: sqlString, + }) + + p.datums = append(p.datums, rowenc.EncDatum{}) + *colIdxSlice = append(*colIdxSlice, ord) + if typ.UserDefined() { + p.udtCols = append(p.udtCols, ord) + } +} + +// AddValueColumn adds a value column to this projection builder. +func (p *Projection) AddValueColumn(name string, typ *types.T) { + p.addColumn(name, typ, "", &p.valueCols) +} + +// SetValueDatumAt sets value datum at specified position. +func (p *Projection) SetValueDatumAt(pos int, d tree.Datum) error { + pos += len(p.keyCols) + if pos >= len(p.datums) { + return errors.AssertionFailedf("%d out of bounds", pos) + } + + // A bit of a sanity check -- types must match or d must be DNULL. + col := p.cols[pos] + if !(d == tree.DNull || col.Typ.Equal(d.ResolvedType())) { + return errors.AssertionFailedf("expected type %s for column %s@%d, found %s", + col.Typ, col.Name, pos, d.ResolvedType()) + } + + p.datums[pos].Datum = d + return nil +} + +// Project returns row projection. +func (p *Projection) Project(r Row) (Row, error) { + p.deleted = r.IsDeleted() + + // Copy key datums. 
+ idx := 0 + if err := r.ForEachKeyColumn().Datum(func(d tree.Datum, col ResultColumn) error { + if idx >= len(p.keyCols) || idx >= len(p.datums) { + return errors.AssertionFailedf("%d out of bounds when projecting key column %s", idx, col.Name) + } + + p.datums[idx].Datum = d + idx++ + return nil + }); err != nil { + return Row{}, err + } + + return Row(*p), nil +} diff --git a/pkg/ccl/changefeedccl/cdcevent/projection_test.go b/pkg/ccl/changefeedccl/cdcevent/projection_test.go new file mode 100644 index 000000000000..c54c11c2617d --- /dev/null +++ b/pkg/ccl/changefeedccl/cdcevent/projection_test.go @@ -0,0 +1,109 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdcevent + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +func TestProjection(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `CREATE TYPE status AS ENUM ('open', 'closed', 'inactive')`) + sqlDB.Exec(t, ` +CREATE TABLE foo ( + a INT, + b STRING, + c STRING, + PRIMARY KEY (b, a) +)`) + + 
desc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") + encDatums := makeEncDatumRow(tree.NewDInt(1), tree.NewDString("one"), tree.DNull) + + t.Run("row_was_deleted", func(t *testing.T) { + input := TestingMakeEventRow(desc, 0, encDatums, true) + p := MakeProjection(input.EventDescriptor) + pr, err := p.Project(input) + require.NoError(t, err) + require.Equal(t, []string{"one", "1"}, slurpDatums(t, pr.ForEachKeyColumn())) + require.Equal(t, []string(nil), slurpDatums(t, pr.ForEachColumn())) + }) + + t.Run("identity", func(t *testing.T) { + input := TestingMakeEventRow(desc, 0, encDatums, false) + p := MakeProjection(input.EventDescriptor) + idx := 0 + require.NoError(t, input.ForEachColumn().Datum(func(d tree.Datum, col ResultColumn) error { + p.AddValueColumn(col.Name, col.Typ) + err := p.SetValueDatumAt(idx, d) + idx++ + return err + })) + + pr, err := p.Project(input) + require.NoError(t, err) + require.Equal(t, slurpDatums(t, input.ForEachKeyColumn()), slurpDatums(t, pr.ForEachKeyColumn())) + require.Equal(t, slurpDatums(t, input.ForEachColumn()), slurpDatums(t, pr.ForEachColumn())) + }) + + t.Run("must_be_correct_type", func(t *testing.T) { + input := TestingMakeEventRow(desc, 0, encDatums, false) + p := MakeProjection(input.EventDescriptor) + p.AddValueColumn("wrong_type", types.Int) + require.Regexp(t, "expected type int", p.SetValueDatumAt(0, tree.NewDString("fail"))) + // But we allow NULL. 
+ require.NoError(t, p.SetValueDatumAt(0, tree.DNull)) + }) + + t.Run("project_extra_column", func(t *testing.T) { + input := TestingMakeEventRow(desc, 0, encDatums, false) + p := MakeProjection(input.EventDescriptor) + idx := 0 + require.NoError(t, input.ForEachColumn().Datum(func(d tree.Datum, col ResultColumn) error { + p.AddValueColumn(col.Name, col.Typ) + err := p.SetValueDatumAt(idx, d) + idx++ + return err + })) + p.AddValueColumn("test", types.Int) + require.NoError(t, p.SetValueDatumAt(idx, tree.NewDInt(5))) + + pr, err := p.Project(input) + require.NoError(t, err) + require.Equal(t, slurpDatums(t, input.ForEachKeyColumn()), slurpDatums(t, pr.ForEachKeyColumn())) + expectValues := slurpDatums(t, input.ForEachColumn()) + expectValues = append(expectValues, "5") + require.Equal(t, expectValues, slurpDatums(t, pr.ForEachColumn())) + }) +} + +func makeEncDatumRow(datums ...tree.Datum) (row rowenc.EncDatumRow) { + for _, d := range datums { + row = append(row, rowenc.EncDatum{Datum: d}) + } + return row +} diff --git a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go index b4ad1e03a00e..8435ae987f44 100644 --- a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go @@ -79,9 +79,8 @@ func newRowFetcherCache( leaseMgr *lease.Manager, cf *descs.CollectionFactory, db *kv.DB, - details jobspb.ChangefeedDetails, + specs []jobspb.ChangefeedTargetSpecification, ) *rowFetcherCache { - specs := details.TargetSpecifications watchedFamilies := make(map[watchedFamily]struct{}, len(specs)) for _, s := range specs { watchedFamilies[watchedFamily{tableID: s.TableID, familyName: s.FamilyName}] = struct{}{} diff --git a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel index c8507ca1b0e2..639ff679dcf7 100644 --- a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = 
[ "mock_webhook_sink.go", "nemeses.go", + "row.go", "schema_registry.go", "testfeed.go", "tls_util.go", @@ -16,7 +17,14 @@ go_library( "//pkg/ccl/changefeedccl/changefeedbase", "//pkg/jobs", "//pkg/jobs/jobspb", + "//pkg/keys", + "//pkg/kv", + "//pkg/kv/kvclient/rangefeed", "//pkg/roachpb", + "//pkg/sql", + "//pkg/sql/catalog", + "//pkg/sql/catalog/desctestutils", + "//pkg/sql/sem/tree", "//pkg/testutils/serverutils", "//pkg/util/fsm", "//pkg/util/hlc", @@ -26,6 +34,7 @@ go_library( "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@com_github_linkedin_goavro_v2//:goavro", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/ccl/changefeedccl/cdctest/row.go b/pkg/ccl/changefeedccl/cdctest/row.go new file mode 100644 index 000000000000..8914c1f8ed13 --- /dev/null +++ b/pkg/ccl/changefeedccl/cdctest/row.go @@ -0,0 +1,99 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdctest + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +// MakeRangeFeedValueReader starts rangefeed on the specified table and returns a function +// that returns the next *roachpb.RangeFeedValue from the table. 
+// This function is intended to be used in tests that wish to read low level roachpb.KeyValue(s). +// Instead of trying to generate KVs ourselves (subject to encoding restrictions, etc), it is +// simpler to just "INSERT ..." into the table, and then use this function to read next value. +func MakeRangeFeedValueReader( + t *testing.T, execCfgI interface{}, desc catalog.TableDescriptor, +) func(t *testing.T) *roachpb.RangeFeedValue { + t.Helper() + execCfg := execCfgI.(sql.ExecutorConfig) + rows := make(chan *roachpb.RangeFeedValue) + _, err := execCfg.RangeFeedFactory.RangeFeed(context.Background(), "feed-"+desc.GetName(), + []roachpb.Span{desc.PrimaryIndexSpan(keys.SystemSQLCodec)}, + execCfg.Clock.Now(), + func(ctx context.Context, value *roachpb.RangeFeedValue) { + select { + case <-ctx.Done(): + case rows <- value: + } + }, + rangefeed.WithDiff(true), + ) + require.NoError(t, err) + + var timeout = 5 * time.Second + if util.RaceEnabled { + timeout = 3 * timeout + } + + // Helper to read next rangefeed value. + return func(t *testing.T) *roachpb.RangeFeedValue { + t.Helper() + select { + case r := <-rows: + log.Infof(context.Background(), "Got Row: %s", roachpb.PrettyPrintKey(nil, r.Key)) + return r + case <-time.After(timeout): + t.Fatal("timeout reading row") + return nil + } + } +} + +// GetHydratedTableDescriptor returns a table descriptor for the specified +// table. The descriptor is "hydrated" if it has user defined data types. 
+func GetHydratedTableDescriptor( + t *testing.T, execCfgI interface{}, kvDB *kv.DB, tableName string, +) catalog.TableDescriptor { + t.Helper() + desc := desctestutils.TestingGetPublicTableDescriptor( + kvDB, keys.SystemSQLCodec, "defaultdb", tableName) + if !desc.ContainsUserDefinedTypes() { + return desc + } + + ctx := context.Background() + execCfg := execCfgI.(sql.ExecutorConfig) + collection := execCfg.CollectionFactory.NewCollection(ctx, nil) + require.NoError(t, kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + err := txn.SetFixedTimestamp(ctx, execCfg.Clock.Now()) + if err != nil { + return err + } + desc, err = collection.GetImmutableTableByID(ctx, txn, desc.GetID(), tree.ObjectLookupFlags{}) + return err + })) + // Immediately release the lease, since we only need it for the exact + // timestamp requested. + collection.ReleaseAll(ctx) + return desc +} diff --git a/pkg/ccl/changefeedccl/encoder_test.go b/pkg/ccl/changefeedccl/encoder_test.go index cbb9d4b84a43..bac4d2fdc329 100644 --- a/pkg/ccl/changefeedccl/encoder_test.go +++ b/pkg/ccl/changefeedccl/encoder_test.go @@ -224,8 +224,8 @@ func TestEncoders(t *testing.T) { } require.NoError(t, err) - rowInsert := cdcevent.TestingMakeEventRow(tableDesc, row, false) - prevRow := cdcevent.TestingMakeEventRow(tableDesc, nil, false) + rowInsert := cdcevent.TestingMakeEventRow(tableDesc, 0, row, false) + prevRow := cdcevent.TestingMakeEventRow(tableDesc, 0, nil, false) evCtx := eventContext{updated: ts} keyInsert, err := e.EncodeKey(context.Background(), rowInsert) @@ -235,8 +235,8 @@ func TestEncoders(t *testing.T) { require.NoError(t, err) require.Equal(t, expected.insert, rowStringFn(keyInsert, valueInsert)) - rowDelete := cdcevent.TestingMakeEventRow(tableDesc, row, true) - prevRow = cdcevent.TestingMakeEventRow(tableDesc, row, false) + rowDelete := cdcevent.TestingMakeEventRow(tableDesc, 0, row, true) + prevRow = cdcevent.TestingMakeEventRow(tableDesc, 0, row, false) keyDelete, err := 
e.EncodeKey(context.Background(), rowDelete) require.NoError(t, err) @@ -364,7 +364,7 @@ func TestAvroEncoderWithTLS(t *testing.T) { e, err := getEncoder(opts, targets) require.NoError(t, err) - rowInsert := cdcevent.TestingMakeEventRow(tableDesc, row, false) + rowInsert := cdcevent.TestingMakeEventRow(tableDesc, 0, row, false) var prevRow cdcevent.Row evCtx := eventContext{updated: ts} keyInsert, err := e.EncodeKey(context.Background(), rowInsert) @@ -374,8 +374,8 @@ func TestAvroEncoderWithTLS(t *testing.T) { require.NoError(t, err) require.Equal(t, expected.insert, rowStringFn(keyInsert, valueInsert)) - rowDelete := cdcevent.TestingMakeEventRow(tableDesc, row, true) - prevRow = cdcevent.TestingMakeEventRow(tableDesc, row, false) + rowDelete := cdcevent.TestingMakeEventRow(tableDesc, 0, row, true) + prevRow = cdcevent.TestingMakeEventRow(tableDesc, 0, row, false) keyDelete, err := e.EncodeKey(context.Background(), rowDelete) require.NoError(t, err) diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go index 1d6bc70f73cb..f67b6ea43d64 100644 --- a/pkg/ccl/changefeedccl/event_processing.go +++ b/pkg/ccl/changefeedccl/event_processing.go @@ -56,7 +56,6 @@ func newKVEventToRowConsumer( knobs TestingKnobs, topicNamer *TopicNamer, ) *kvEventToRowConsumer { - return &kvEventToRowConsumer{ frontier: frontier, encoder: encoder,