Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Runtime: Check alerts on external tables every 10 minutes by default #4347

Merged
merged 1 commit into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions admin/server/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,13 +440,14 @@ func (s *Server) GetAlertYAML(ctx context.Context, req *adminv1.GetAlertYAMLRequ
func (s *Server) yamlForManagedAlert(opts *adminv1.AlertOptions, ownerUserID string) ([]byte, error) {
res := alertYAML{}
res.Kind = "alert"
res.Title = opts.Title
// manually add the ref
// Trigger the alert when the metrics view refreshes.
res.Refs = []string{fmt.Sprintf("MetricsView/%s", opts.MetricsViewName)}
res.Title = opts.Title
res.Watermark = "inherit"
res.Intervals.Duration = opts.IntervalDuration
res.Query.Name = opts.QueryName
res.Query.ArgsJSON = opts.QueryArgsJson
// hard code the user id to run for. this avoids exposing data through alert creation
// Hard code the user id to run for (to avoid exposing data through alert creation)
res.Query.For.UserID = ownerUserID
res.Email.Recipients = opts.Recipients
res.Email.Renotify = opts.EmailRenotify
Expand All @@ -469,9 +470,10 @@ func (s *Server) yamlForCommittedAlert(opts *adminv1.AlertOptions) ([]byte, erro

res := alertYAML{}
res.Kind = "alert"
res.Title = opts.Title
// manually add the ref
// Trigger the alert when the metrics view refreshes.
res.Refs = []string{fmt.Sprintf("MetricsView/%s", opts.MetricsViewName)}
res.Title = opts.Title
res.Watermark = "inherit"
res.Intervals.Duration = opts.IntervalDuration
res.Query.Name = opts.QueryName
res.Query.Args = args
Expand Down Expand Up @@ -534,13 +536,13 @@ func recreateAlertOptionsFromSpec(spec *runtimev1.AlertSpec) *adminv1.AlertOptio
// alertYAML is derived from rillv1.AlertYAML, but adapted for generating (as opposed to parsing) the alert YAML.
type alertYAML struct {
Kind string `yaml:"kind"`
Title string `yaml:"title"`
Refs []string `yaml:"refs"`
Title string `yaml:"title"`
Watermark string `yaml:"watermark"`
Intervals struct {
Duration string `yaml:"duration"`
} `yaml:"intervals"`
Timeout string `yaml:"timeout"`
Query struct {
Query struct {
Name string `yaml:"name"`
Args map[string]any `yaml:"args,omitempty"`
ArgsJSON string `yaml:"args_json,omitempty"`
Expand Down
831 changes: 422 additions & 409 deletions proto/gen/rill/runtime/v1/resources.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions proto/gen/rill/runtime/v1/resources.pb.validate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions proto/gen/rill/runtime/v1/runtime.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4271,6 +4271,12 @@ definitions:
properties:
validSpec:
$ref: '#/definitions/v1MetricsViewSpec'
description: Valid spec is a (potentially previous) version of the spec that is known to currently be valid.
streaming:
type: boolean
description: |-
Streaming is true if the underlying data may change without the metrics view's spec/state version changing.
It's set to true if the metrics view is based on an externally managed table.
v1MetricsViewTimeRangeResponse:
type: object
properties:
Expand Down
4 changes: 4 additions & 0 deletions proto/rill/runtime/v1/resources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,11 @@ message MetricsViewSpec {
}

message MetricsViewState {
// Valid spec is a (potentially previous) version of the spec that is known to currently be valid.
MetricsViewSpec valid_spec = 1;
// Streaming is true if the underlying data may change without the metrics view's spec/state version changing.
// It's set to true if the metrics view is based on an externally managed table.
bool streaming = 2;
}

message Migration {
Expand Down
21 changes: 20 additions & 1 deletion runtime/reconcilers/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const alertQueryPriority = 1

const alertCheckDefaultTimeout = 5 * time.Minute

const alertStreamingRefDefaultRefreshCron = "*/10 * * * *"

func init() {
runtime.RegisterReconcilerInitializer(runtime.ResourceKindAlert, newAlertReconciler)
}
Expand Down Expand Up @@ -102,9 +104,19 @@ func (r *AlertReconciler) Reconcile(ctx context.Context, n *runtimev1.ResourceNa
return runtime.ReconcileResult{}
}

// As a special rule, we override the refresh schedule to run every 10 minutes if:
// ref_update=true, one of the refs is streaming, and no other schedule is set.
if hasStreamingRef(ctx, r.C, self.Meta.Refs) {
if a.Spec.RefreshSchedule != nil && a.Spec.RefreshSchedule.RefUpdate {
if a.Spec.RefreshSchedule.TickerSeconds == 0 && a.Spec.RefreshSchedule.Cron == "" {
a.Spec.RefreshSchedule.Cron = alertStreamingRefDefaultRefreshCron
}
}
}

// Unlike other resources, alerts have different hashes for the spec and the refs' state.
// This enables differentiating behavior between changes to the spec and changes to the refs.
// When the spec changes, we clear all alert state. When the refs change, we just use it to trigger the alert ()
// When the spec changes, we clear all alert state. When the refs change, we just use it to trigger the alert.
specHash, err := r.executionSpecHash(a.Spec, self.Meta.Refs)
if err != nil {
return runtime.ReconcileResult{Err: fmt.Errorf("failed to compute hash: %w", err)}
Expand Down Expand Up @@ -743,6 +755,13 @@ func (r *AlertReconciler) computeInheritedWatermark(ctx context.Context, refs []
// calculateExecutionTimes calculates the execution times for an alert, taking into consideration the alert's intervals configuration and previous executions.
// If the alert is not configured to run on intervals, it will return a slice containing only the current watermark.
func calculateExecutionTimes(a *runtimev1.Alert, watermark, previousWatermark time.Time) ([]time.Time, error) {
// If the watermark is unchanged, skip the check.
// NOTE: It might make sense to make this configurable in the future, but the use cases seem limited.
// The watermark can only be unchanged if watermark="inherit", and since that indicates watermarks can be trusted, why check for the same watermark?
if watermark.Equal(previousWatermark) {
return nil, nil
}

// If the alert is not configured to run on intervals, check it just for the current watermark.
if a.Spec.IntervalsIsoDuration == "" {
return []time.Time{watermark}, nil
Expand Down
26 changes: 22 additions & 4 deletions runtime/reconcilers/metrics_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,24 +65,42 @@ func (r *MetricsViewReconciler) Reconcile(ctx context.Context, n *runtimev1.Reso
return runtime.ReconcileResult{}
}

// NOTE: Not checking refs here since refs may still be valid even if they have errors (in case of staged changes).
// Instead, we just validate against the table name.
// NOTE: In other reconcilers, state like spec_hash and refreshed_on is used to avoid redundant reconciles.
// We don't do that here because none of the operations below are particularly expensive.
// So it doesn't really matter if they run a bit more often than necessary ¯\_(ツ)_/¯.

// NOTE: Not checking refs for errors since they may still be valid even if they have errors. Instead, we just validate the metrics view against the table name.

// Validate the metrics view and update ValidSpec
validateResult, validateErr := r.C.Runtime.ValidateMetricsView(ctx, r.C.InstanceID, mv.Spec)
if validateErr == nil {
validateErr = validateResult.Error()
}

if ctx.Err() != nil {
return runtime.ReconcileResult{Err: errors.Join(validateErr, ctx.Err())}
}

if validateErr == nil {
mv.State.ValidSpec = mv.Spec
} else {
mv.State.ValidSpec = nil
}

// Set the "streaming" state (see docstring in the proto for details).
mv.State.Streaming = false
if validateErr == nil {
// Find out if the metrics view has a ref to a source or model in the same project.
hasInternalRef := false
for _, ref := range self.Meta.Refs {
if ref.Kind == runtime.ResourceKindSource || ref.Kind == runtime.ResourceKindModel {
hasInternalRef = true
}
}

// If not, we assume the metrics view is based on an externally managed table and set the streaming state to true.
mv.State.Streaming = !hasInternalRef
}

// Update state. Even if the validation result is unchanged, we always update the state to ensure the state version is incremented.
err = r.C.UpdateState(ctx, self.Meta.Name, self)
if err != nil {
return runtime.ReconcileResult{Err: err}
Expand Down
22 changes: 22 additions & 0 deletions runtime/reconcilers/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,28 @@ func checkRefs(ctx context.Context, c *runtime.Controller, refs []*runtimev1.Res
return nil
}

// hasStreamingRef returns true if one or more of the refs have data that may be updated outside of a reconcile.
func hasStreamingRef(ctx context.Context, c *runtime.Controller, refs []*runtimev1.ResourceName) bool {
for _, ref := range refs {
// Currently only metrics views can be streaming.
if ref.Kind != runtime.ResourceKindMetricsView {
continue
}

res, err := c.Get(ctx, ref, false)
if err != nil {
// Broken refs are not streaming.
continue
}
mv := res.GetMetricsView()

if mv.State.Streaming {
return true
}
}
return false
}

// nextRefreshTime returns the earliest time AFTER t that the schedule should trigger.
func nextRefreshTime(t time.Time, schedule *runtimev1.Schedule) (time.Time, error) {
if schedule == nil || schedule.Disable {
Expand Down
11 changes: 11 additions & 0 deletions web-common/src/proto/gen/rill/runtime/v1/resources_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1485,10 +1485,20 @@ export class MetricsViewSpec_AvailableTimeRange extends Message<MetricsViewSpec_
*/
export class MetricsViewState extends Message<MetricsViewState> {
/**
* Valid spec is a (potentially previous) version of the spec that is known to currently be valid.
*
* @generated from field: rill.runtime.v1.MetricsViewSpec valid_spec = 1;
*/
validSpec?: MetricsViewSpec;

/**
* Streaming is true if the underlying data may change without the metrics view's spec/state version changing.
* It's set to true if the metrics view is based on an externally managed table.
*
* @generated from field: bool streaming = 2;
*/
streaming = false;

constructor(data?: PartialMessage<MetricsViewState>) {
super();
proto3.util.initPartial(data, this);
Expand All @@ -1498,6 +1508,7 @@ export class MetricsViewState extends Message<MetricsViewState> {
static readonly typeName = "rill.runtime.v1.MetricsViewState";
static readonly fields: FieldList = proto3.util.newFieldList(() => [
{ no: 1, name: "valid_spec", kind: "message", T: MetricsViewSpec },
{ no: 2, name: "streaming", kind: "scalar", T: 8 /* ScalarType.BOOL */ },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): MetricsViewState {
Expand Down
3 changes: 3 additions & 0 deletions web-common/src/runtime-client/gen/index.schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1228,6 +1228,9 @@ export interface V1MetricsViewSpec {

export interface V1MetricsViewState {
validSpec?: V1MetricsViewSpec;
/** Streaming is true if the underlying data may change without the metrics view's spec/state version changing.
It's set to true if the metrics view is based on an externally managed table. */
streaming?: boolean;
}

export interface V1MetricsViewV2 {
Expand Down
Loading