Skip to content

Commit

Permalink
changefeedcc: Predicates and projections in CDC.
Browse files Browse the repository at this point in the history
Introduce `cdceval` package -- a library for expression evaluation
for CDC.

Changefeed users for a long time requested ability to emit only a
subset of columns. They have also requested ability to filter
out unwanted events (for example, filter out deletions).

This library aims to accomplish those goals.  However, instead of
focusing on a narrow use cases, which would usually be addressed via
addition of new `WITH` option (as done in
cockroachdb#80499), this library
aims to provide support for general expression evaluation.

`cdceval` library provides the following functionality:
  * Ability to evaluate predicates (filters) so that events may be
    filtered.
  * Ability to evaluate projection expressions (`select *`, `select a,
    b,c`, or even `select a + b - c as math_column`)
  * Ability to evaluate virtual compute columns (currently not
    implemented in this PR).

`cdceval` library reuses existing parsing and evaluation libraries, but
adopts them for CDC use case.  CDC events are row level events, and as
such, CDC expressions only make sense in the context of a single
row/single table.  In addition, because CDC events are at least once
semantics, the emitted events must not depend on volatile state.
In summary, any expression is supported except:
  * Volatile functions -- not supported
  * Stable functions, such as `now()`, `current_timestamp()`, etc are
    modified so that they return stable values -- namely events MVCC
    timestamp.
  * Multi row functions (aggregates, windowing functions) are
    disallowed.

`cdceval` also defined few custom, CDC specific functions, such as:
  * `cdc_prev()`: Returns the previous row values as a JSONB object.
  * `cdc_is_delete()`: Returns true if the row was deleted.
  * Others -- see `functions.go`

The follow PRs will add a "front end" to this library to enable creation
and management of predicated changefeeds.

Release Notes: None
  • Loading branch information
Yevgeniy Miretskiy committed May 27, 2022
1 parent 739ceee commit 89eea0a
Show file tree
Hide file tree
Showing 19 changed files with 2,341 additions and 124 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ ALL_TESTS = [
"//pkg/ccl/backupccl:backupccl_test",
"//pkg/ccl/baseccl:baseccl_test",
"//pkg/ccl/benchccl/rttanalysisccl:rttanalysisccl_test",
"//pkg/ccl/changefeedccl/cdceval:cdceval_test",
"//pkg/ccl/changefeedccl/cdcevent:cdcevent_test",
"//pkg/ccl/changefeedccl/cdctest:cdctest_test",
"//pkg/ccl/changefeedccl/cdcutils:cdcutils_test",
Expand Down
22 changes: 11 additions & 11 deletions pkg/ccl/changefeedccl/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func parseAvroSchema(t *testing.T, j string) (*avroDataRecord, error) {
}
return tableToAvroSchema(
cdcevent.TestingMakeEventRow(
tabledesc.NewBuilder(&tableDesc).BuildImmutableTable(), nil, false,
tabledesc.NewBuilder(&tableDesc).BuildImmutableTable(), 0, nil, false,
), "", string(changefeedbase.OptVirtualColumnsOmitted))
}

Expand Down Expand Up @@ -391,7 +391,7 @@ func TestAvroSchema(t *testing.T) {
fmt.Sprintf(`CREATE TABLE "%s" %s`, test.name, test.schema))
require.NoError(t, err)
origSchema, err := tableToAvroSchema(
cdcevent.TestingMakeEventRow(tableDesc, nil, false),
cdcevent.TestingMakeEventRow(tableDesc, 0, nil, false),
avroSchemaNoSuffix, "")
require.NoError(t, err)
jsonSchema := origSchema.codec.Schema()
Expand All @@ -405,7 +405,7 @@ func TestAvroSchema(t *testing.T) {
require.NoError(t, err)

for _, encDatums := range rows {
row := cdcevent.TestingMakeEventRow(tableDesc, encDatums, false)
row := cdcevent.TestingMakeEventRow(tableDesc, 0, encDatums, false)
evalCtx := &eval.Context{
SessionDataStack: sessiondata.NewStack(&sessiondata.SessionData{}),
}
Expand All @@ -430,15 +430,15 @@ func TestAvroSchema(t *testing.T) {
tableDesc, err := parseTableDesc(`CREATE TABLE "☃" (🍦 INT PRIMARY KEY)`)
require.NoError(t, err)
tableSchema, err := tableToAvroSchema(
cdcevent.TestingMakeEventRow(tableDesc, nil, false), avroSchemaNoSuffix, "")
cdcevent.TestingMakeEventRow(tableDesc, 0, nil, false), avroSchemaNoSuffix, "")
require.NoError(t, err)
require.Equal(t,
`{"type":"record","name":"_u2603_","fields":[`+
`{"type":["null","long"],"name":"_u0001f366_","default":null,`+
`"__crdb__":"🍦 INT8 NOT NULL"}]}`,
tableSchema.codec.Schema())
indexSchema, err := primaryIndexToAvroSchema(
cdcevent.TestingMakeEventRow(tableDesc, nil, false), tableDesc.GetName(), "")
cdcevent.TestingMakeEventRow(tableDesc, 0, nil, false), tableDesc.GetName(), "")
require.NoError(t, err)
require.Equal(t,
`{"type":"record","name":"_u2603_","fields":[`+
Expand Down Expand Up @@ -661,7 +661,7 @@ func TestAvroSchema(t *testing.T) {
encDatums, err := parseValues(tableDesc, `VALUES (1, `+test.sql+`)`)
require.NoError(t, err)

row := cdcevent.TestingMakeEventRow(tableDesc, encDatums[0], false)
row := cdcevent.TestingMakeEventRow(tableDesc, 0, encDatums[0], false)
schema, err := tableToAvroSchema(
row, avroSchemaNoSuffix, "")
require.NoError(t, err)
Expand Down Expand Up @@ -719,7 +719,7 @@ func TestAvroSchema(t *testing.T) {
encDatums, err := parseValues(tableDesc, `VALUES (1, `+test.sql+`)`)
require.NoError(t, err)

row := cdcevent.TestingMakeEventRow(tableDesc, encDatums[0], false)
row := cdcevent.TestingMakeEventRow(tableDesc, 0, encDatums[0], false)
schema, err := tableToAvroSchema(row, avroSchemaNoSuffix, "")
require.NoError(t, err)
textual, err := schema.textualFromRow(row)
Expand Down Expand Up @@ -824,13 +824,13 @@ func TestAvroMigration(t *testing.T) {
fmt.Sprintf(`CREATE TABLE "%s" %s`, test.name, test.writerSchema))
require.NoError(t, err)
writerSchema, err := tableToAvroSchema(
cdcevent.TestingMakeEventRow(writerDesc, nil, false), avroSchemaNoSuffix, "")
cdcevent.TestingMakeEventRow(writerDesc, 0, nil, false), avroSchemaNoSuffix, "")
require.NoError(t, err)
readerDesc, err := parseTableDesc(
fmt.Sprintf(`CREATE TABLE "%s" %s`, test.name, test.readerSchema))
require.NoError(t, err)
readerSchema, err := tableToAvroSchema(
cdcevent.TestingMakeEventRow(readerDesc, nil, false), avroSchemaNoSuffix, "")
cdcevent.TestingMakeEventRow(readerDesc, 0, nil, false), avroSchemaNoSuffix, "")
require.NoError(t, err)

writerRows, err := parseValues(writerDesc, `VALUES `+test.writerValues)
Expand All @@ -839,7 +839,7 @@ func TestAvroMigration(t *testing.T) {
require.NoError(t, err)

for i := range writerRows {
writerEvent := cdcevent.TestingMakeEventRow(writerDesc, writerRows[i], false)
writerEvent := cdcevent.TestingMakeEventRow(writerDesc, 0, writerRows[i], false)
encoded, err := writerSchema.BinaryFromRow(nil, writerEvent.ForEachColumn())
require.NoError(t, err)
row, err := rowFromBinaryEvolved(encoded, writerSchema, readerSchema)
Expand Down Expand Up @@ -907,7 +907,7 @@ func benchmarkEncodeType(b *testing.B, typ *types.T, encRow rowenc.EncDatumRow)
tableDesc, err := parseTableDesc(
fmt.Sprintf(`CREATE TABLE bench_table (bench_field %s)`, typ.SQLString()))
require.NoError(b, err)
row := cdcevent.TestingMakeEventRow(tableDesc, encRow, false)
row := cdcevent.TestingMakeEventRow(tableDesc, 0, encRow, false)
schema, err := tableToAvroSchema(row, "suffix", "namespace")
require.NoError(b, err)

Expand Down
71 changes: 71 additions & 0 deletions pkg/ccl/changefeedccl/cdceval/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "cdceval",
srcs = [
"doc.go",
"expr_eval.go",
"functions.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/changefeedccl/cdcevent",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/schemaexpr",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/normalize",
"//pkg/sql/sem/tree",
"//pkg/sql/sem/volatility",
"//pkg/sql/sessiondata",
"//pkg/sql/sessiondatapb",
"//pkg/sql/types",
"//pkg/util/hlc",
"//pkg/util/json",
"//pkg/util/log",
"@com_github_cockroachdb_errors//:errors",
],
)

go_test(
name = "cdceval_test",
srcs = [
"expr_eval_test.go",
"functions_test.go",
"main_test.go",
],
embed = [":cdceval"],
deps = [
"//pkg/base",
"//pkg/ccl/changefeedccl/cdcevent",
"//pkg/ccl/changefeedccl/cdctest",
"//pkg/ccl/utilccl",
"//pkg/jobs/jobspb",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/distsql",
"//pkg/sql/parser",
"//pkg/sql/randgen",
"//pkg/sql/rowenc",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondatapb",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/json",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"@com_github_stretchr_testify//require",
],
)
107 changes: 107 additions & 0 deletions pkg/ccl/changefeedccl/cdceval/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package cdceval

/***
cdceval package is a library for evaluating various expressions in CDC.
Namely, this package concerns itself with 3 things:
* Filter evaluation -- aka predicates: does the event match boolean expression.
* Projection evaluation: given the set of projection expressions, evaluate them.
* (Soon to be added) Evaluation of computed virtual columns.
Evaluator is the gateway into the evaluation logic; it has 3 methods matching
the above use cases. Before filtering and projection can be used, Evaluator must
be configured with appropriate predicate and filtering expressions via ConfigureProjection.
If the Evaluator is not configured with ConfigureProjection, then each event is assumed
to match filter by default, and projection operation is an identity operation returning input
row.
Evaluator constructs a helper structure (exprEval) to perform actual evaluation.
One exprEval exists per cdcevent.EventDescriptor (currently, a new exprEval created
whenever event descriptor changes; we might have to add some sort of caching if needed).
Evaluation of projections and filter expressions are identical.
First, we have "compilation" phase:
1. Expression is "walked" to resolve the names and replace those names with tree.IndexedVar expressions.
The "index" part of the indexed var refers to the "index" of the datum in the row.
(note however: the row is abstracted under cdcevent package). IndexedVar allows the value of that
variable to be bound later once it is known; it also associates the type information
with that variable.
2. Expression is typed check to ensure that it is of the appropriate type.
* Projection expressions can be of tree.Any type, while filters must be tree.DBool.
3. Expression is then normalized (constants folded, etc).
It is an error to have a filter expression which evaluates to "false" -- in this case, Evaluator
will return a "contradiction" error.
After expressions "compiled", they can be evaluated; and again, both projections and filters use the same
logic (evalExpr() function); basically, all IndexedVars are bound to the Datums in the updated row, and the
expression is evaluated to the appropriate target type.
Expressions can contain functions. We restrict the set of functions that can be used by CDC.
Volatile functions, window functions, aggregate functions are disallowed.
Certain stable functions (s.a. now(), current_timestamp(), etc) are allowed -- they will always
return the MVCC timestamp of the event.
We also provide custom, CDC specific functions, such as cdc_prev() which returns prevoius row as
a JSONB record. See functions.go for more details.
***/

// TODO(yevgeniy): Various notes/questions/issues and todos.
// 1. Options issues:
// * key_in_value: makes no sense; just "select *"
// * key_only: currently unsupported by this flavor; would be nice to support it though
// i.e. you only want the key, but you need "where" clause filtering. Not clear how to express in sql.y
// * VirtualColumnVisibility: null or omit -- both can be accomplished
// * null: currently emitting null, but you can choose to emit null via "select ... null as vcolumn"
// * omit: well, just don't select.
// * On the other hand, we can also support "emit" mode, where we can compute vcolumn expression.
// * updated and mvcc_timestamp options -- both supported via select
// * Wrapped option -- does it make sense here.
// 3. Probably need to add more custom functions.
// * Determine what to do with stable function overrides (now()) vs cdc_mvcc_timestamp. Keep both? drop one?
// 4. How to surface previous row -- it's an open question.
// * Option 1: provide cdc_prev() builtin which returns JSON encoding of previous row.
// One of the negatives is that we are adding an additional JSONB encoding cost, though, this may not
// be that horrible. One interesting thing we could do with this approach is to also have a function
// cdc_delta which reduces JSONB to contain only modified columns (cdc_delta(cdc_prev()).
// Of course you could do something like this with "prev" table, but you'd have to "(case ...)" select
// for each table column.
// And since composition is so cool, you could use cdc_delta to determine if an update is not actually
// and update, but an upsert event.
// * Option 2: provide "prev" table. Also has negatives. Name resolution becomes more complex. You could
// legitimately have "prev" table, so you'd always need to alias the "real prev" table. The prev table
// is not specified via sql.y, so that's confusing.
// * Regardless of the option, keep in mind that sometimes prev is not set -- e.g. w/out diff option
// (here, we can return an error), but also not set during initial scan. So, the query must handle
// nulls in prev value. Just something to keep in mind.
// 5. We must be able to return permanent errors from this code that cause changefeed to fail.
// If filtering fails for a row (e.g. "select ... where col_a/col_b > 0" results in divide by 0),
// this will fail forever, and so we must be able to return permanent error.
// 6. Related to 5, we must have poison message handling so we won't kill feed in cases like that.
// 7. Schema changes could cause permanent failures.
// 8. Multiple *'s are allowed. But should they?
// 9. It is interesting to consider what access to prev does when we then send that data to encoder.
// Right now, we hard code before/after datums; with predicates, we should probably change how things are encoded.
// I.e. no "before"/"after" fields in json/avro -- instead, you select what you want to select.
// 10. Multi family support -- sort of breaks down because you get datums only for 1 family at a time. Any expressions
// comparing columns across families will fail.
// 11. Span constraints -- arguably the "holy grail" -- something that depends on the optiizer, but perhaps we
// can find a way of using that w/out significant refactor to expose entirety of changefeed to opt.
// Basically, given the set of predicates over primary key span, try to narrow the span(s) to those that can
// satisfy predicates.
// 12. UI/Usability: Simple contradictions are detected -- but not all. Even w/out contradiction, the user
// may want to know which events match/not match, and how does the data look like. We might need a mode
// where the data always emitted, but it is marked somehow, indicating how the data will be handled.
// 13. We should walk expressions to determine if we need to turn on an option. E.g. if we know user wants to filter
// out deletes, we could push this predicate down to KV (once kv supports filtering).
// Another idea is potentially detect if cdc_prev() is used and if so, turn on with diff option.
Loading

0 comments on commit 89eea0a

Please sign in to comment.