82473: outliers: latency quantile detector r=matthewtodd a=matthewtodd

Closes #79451.

Note that this detector comes disabled by default. To enable, use the
`sql.stats.outliers.experimental.latency_quantile_detection.enabled`
cluster setting.

This new detection "algorithm" is a simple, perhaps naïve, heuristic,
though it may be good enough to be useful: we consider a statement an
outlier if its execution latency is > p99 latency for its fingerprint,
so long as it's also greater than twice the median latency and greater
than an arbitrary user-defined threshold.[^1]

We expect the operation of this detector to be memory constrained. To
avoid OOM errors, we offer a cluster setting[^2] to cap the overall
memory used, along with metrics reporting the number of fingerprints
being tracked, memory usage, and "evictions," or fingerprint histories
being dropped due to memory pressure. If a high rate of evictions is
observed, raise the memory limit, the "interesting" threshold, or both
to regain a steady state.
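
To make the relationship between the memory cap and the eviction metric concrete, here is a rough sketch of a byte-budgeted store that drops the least recently used fingerprint history when it goes over budget. The LRU policy, the type `boundedHistories`, and the byte accounting are all assumptions made for illustration; the description above only says that histories are dropped under memory pressure, not which ones.

```go
package main

import (
	"container/list"
	"fmt"
)

// entry records how many bytes one fingerprint's history occupies.
type entry struct {
	fingerprint string
	bytes       int64
}

// boundedHistories is a hypothetical store that tracks per-fingerprint
// memory usage and evicts least recently used entries over the limit.
type boundedHistories struct {
	limit     int64
	used      int64
	evictions int64
	order     *list.List               // front = most recently used
	index     map[string]*list.Element // fingerprint -> element in order
}

func newBoundedHistories(limit int64) *boundedHistories {
	return &boundedHistories{
		limit: limit,
		order: list.New(),
		index: make(map[string]*list.Element),
	}
}

// touch records that the fingerprint's history now occupies the given number
// of bytes, marks it most recently used, and evicts from the back of the
// list until usage is within the limit again.
func (b *boundedHistories) touch(fingerprint string, bytes int64) {
	if el, ok := b.index[fingerprint]; ok {
		e := el.Value.(*entry)
		b.used += bytes - e.bytes
		e.bytes = bytes
		b.order.MoveToFront(el)
	} else {
		b.index[fingerprint] = b.order.PushFront(&entry{fingerprint, bytes})
		b.used += bytes
	}
	for b.used > b.limit && b.order.Len() > 0 {
		e := b.order.Remove(b.order.Back()).(*entry)
		delete(b.index, e.fingerprint)
		b.used -= e.bytes
		b.evictions++
	}
}

func main() {
	h := newBoundedHistories(100)
	h.touch("a", 40)
	h.touch("b", 40)
	h.touch("c", 40) // over budget: "a" is evicted
	fmt.Println(len(h.index), h.used, h.evictions)
}
```

In a sketch like this, a steadily climbing `evictions` counter means histories are being discarded before they accumulate enough samples to be useful, which is the situation the paragraph above suggests resolving by adjusting the settings.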

Note also that this detector is not designed to be used concurrently by
multiple goroutines. Correctness currently relies on the global lock in
`outliers.Registry`, to be addressed next in #81021.

[^1]: This threshold is defined by a cluster setting,
`sql.stats.outliers.experimental.latency_quantile_detection.interesting_threshold`.
Without such a threshold, we would otherwise flag a statement that, say,
normally executes in 3ms taking 7ms; but 7ms is probably still fast
enough that there's nothing to be done.

[^2]: `sql.stats.outliers.experimental.latency_quantile_detection.memory_limit`

Release note: None

82741: obsservice: ui handler serves db console assets  r=sjbarag,abarganier a=dhartunian

This change enables the Observability Service to serve the DB Console UI
itself. Endpoints that CRDB can handle are proxied (`/_admin/`,
`/_status/`, `/ts/`) but the base URL will return a blank HTML page with
JS assets that load the DB Console.

In order to build with the UI code, the `--config=with_ui` flag must be
passed to Bazel.

This commit also adds a shortcut to the `dev` command to build
`obsservice` which includes the `--config=with_ui` flag just as we do by
default in `cockroach` builds.

Release note: None

83717: changefeedccl: infer WITH diff from changefeed expression r=[miretskiy] a=HonoreDB

Informs #83322. Specifically this does the "infer whether
or not to set diff" part of it.

This went through a more ambitious phase
(https://github.com/cockroachdb/cockroach/compare/master...HonoreDB:cockroach:cdc_expressions_infer_diff?expand=1)
where it also handled user defined types, but I ended up
agreeing with wiser heads that this was adding too much complexity
for no known use case.

I also came really close to setting envelope=row as the
default, but envelope=row is not supported in all sinks
so again, it felt too messy to do without a little further
conversation.

Release note (sql change): CREATE CHANGEFEED AS statements no longer need to include WITH DIFF when using cdc_prev().
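
The inference amounts to walking the expression tree and stopping at the first call to `cdc_prev`. The sketch below shows that idea on a toy AST; the types `expr`, `funcCall`, `columnRef`, and `stringLit` and the function `requiresPrev` are hypothetical stand-ins, not the `tree` package types. The case-insensitive comparison is also an assumption of the sketch: the real code compares against the resolved function name.

```go
package main

import (
	"fmt"
	"strings"
)

// expr is a toy expression node (a hypothetical stand-in for a SQL AST).
type expr interface{ children() []expr }

type funcCall struct {
	name string
	args []expr
}
type columnRef struct{ name string }
type stringLit struct{ value string }

func (f funcCall) children() []expr { return f.args }
func (columnRef) children() []expr  { return nil }
func (stringLit) children() []expr  { return nil }

// requiresPrev reports whether any node in the tree is a function call named
// cdc_prev. Column names and string literals that merely contain the text
// "cdc_prev" do not count, because only funcCall nodes are inspected.
func requiresPrev(e expr) bool {
	if f, ok := e.(funcCall); ok && strings.EqualFold(f.name, "cdc_prev") {
		return true // no need to keep recursing
	}
	for _, c := range e.children() {
		if requiresPrev(c) {
			return true
		}
	}
	return false
}

func main() {
	nested := funcCall{name: "jsonb_build_object", args: []expr{
		stringLit{"op"},
		funcCall{name: "IF", args: []expr{funcCall{name: "cdc_prev"}}},
	}}
	fmt.Println(requiresPrev(nested))                   // true
	fmt.Println(requiresPrev(stringLit{"cdc_prev()"})) // false
	fmt.Println(requiresPrev(columnRef{"cdc_prev"}))   // false
}
```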

83772: storage: remove experimental markers for MVCC range tombstones r=aliher1911 a=erikgrinaker

GC is still under development, but I think it's fine to remove the marker before GC works to let people start building with it.

---

This removes the experimental markers for MVCC range tombstones by
renaming functions, methods, and parameters so that they no longer
include `experimental`, and by removing the associated warning comments.

The changes are entirely mechanical.

Release note: None

83784: scexec: refactor schema changer job update r=postamar a=postamar

This PR rebases #83724 on top of a few commits which refactor the schema changer job update machinery.


Co-authored-by: Matthew Todd <todd@cockroachlabs.com>
Co-authored-by: David Hartunian <davidh@cockroachlabs.com>
Co-authored-by: Aaron Zinger <zinger@cockroachlabs.com>
Co-authored-by: Erik Grinaker <grinaker@cockroachlabs.com>
Co-authored-by: Marius Posta <marius@cockroachlabs.com>
6 people committed Jul 5, 2022
6 parents f6d0d84 + 68aa3a4 + b8faa51 + 96f63a7 + 1fb3c64 + 94af1f5 commit 97f8422
Showing 78 changed files with 1,571 additions and 468 deletions.
100 changes: 83 additions & 17 deletions pkg/ccl/changefeedccl/cdceval/validation.go
@@ -41,23 +41,23 @@ func NormalizeAndValidateSelectForTarget(
target jobspb.ChangefeedTargetSpecification,
sc *tree.SelectClause,
includeVirtual bool,
) error {
) (n NormalizedSelectClause, _ error) {
execCtx.SemaCtx()
execCfg := execCtx.ExecCfg()
if !execCfg.Settings.Version.IsActive(ctx, clusterversion.EnablePredicateProjectionChangefeed) {
return errors.Newf(
return n, errors.Newf(
`filters and projections not supported until upgrade to version %s or higher is finalized`,
clusterversion.EnablePredicateProjectionChangefeed.String())
}

// This really shouldn't happen as it's enforced by sql.y.
if len(sc.From.Tables) != 1 {
return pgerror.Newf(pgcode.Syntax, "invalid CDC expression: only 1 table supported")
return n, pgerror.Newf(pgcode.Syntax, "invalid CDC expression: only 1 table supported")
}

// Sanity check target and descriptor refer to the same table.
if target.TableID != desc.GetID() {
return errors.AssertionFailedf("target table id (%d) does not match descriptor id (%d)",
return n, errors.AssertionFailedf("target table id (%d) does not match descriptor id (%d)",
target.TableID, desc.GetID())
}

@@ -66,19 +66,19 @@ func NormalizeAndValidateSelectForTarget(
// associated Txn() -- without which we cannot perform normalization. Verify
// this assumption (txn is needed for type resolution).
if execCtx.Txn() == nil {
return errors.AssertionFailedf("expected non-nil transaction")
return n, errors.AssertionFailedf("expected non-nil transaction")
}

// Perform normalization.
var err error
sc, err = normalizeSelectClause(ctx, *execCtx.SemaCtx(), sc, desc)
normalized, err := normalizeSelectClause(ctx, *execCtx.SemaCtx(), sc, desc)
if err != nil {
return err
return n, err
}

ed, err := newEventDescriptorForTarget(desc, target, schemaTS(execCtx), includeVirtual)
if err != nil {
return err
return n, err
}

evalCtx := &execCtx.ExtendedEvalContext().Context
@@ -88,17 +88,17 @@ func NormalizeAndValidateSelectForTarget(
if _, _, err := constrainSpansBySelectClause(
ctx, execCtx, evalCtx, execCfg.Codec, sc, ed,
); err != nil {
return err
return n, err
}

// Construct and initialize evaluator. This performs some static checks,
// and (importantly) type checks expressions.
evaluator, err := NewEvaluator(evalCtx, sc)
if err != nil {
return err
return n, err
}

return evaluator.initEval(ctx, ed)
return normalized, evaluator.initEval(ctx, ed)
}

func newEventDescriptorForTarget(
@@ -143,14 +143,27 @@ func getTargetFamilyDescriptor(
}
}

// NormalizedSelectClause is a select clause returned by normalizeSelectClause.
// normalizeSelectClause also modifies the select clause in place, but this
// marker type is needed so that we can ensure functions that rely on
// normalized input aren't called out of order.
type NormalizedSelectClause tree.SelectClause

// Clause returns a pointer to the underlying SelectClause (still in normalized
// form).
func (n NormalizedSelectClause) Clause() *tree.SelectClause {
sc := tree.SelectClause(n)
return &sc
}

// normalizeSelectClause performs normalization step for select clause.
// Returns normalized select clause.
func normalizeSelectClause(
ctx context.Context,
semaCtx tree.SemaContext,
sc *tree.SelectClause,
desc catalog.TableDescriptor,
) (normalizedSelectClause *tree.SelectClause, _ error) {
) (normalizedSelectClause NormalizedSelectClause, _ error) {
// Turn FROM clause to table reference.
// Note: must specify AliasClause for TableRef expression; otherwise we
// won't be able to deserialize string representation (grammar requires
@@ -162,7 +175,7 @@ func normalizeSelectClause(
case tree.TablePattern:
default:
// This is verified by sql.y -- but be safe.
return nil, errors.AssertionFailedf("unexpected table expression type %T",
return normalizedSelectClause, errors.AssertionFailedf("unexpected table expression type %T",
sc.From.Tables[0])
}

@@ -227,14 +240,14 @@ func normalizeSelectClause(
})

if err != nil {
return nil, err
return normalizedSelectClause, err
}
switch t := stmt.(type) {
case *tree.SelectClause:
normalizedSelectClause = t
normalizedSelectClause = NormalizedSelectClause(*t)
default:
// We walked tree.SelectClause -- getting anything else would be surprising.
return nil, errors.AssertionFailedf("unexpected result type %T", stmt)
return normalizedSelectClause, errors.AssertionFailedf("unexpected result type %T", stmt)
}

if len(v.OIDs) == 0 {
@@ -250,10 +263,63 @@ func normalizeSelectClause(

for id := range v.OIDs {
if _, isAllowed := allowedOIDs[id]; !isAllowed {
return nil, pgerror.Newf(pgcode.FeatureNotSupported,
return normalizedSelectClause, pgerror.Newf(pgcode.FeatureNotSupported,
"use of user defined types not referenced by target table is not supported")
}
}

return normalizedSelectClause, nil
}

type checkForPrevVisitor struct {
semaCtx tree.SemaContext
foundPrev bool
}

// VisitPre implements the Visitor interface.
func (v *checkForPrevVisitor) VisitPre(expr tree.Expr) (bool, tree.Expr) {
if exprRequiresPreviousValue(v.semaCtx, expr) {
v.foundPrev = true
// no need to keep recursing
return false, expr
}
return true, expr
}

// VisitPost implements the Visitor interface.
func (v *checkForPrevVisitor) VisitPost(e tree.Expr) tree.Expr {
return e
}

// exprRequiresPreviousValue returns true if the top-level expression
// is a function call that cdc implements using the diff from a rangefeed.
func exprRequiresPreviousValue(semaCtx tree.SemaContext, e tree.Expr) bool {
if f, ok := e.(*tree.FuncExpr); ok {
var name string
switch fn := f.Func.FunctionReference.(type) {
case *tree.UnresolvedName:
funDef, err := fn.ResolveFunction(semaCtx.SearchPath)
if err != nil {
return false
}
name = funDef.Name
case *tree.FunctionDefinition:
name = fn.Name
default:
name = f.String()
}
return name == "cdc_prev"
}
return false
}

// SelectClauseRequiresPrev checks whether a changefeed expression will need a row's previous values
// to be fetched in order to evaluate it.
func SelectClauseRequiresPrev(semaCtx tree.SemaContext, sc NormalizedSelectClause) (bool, error) {
c := checkForPrevVisitor{semaCtx: semaCtx}
_, err := tree.SimpleStmtVisit(sc.Clause(), func(expr tree.Expr) (recurse bool, newExpr tree.Expr, err error) {
recurse, newExpr = c.VisitPre(expr)
return recurse, newExpr, nil
})
return c.foundPrev, err
}
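
The `NormalizedSelectClause` marker type above relies on a general Go property: a newly defined type is not assignable from the type it was defined from, so a function that demands the marker type cannot be handed un-normalized input by accident. A self-contained sketch of that pattern, using hypothetical toy types (`selectClause`, `normalizedSelectClause`) and a trivial stand-in for normalization:

```go
package main

import (
	"fmt"
	"strings"
)

// selectClause is a toy stand-in for a parsed SELECT clause.
type selectClause struct {
	exprs []string
}

// normalizedSelectClause has the same underlying structure but is a distinct
// type, so a plain selectClause cannot be passed where it is required.
type normalizedSelectClause selectClause

// clause returns a pointer to a copy of the underlying struct value. The copy
// shares the exprs backing array with n, as any struct copy in Go would.
func (n normalizedSelectClause) clause() *selectClause {
	sc := selectClause(n)
	return &sc
}

// normalize is the only function in this sketch that produces the marker
// type. Trimming and lower-casing stand in for real normalization work.
func normalize(sc *selectClause) normalizedSelectClause {
	out := selectClause{exprs: make([]string, len(sc.exprs))}
	for i, e := range sc.exprs {
		out.exprs[i] = strings.ToLower(strings.TrimSpace(e))
	}
	return normalizedSelectClause(out)
}

// firstExpr only accepts normalized input, which the compiler enforces.
func firstExpr(n normalizedSelectClause) string {
	return n.clause().exprs[0]
}

func main() {
	raw := &selectClause{exprs: []string{"  CDC_PREV()  ", "s"}}
	n := normalize(raw)
	fmt.Println(firstExpr(n)) // cdc_prev()
	// firstExpr(*raw) would not compile: selectClause is not assignable
	// to normalizedSelectClause without an explicit conversion.
}
```

An explicit conversion can still bypass the check, so the pattern guards against accidental misordering rather than deliberate misuse.
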
96 changes: 95 additions & 1 deletion pkg/ccl/changefeedccl/cdceval/validation_test.go
@@ -132,7 +132,7 @@ func TestNormalizeAndValidate(t *testing.T) {
StatementTimeName: tc.desc.GetName(),
}

err = NormalizeAndValidateSelectForTarget(ctx, execCtx, tc.desc, target, sc, false)
_, err = NormalizeAndValidateSelectForTarget(ctx, execCtx, tc.desc, target, sc, false)
if tc.expectErr != "" {
require.Regexp(t, tc.expectErr, err)
return
@@ -150,3 +150,97 @@ func TestNormalizeAndValidate(t *testing.T) {
})
}
}

func TestSelectClauseRequiresPrev(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(context.Background())

sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE table foo (id int primary key, s string)`)
sqlDB.Exec(t, `CREATE table cdc_prev (id int primary key, s string)`)
sqlDB.Exec(t, `CREATE table misleading_column_name (id int primary key, cdc_prev string)`)

descs := make(map[string]catalog.TableDescriptor)
for _, name := range []string{`foo`, `cdc_prev`, `misleading_column_name`} {
descs[name] = cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), tree.Name(name))
}

ctx := context.Background()
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
p, cleanup := sql.NewInternalPlanner("test",
kvDB.NewTxn(ctx, "test-planner"),
username.RootUserName(), &sql.MemoryMetrics{}, &execCfg,
sessiondatapb.SessionData{
Database: "defaultdb",
SearchPath: sessiondata.DefaultSearchPath.GetPathArray(),
})
defer cleanup()
execCtx := p.(sql.JobExecContext)

for _, tc := range []struct {
name string
desc catalog.TableDescriptor
stmt string
expect bool
}{
{
name: "top level call to cdc_prev",
desc: descs[`foo`],
stmt: "SELECT cdc_prev() from foo",
expect: true,
},
{
name: "nested call to cdc_prev",
desc: descs[`foo`],
stmt: "SELECT jsonb_build_object('op',IF(cdc_is_delete(),'u',IF(cdc_prev()::string='null','c','u'))) from foo",
expect: true,
},
{
name: "cdc_prev in the predicate",
desc: descs[`foo`],
stmt: "SELECT * from foo WHERE (cdc_prev()->'s')::string != s",
expect: true,
},
{
name: "case insensitive",
desc: descs[`foo`],
stmt: "SELECT CDC_PREV() from foo",
expect: true,
},
{
name: "contains misleading substring",
desc: descs[`foo`],
stmt: "SELECT 'cdc_prev()', s FROM foo",
expect: false,
},
{
name: "misleading table name",
desc: descs[`cdc_prev`],
stmt: "SELECT * FROM cdc_prev",
expect: false,
},
{
name: "misleading column name",
desc: descs[`misleading_column_name`],
stmt: "SELECT cdc_prev FROM misleading_column_name",
expect: false,
},
} {
t.Run(tc.name, func(t *testing.T) {
sc, err := ParseChangefeedExpression(tc.stmt)
require.NoError(t, err)
target := jobspb.ChangefeedTargetSpecification{
TableID: tc.desc.GetID(),
StatementTimeName: tc.desc.GetName(),
}
normalized, err := NormalizeAndValidateSelectForTarget(ctx, execCtx, tc.desc, target, sc, false)
require.NoError(t, err)
actual, err := SelectClauseRequiresPrev(*execCtx.SemaCtx(), normalized)
require.NoError(t, err)
require.Equal(t, tc.expect, actual)
})
}
}
20 changes: 15 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -379,12 +379,22 @@ func createChangefeedJobRecord(

if changefeedStmt.Select != nil {
// Serialize changefeed expression.
if err := validateAndNormalizeChangefeedExpression(
normalized, err := validateAndNormalizeChangefeedExpression(
ctx, p, changefeedStmt.Select, targetDescs, targets, opts.IncludeVirtual(),
); err != nil {
)
if err != nil {
return nil, err
}
needDiff, err := cdceval.SelectClauseRequiresPrev(*p.SemaCtx(), normalized)
if err != nil {
return nil, err
}
details.Select = cdceval.AsStringUnredacted(changefeedStmt.Select)
if needDiff {
opts.ForceDiff()
}
// TODO: Set the default envelope to row here when using a sink and format
// that support it.
details.Select = cdceval.AsStringUnredacted(normalized.Clause())
}

// TODO(dan): In an attempt to present the most helpful error message to the
@@ -821,9 +831,9 @@ func validateAndNormalizeChangefeedExpression(
descriptors map[tree.TablePattern]catalog.Descriptor,
targets []jobspb.ChangefeedTargetSpecification,
includeVirtual bool,
) error {
) (n cdceval.NormalizedSelectClause, _ error) {
if len(descriptors) != 1 || len(targets) != 1 {
return pgerror.Newf(pgcode.InvalidParameterValue, "CDC expressions require single table")
return n, pgerror.Newf(pgcode.InvalidParameterValue, "CDC expressions require single table")
}
var tableDescr catalog.TableDescriptor
for _, d := range descriptors {