changefeedccl: SQL evaluation for CDC #81676

Merged: 3 commits, May 31, 2022

Contributor

@miretskiy miretskiy commented May 23, 2022

Introduce cdceval package -- a library for expression evaluation
for CDC.

Changefeed users have long requested the ability to emit only a
subset of columns. They have also requested the ability to filter
out unwanted events (for example, deletions).

This library aims to accomplish those goals. However, instead of
focusing on narrow use cases, which would usually be addressed by
adding a new WITH option (as done in
#80499), this library
aims to provide support for general expression evaluation.

cdceval library provides the following functionality:

  • Ability to evaluate predicates (filters) so that events may be
    filtered.
  • Ability to evaluate projection expressions (select *, select a, b,c, or even select a + b - c as math_column)
  • Ability to evaluate virtual computed columns (not yet
    implemented in this PR).
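
To make the filter and projection capabilities listed above concrete, here is a minimal sketch of evaluating one predicate and one projection against a single row event. Every name in it (`Row`, `Event`, `Predicate`, `Projection`, `Evaluate`) is hypothetical and invented for illustration; this is not the `cdceval` API, which works on parsed SQL expressions rather than Go closures.

```go
package main

import "fmt"

// Row is a hypothetical stand-in for a decoded row: column name -> value.
type Row map[string]int

// Event is a hypothetical single-row CDC event.
type Event struct {
	Row      Row
	IsDelete bool
}

// Predicate decides whether an event should be emitted.
type Predicate func(Event) bool

// Projection pairs an output column name with an expression over the row.
type Projection struct {
	Name string
	Expr func(Row) int
}

// Evaluate applies the filter first, then the projections, to one event.
// ok is false when the event is filtered out.
func Evaluate(ev Event, filter Predicate, projs []Projection) (out map[string]int, ok bool) {
	if filter != nil && !filter(ev) {
		return nil, false
	}
	out = make(map[string]int, len(projs))
	for _, p := range projs {
		out[p.Name] = p.Expr(ev.Row)
	}
	return out, true
}

func main() {
	// Roughly: SELECT a + b - c AS math_column ... WHERE NOT cdc_is_delete()
	notDelete := func(ev Event) bool { return !ev.IsDelete }
	projs := []Projection{
		{Name: "math_column", Expr: func(r Row) int { return r["a"] + r["b"] - r["c"] }},
	}
	out, ok := Evaluate(Event{Row: Row{"a": 5, "b": 3, "c": 2}}, notDelete, projs)
	fmt.Println(out, ok)
}
```

Because each call sees exactly one event, the sketch has no notion of batches or cross-row state, which matches the single-row framing of the description.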

The cdceval library reuses existing parsing and evaluation libraries, but
adapts them for the CDC use case. CDC events are row-level events, and as
such, CDC expressions only make sense in the context of a single
row/single table. In addition, because CDC events have at-least-once
semantics, the emitted events must not depend on volatile state.
In summary, any expression is supported, with the following restrictions:

  • Volatile functions are not supported.
  • Stable functions, such as now(), current_timestamp(), etc. are
    modified so that they return stable values -- namely the event's MVCC
    timestamp.
  • Multi-row functions (aggregates, windowing functions) are
    disallowed.
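
The restrictions above can be sketched as a small admission check plus an override for stable time functions. This is an illustrative sketch only: `FuncDef`, `CheckFunction`, `EventContext`, and `StableNow` are hypothetical names, and the real package makes these decisions using CockroachDB's own function metadata.

```go
package main

import (
	"errors"
	"fmt"
)

// Volatility mirrors the usual SQL function volatility categories.
type Volatility int

const (
	Immutable Volatility = iota
	Stable
	Volatile
)

// FuncDef is a hypothetical, simplified function descriptor.
type FuncDef struct {
	Name       string
	Volatility Volatility
	MultiRow   bool // true for aggregates and window functions
}

// ErrUnsupported is returned for functions a CDC expression may not call.
var ErrUnsupported = errors.New("not supported in CDC expressions")

// CheckFunction rejects multi-row and volatile functions; everything else passes.
func CheckFunction(def FuncDef) error {
	switch {
	case def.MultiRow:
		return fmt.Errorf("%s(): multi-row function: %w", def.Name, ErrUnsupported)
	case def.Volatility == Volatile:
		return fmt.Errorf("%s(): volatile function: %w", def.Name, ErrUnsupported)
	default:
		return nil
	}
}

// EventContext carries per-event state; MVCCNanos is the event's MVCC timestamp.
type EventContext struct {
	MVCCNanos int64
}

// StableNow stands in for an overridden now()/current_timestamp(): it always
// returns the event's MVCC timestamp, so re-emitting the event gives the same value.
func StableNow(ev EventContext) int64 { return ev.MVCCNanos }

func main() {
	for _, def := range []FuncDef{
		{Name: "lower", Volatility: Immutable},
		{Name: "now", Volatility: Stable},
		{Name: "random", Volatility: Volatile},
		{Name: "count", Volatility: Immutable, MultiRow: true},
	} {
		fmt.Println(def.Name, CheckFunction(def))
	}
	fmt.Println(StableNow(EventContext{MVCCNanos: 1653955200000000000}))
}
```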

cdceval also defines a few custom, CDC-specific functions, such as:

  • cdc_prev(): Returns the previous row values as a JSONB object.
  • cdc_is_delete(): Returns true if the row was deleted.
  • Others -- see functions.go

Follow-up PRs will add a "front end" to this library to enable creation
and management of predicated changefeeds.

Release Notes: None

@miretskiy miretskiy requested a review from a team as a code owner May 23, 2022 19:30
@miretskiy miretskiy requested a review from a team May 23, 2022 19:30
@miretskiy miretskiy requested a review from a team as a code owner May 23, 2022 19:30
@miretskiy miretskiy requested review from gh-casper and removed request for a team May 23, 2022 19:30
@miretskiy miretskiy requested review from ajwerner and HonoreDB May 23, 2022 19:30
@miretskiy miretskiy force-pushed the cdceval branch 2 times, most recently from 0111f6e to 8075bd0 Compare May 24, 2022 02:10
@jordanlewis
Member

Wow, this is really neat! Seems like it will open a whole new world of possibility.

It's probably early days still but have you thought about using the vectorized execution engine for scalar expression evaluation rather than expr.Eval? It's much more efficient with respect to garbage production and CPU usage, especially for cases like this where you'll be running a pre-decided set of projections and filters against many rows. If you already have "batches" to process it might be a very natural fit. Sorry for the drive by comment, but I couldn't resist.

@miretskiy
Contributor Author

> Wow, this is really neat! Seems like it will open a whole new world of possibility.

Yes, I'm very excited about the potential here.

> It's probably early days still but have you thought about using the vectorized execution engine for scalar expression evaluation rather than expr.Eval? It's much more efficient with respect to garbage production and CPU usage, especially for cases like this where you'll be running a pre-decided set of projections and filters against many rows. If you already have "batches" to process it might be a very natural fit. Sorry for the drive by comment, but I couldn't resist.

It is early days, you're right. Unfortunately, we are not processing batches -- just 1 row at a time, so I'm not sure vectorization would help. However, one could imagine that at some point we might want to support a "live query" type of system where we could do joins against other tables as a result of an event. In such cases, I would imagine batching & vectorization would play an important role.

Also, #80499 was the first foray for changefeeds into optimizer land.
I think I would like to try adopting a similar, narrow approach to projection evaluation -- basically, try to get some optimizer goodies, just like we already get them for predicates.

@miretskiy miretskiy force-pushed the cdceval branch 2 times, most recently from 794e33e to bd9f734 Compare May 24, 2022 12:00
Contributor

@HonoreDB HonoreDB left a comment

Reviewed 1 of 1 files at r1, 5 of 5 files at r2.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @gh-casper, and @miretskiy)


pkg/ccl/changefeedccl/cdceval/expr_eval.go line 268 at r6 (raw file):

		return false, err
	}
	return d == tree.DBoolTrue, nil

Something to consider here is whether we want NULL to count as true or false in a predicate. In check constraints, NULL counts as true.


pkg/ccl/changefeedccl/cdceval/expr_eval.go line 569 at r6 (raw file):

	// Aggregates, generators and window functions are not supported.
	switch fnClass {
	case tree.AggregateClass, tree.GeneratorClass, tree.WindowClass:

It's occasionally been frustrating that you can't use a generator function inside an aggregator function in contexts that expect a scalar function--for example, array_agg(json_array_elements(jsonCol)) to convert a json array into a SQL array. Do you know if it'd break anything here to allow that kind of expression? (Definitely not a blocker, regardless).


pkg/ccl/changefeedccl/cdceval/functions_test.go line 83 at r6 (raw file):

		// When previous row is not set -- i.e. if running without diff, cdc_prev returns
		// null json.

Maybe we should instead walk the expression on startup looking for cdc_prev and set WithDiff to true if we find it, rather than making the user tell us they're interested in previous values in two places.

}

if star, isStar := expr.(tree.UnqualifiedStar); isStar {
// Can't type check star -- we'll handle it later during eval.
Contributor

Could you use types.AnyTuple here?

Contributor Author

Tbh, I'm not sure. I see that this stuff is used when generating row(); I'm not sure how this would help here though.

`*` expansion still happens, and I happen to know exactly what types those expressions are... Do you have any code gist where you think AnyTuple might come in handy?

Contributor

Thinking about it...probably no point, especially with no aggregate functions.

@miretskiy
Contributor Author

> Something to consider here is whether we want NULL to count as true or false in a predicate. In check constraints, NULL counts as true.

I'd rather not do that. I think if you want NULL to match, you can add an (OR ... IS NULL) clause.
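
The difference between the two NULL treatments discussed here can be sketched with a small three-valued logic model. This is a hypothetical illustration, not code from the PR: `Tri`, `NullInt`, and the helper functions are invented names, with `MatchesFilter` mirroring the `d == tree.DBoolTrue` check quoted in the review.

```go
package main

import "fmt"

// Tri is a SQL-style three-valued boolean.
type Tri int

const (
	False Tri = iota
	True
	Null
)

// NullInt is a nullable integer column value; the zero value is NULL.
type NullInt struct {
	V     int
	Valid bool
}

// Gt evaluates `x > c`; any comparison with NULL yields NULL.
func Gt(x NullInt, c int) Tri {
	if !x.Valid {
		return Null
	}
	if x.V > c {
		return True
	}
	return False
}

// IsNull evaluates `x IS NULL`, which is never NULL itself.
func IsNull(x NullInt) Tri {
	if x.Valid {
		return False
	}
	return True
}

// Or implements SQL OR over three-valued logic.
func Or(a, b Tri) Tri {
	if a == True || b == True {
		return True
	}
	if a == Null || b == Null {
		return Null
	}
	return False
}

// MatchesFilter emits an event only when the predicate is strictly true.
func MatchesFilter(t Tri) bool { return t == True }

// PassesCheckConstraint treats NULL as passing, as check constraints do.
func PassesCheckConstraint(t Tri) bool { return t != False }

func main() {
	x := NullInt{} // NULL
	fmt.Println(MatchesFilter(Gt(x, 10)))                // filter drops the event
	fmt.Println(MatchesFilter(Or(Gt(x, 10), IsNull(x)))) // explicit IS NULL keeps it
	fmt.Println(PassesCheckConstraint(Gt(x, 10)))        // check-constraint semantics
}
```

With strict semantics the user opts in to NULL matching explicitly, which keeps the default behavior the same as a SQL `WHERE` clause.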

@miretskiy
Contributor Author

> Maybe we should instead walk the expression on startup looking for cdc_prev and set WithDiff to true if we find it, rather than making the user tell us they're interested in previous values in two places.

This is definitely something I have considered. I think we would be able to detect if, for example, the user wants to filter out deleted rows, and if so, automatically push the predicate down to KV.
Enabling diff is trivial, but at the same time we should think about whether we want to do this automatically; after all, automatically turning on the diff option has performance implications (though probably substantially less now).
Regardless, at this point, I think I'm going to punt on this optimization. I have added another item to the list of ideas in doc.go to make sure we don't miss this.
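
The deferred idea, walking the expression at startup to see whether it calls `cdc_prev`, could look roughly like the sketch below. The AST types (`FuncCall`, `BinaryExpr`, `ColumnRef`, `StringLit`) and `UsesFunction` are hypothetical simplifications; a real implementation would presumably walk CockroachDB's `tree.Expr` nodes with a visitor instead.

```go
package main

import "fmt"

// Expr is a hypothetical, minimal expression AST node.
type Expr interface{}

// FuncCall is a function invocation such as cdc_prev().
type FuncCall struct {
	Name string
	Args []Expr
}

// BinaryExpr is a binary operator application such as a != b.
type BinaryExpr struct {
	Op          string
	Left, Right Expr
}

// ColumnRef references a column by name.
type ColumnRef struct{ Name string }

// StringLit is a string constant.
type StringLit struct{ Value string }

// UsesFunction reports whether the expression calls the named function anywhere.
func UsesFunction(e Expr, name string) bool {
	switch n := e.(type) {
	case *FuncCall:
		if n.Name == name {
			return true
		}
		for _, arg := range n.Args {
			if UsesFunction(arg, name) {
				return true
			}
		}
	case *BinaryExpr:
		return UsesFunction(n.Left, name) || UsesFunction(n.Right, name)
	}
	// Leaves (column references, literals) never contain calls.
	return false
}

func main() {
	// important_col != cdc_prev()->'important_col'
	filter := &BinaryExpr{
		Op:   "!=",
		Left: &ColumnRef{Name: "important_col"},
		Right: &BinaryExpr{
			Op:    "->",
			Left:  &FuncCall{Name: "cdc_prev"},
			Right: &StringLit{Value: "important_col"},
		},
	}
	withDiff := UsesFunction(filter, "cdc_prev")
	fmt.Println("enable diff:", withDiff)
}
```

Whether the result should silently turn on diff is exactly the open question in the comment above, since doing so changes the changefeed's performance profile without the user asking for it.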

@miretskiy
Contributor Author

> It's occasionally been frustrating that you can't use a generator function inside an aggregator function in contexts that expect a scalar function--for example, array_agg(json_array_elements(jsonCol)) to convert a json array into a SQL array. Do you know if it'd break anything here to allow that kind of expression? (Definitely not a blocker, regardless).

Hmm... My thinking was that anything that operates in the context of a table or a set of rows should be disabled.
Does it have to be this way? Of course not. We could, for example, implement count(*) if we wanted to (to return 1). But I'm not sure that makes much sense. I'm sure we could enable some uses... but I just wonder if these make sense in the context of a single row event.

@HonoreDB
Contributor

HonoreDB commented May 24, 2022

> I just wonder if [aggregate functions] make sense in the context of a single row event.

I'm almost exclusively thinking about unnest and json_array_elements, which let you turn a single datum into a set you can act on with aggregate functions. Without access to set-returning and aggregate functions you tend to be out of luck if we haven't implemented the specific array function you're looking for. Like there's no way to delete all instances of a given json object from a json array in an update, or to create a check constraint that all elements of an array are unique. It's why I've been messing with our SQL builtins recently.

@miretskiy
Contributor Author

> I'm almost exclusively thinking about unnest and json_array_elements, which let you turn a single datum into a set you can act on with aggregate functions. Without access to set-returning and aggregate functions you tend to be out of luck if we haven't implemented the specific array function you're looking for. Like there's no way to delete all instances of a given json object from a json array in an update, or to create a check constraint that all elements of an array are unique. It's why I've been messing with our SQL builtins recently.

I understand. I think that if you succeed in your "messing" with SQL builtins, then I suppose we can use that.
Even if we decide to do a CDC-specific override, we could do that. I think I would prefer to enable aggregate functions in expressions in a separate PR.

@miretskiy miretskiy changed the title changefeedcc: Predicates and projections in CDC. changefeedccl: Predicates and projections in CDC. May 24, 2022
@miretskiy miretskiy requested a review from HonoreDB May 25, 2022 21:46
@miretskiy
Contributor Author

Added a restriction to disallow subselects. Just too big of a can of worms, and it's not clear if these are useful in a single row context. /cc @HonoreDB

@miretskiy miretskiy force-pushed the cdceval branch 2 times, most recently from 01ef553 to 4b96c63 Compare May 25, 2022 22:11
@miretskiy miretskiy force-pushed the cdceval branch 4 times, most recently from 9bab24d to 89eea0a Compare May 27, 2022 14:24
Contributor

@HonoreDB HonoreDB left a comment

Reviewed 12 of 19 files at r3, 1 of 2 files at r7, 1 of 4 files at r8, 2 of 3 files at r9, 3 of 3 files at r10, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @gh-casper, @HonoreDB, and @miretskiy)


pkg/ccl/changefeedccl/cdceval/expr_eval_test.go line 40 at r10 (raw file):

	"github.com/stretchr/testify/require"
)

Few more fun tests:
SELECT IF(cdc_is_delete(),'deleted',a::string) (I'm guessing this works)
SELECT btrim(a) (When does the unknown type signature error happen?)
SELECT 1/(10-a) (Do we fail gracefully when there's a runtime error evaluating SQL after some results are already returned?)
SELECT parse_timetz('1:00-0') (Ideally this succeeds since with a timezone specified it's not dependent on locale, but the implementation involves GetTxnTimestamp so I don't know if it will)

@miretskiy
Contributor Author

> Few more fun tests:
> SELECT IF(cdc_is_delete(),'deleted',a::string) (I'm guessing this works)

Added

> SELECT btrim(a) (When does the unknown type signature error happen?)
> SELECT parse_timetz('1:00-0') (Ideally this succeeds since with a timezone specified it's not dependent on locale, but the implementation involves GetTxnTimestamp so I don't know if it will)

Added both btrim + time parse test; as well as btrim(a).
Currently, error happens during projection because when you construct "evaluator", you don't have
a descriptor yet; I guess it's something that can be added later.

> SELECT 1/(10-a) (Do we fail gracefully when there's a runtime error evaluating SQL after some results are already returned?)

I have divide by 0 error tests. There is really no such thing as "some results returned":
evaluation happens 1 row at a time. What we need is a way to mark any evaluation error as a permanent error.
Why? Because (hopefully) we did our job and evaluation is "pure" -- only depends on row state -- thus, once you get
an error, you will always get an error. So we must fail the changefeed. Poison pill handling will help with this problem.
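
One way to express "any evaluation error is permanent" is to wrap such errors in a marker type that retry logic can test for. The sketch below uses only the standard `errors` package; `permanentError`, `MarkPermanent`, `IsPermanent`, and `EvalQuotient` are hypothetical names, and the PR itself does not say how the changefeed classifies errors.

```go
package main

import (
	"errors"
	"fmt"
)

// permanentError marks an error that retrying can never fix.
type permanentError struct{ cause error }

func (e *permanentError) Error() string { return "permanent: " + e.cause.Error() }
func (e *permanentError) Unwrap() error { return e.cause }

// MarkPermanent wraps err so callers can recognize it as non-retryable.
func MarkPermanent(err error) error {
	if err == nil {
		return nil
	}
	return &permanentError{cause: err}
}

// IsPermanent reports whether err, or anything it wraps, is marked permanent.
func IsPermanent(err error) bool {
	var p *permanentError
	return errors.As(err, &p)
}

// ErrDivByZero is the underlying evaluation failure in this example.
var ErrDivByZero = errors.New("division by zero")

// EvalQuotient evaluates 1/(10-a) for one row. The result depends only on
// the row value, so a failure for a given row will repeat on every retry.
func EvalQuotient(a int) (int, error) {
	d := 10 - a
	if d == 0 {
		return 0, MarkPermanent(ErrDivByZero)
	}
	return 1 / d, nil
}

func main() {
	for _, a := range []int{9, 10} {
		v, err := EvalQuotient(a)
		fmt.Println(a, v, err, IsPermanent(err))
	}
}
```

Keeping the original error reachable through `Unwrap` means callers can still distinguish division by zero from other failures while treating all of them as fatal to the changefeed.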

Contributor

@HonoreDB HonoreDB left a comment

Reviewed 1 of 3 files at r11.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @gh-casper, @HonoreDB, and @miretskiy)

@miretskiy
Contributor Author

bors r=honoredb

@knz
Contributor

knz commented May 31, 2022

> Because (hopefully) we did our job and evaluation is "pure" -- only depends on row state

We have non-deterministic functions (e.g. random), so no

@craig
Contributor

craig bot commented May 31, 2022

Build failed (retrying...):

@miretskiy
Contributor Author

We disallow those functions, @knz

Yevgeniy Miretskiy added 3 commits May 31, 2022 14:20
Expose function to setup name resolution visitor.

Release Notes: None
Introduce a `CustomFunctionDefinitionResolver` interface which allows
`tree.SearchPath` implementations to optionally implement custom
function definition resolution logic.

Release Notes: None
Introduce `cdceval` package -- a library for expression evaluation
for CDC.

Release Notes: None
@craig
Contributor

craig bot commented May 31, 2022

Canceled.

@miretskiy
Contributor Author

bors r+

@craig
Contributor

craig bot commented May 31, 2022

Build succeeded:

@craig craig bot merged commit 01a68a3 into cockroachdb:master May 31, 2022
@shermanCRL shermanCRL changed the title changefeedccl: Predicates and projections in CDC. changefeedccl: SQL evaluation for CDC Jun 17, 2022
craig bot pushed a commit that referenced this pull request Jun 26, 2022
82562: changefeeccl: Projections and Filters in CDC. r=miretskiy a=miretskiy

Add a variant of CHANGEFEED statement that allows specification
of predicates and projections.

```
CREATE CHANGEFEED [INTO 'sink'] [WITH opt=val, ...]
AS SELECT .... FROM t WHERE ...
```

This changefeed variant can target at most 1 table (and 1 column
family) at a time. The expressions used as the projections and
filters can be almost any supported expression with some restrictions:
  * Volatile functions not allowed.
  * Sub-selects not allowed.
  * Aggregate and window functions (i.e. functions operating over many
    rows) not allowed.
  * Some stable functions, notably functions which return MVCC
    timestamp, are overridden to return MVCC timestamp of the event.

In addition, some CDC specific functions are provided:
  * cdc_is_delete: returns true if the event is a deletion event.
  * cdc_prev: returns JSON representation of the previous row state.
  * cdc_updated_timestamp: returns event update timestamp (usually MVCC
    timestamp, but can be different if e.g. undergoing schema changes)
Additional CDC specific functions will be added in the follow on PRs.

A few examples:

* Emit all but the deletion events:
```
CREATE CHANGEFEED INTO 'kafka://'
AS SELECT * FROM table
WHERE NOT cdc_is_delete()
```

* Emit all events that modified `important_col` column:
```
CREATE CHANGEFEED INTO 'kafka://' WITH diff
AS SELECT *, cdc_prev() AS previous
FROM important_table
WHERE important_col != cdc_prev()->'important_col'
```

* Emit a few columns, as well as computed expressions:
```
CREATE CHANGEFEED INTO 'kafka://' WITH diff
AS SELECT warehouseID, (totalItems - orderedItems) as itemsAvailable
FROM warehouse
WHERE region='US/east';
```

When filter expression is specified, changefeed will now consult
optimizer so that the set of spans scanned by changefeed can be
restricted based on the predicate.

For example, given the following table and a changefeed:
```
CREATE TABLE warehouse (
  region STRING,
  warehouseID int,
  ....
  PRIMARY KEY (region, warehouseID)
);

CREATE CHANGEFEED INTO 'kafka://' WITH diff
AS SELECT *
FROM warehouse
WHERE region='US/east';
```

The create changefeed will only scan table spans that contain `US/east`
region (and ignore all other table spans).
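
To illustrate why an equality filter on the leading primary-key column lets the changefeed skip most of the table, here is a toy sketch of turning `region = 'US/east'` into a single key span. The key encoding (region, a zero byte, then the warehouse ID) and the names `Span`, `PrefixEnd`, and `SpanForRegion` are assumptions made for this example; CockroachDB's actual key encoding and the optimizer's span constraint logic are considerably more involved.

```go
package main

import "fmt"

// Span is a half-open key range [Start, End). An empty End means unbounded.
type Span struct {
	Start, End string
}

// Contains reports whether key falls inside the span.
func (s Span) Contains(key string) bool {
	return key >= s.Start && (s.End == "" || key < s.End)
}

// PrefixEnd returns the smallest key that sorts after every key with the
// given prefix, or "" if no such key exists (empty or all-0xff prefix).
func PrefixEnd(prefix string) string {
	b := []byte(prefix)
	for i := len(b) - 1; i >= 0; i-- {
		if b[i] < 0xff {
			b[i]++
			return string(b[:i+1])
		}
	}
	return ""
}

// SpanForRegion builds the span covering all rows of one region, assuming a
// toy key layout of region + "\x00" + warehouseID.
func SpanForRegion(region string) Span {
	prefix := region + "\x00"
	return Span{Start: prefix, End: PrefixEnd(prefix)}
}

func main() {
	span := SpanForRegion("US/east")
	for _, key := range []string{
		"US/east\x00" + "42",
		"US/eastern\x00" + "1",
		"US/west\x00" + "42",
	} {
		fmt.Printf("%q in span: %v\n", key, span.Contains(key))
	}
}
```

The separator byte matters in this toy layout: without it, keys for a region named `US/eastern` would fall inside the span for `US/east`.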

---

For foundational work, see:

- #81676
- #81249
- #80499

Addresses:
- #56949
- #31214


---

Release Notes (enterprise):
CHANGEFEED statement now supports general expressions -- predicates and projections.
Projections allow customers to emit only the data that they care about,
including computed columns, while predicates (i.e. filters) allow them
to restrict the data that's emitted only to those events that match the
filter.

```
CREATE CHANGEFEED INTO 'kafka://' AS SELECT * FROM t WHERE NOT cdc_is_delete()
```


Co-authored-by: Yevgeniy Miretskiy <yevgeniy@cockroachlabs.com>