Skip to content

Commit

Permalink
Runtime: Parallelize validation of metrics view dimensions and measur…
Browse files Browse the repository at this point in the history
…es (#4525)

* Runtime: Parallelize validation of metrics view dimensions and measures

* Use a const

* Make test deterministic
  • Loading branch information
begelundmuller authored and AdityaHegde committed Apr 9, 2024
1 parent 003a9ad commit 82edec5
Showing 1 changed file with 86 additions and 57 deletions.
143 changes: 86 additions & 57 deletions runtime/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ import (
"context"
"errors"
"fmt"
"slices"
"strconv"
"strings"
"sync"

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/drivers"
"golang.org/x/sync/errgroup"
)

const validateConcurrencyLimit = 10

type ValidateMetricsViewResult struct {
TimeDimensionErr error
DimensionErrs []IndexErr
Expand Down Expand Up @@ -86,42 +91,14 @@ func (r *Runtime) ValidateMetricsView(ctx context.Context, instanceID string, mv
}
}

// Check dimension columns exist
for idx, d := range mv.Dimensions {
err = validateDimensionExist(d, fields)
if err != nil {
res.DimensionErrs = append(res.DimensionErrs, IndexErr{
Idx: idx,
Err: err,
})
}
}

err = validateAllDimensionAndMeasureExpr(ctx, olap, t, mv)
// For performance, attempt to validate all dimensions and measures at once
err = validateAllDimensionsAndMeasures(ctx, olap, t, mv)
if err != nil {
// Some measure or dimension failed validation, check individuals to provide useful errors
// Check dimension expressions are valid
for idx, d := range mv.Dimensions {
err = validateDimensionExpr(ctx, olap, t, d)
if err != nil {
res.DimensionErrs = append(res.DimensionErrs, IndexErr{
Idx: idx,
Err: err,
})
}
}
// Check measure expressions are valid
for idx, m := range mv.Measures {
err := validateMeasure(ctx, olap, t, m)
if err != nil {
res.MeasureErrs = append(res.MeasureErrs, IndexErr{
Idx: idx,
Err: fmt.Errorf("invalid expression for measure %q: %w", m.Name, err),
})
}
}
// One or more dimension/measure expressions failed to validate. We need to check each one individually to provide useful errors.
validateIndividualDimensionsAndMeasures(ctx, olap, t, mv, fields, res)
}

// Check the default theme exists
if mv.DefaultTheme != "" {
_, err := ctrl.Get(ctx, &runtimev1.ResourceName{Kind: ResourceKindTheme, Name: mv.DefaultTheme}, false)
if err != nil {
Expand All @@ -135,25 +112,21 @@ func (r *Runtime) ValidateMetricsView(ctx context.Context, instanceID string, mv
return res, nil
}

func validateDimensionExist(d *runtimev1.MetricsViewSpec_DimensionV2, fields map[string]*runtimev1.StructType_Field) error {
if d.Column != "" {
if _, isColumn := fields[strings.ToLower(d.Column)]; !isColumn {
return fmt.Errorf("failed to validate dimension %q: column %q not found in table", d.Name, d.Column)
}
}
return nil
}

func validateAllDimensionAndMeasureExpr(ctx context.Context, olap drivers.OLAPStore, t *drivers.Table, mv *runtimev1.MetricsViewSpec) error {
// validateAllDimensionsAndMeasures validates all dimensions and measures with one query. It returns an error if any of the expressions are invalid.
func validateAllDimensionsAndMeasures(ctx context.Context, olap drivers.OLAPStore, t *drivers.Table, mv *runtimev1.MetricsViewSpec) error {
var dimExprs []string
var groupIndexes []string
for idx, d := range mv.Dimensions {
dimExprs = append(dimExprs, extractDimExpr(d))
if d.Column != "" {
dimExprs = append(dimExprs, olap.Dialect().EscapeIdentifier(d.Column))
} else {
dimExprs = append(dimExprs, "("+d.Expression+")")
}
groupIndexes = append(groupIndexes, strconv.Itoa(idx+1))
}
var metricExprs []string
for _, m := range mv.Measures {
metricExprs = append(metricExprs, m.Expression) // note the = instead of :=
metricExprs = append(metricExprs, "("+m.Expression+")")
}
var query string
if len(dimExprs) == 0 && len(metricExprs) == 0 {
Expand All @@ -165,9 +138,9 @@ func validateAllDimensionAndMeasureExpr(ctx context.Context, olap drivers.OLAPSt
query = fmt.Sprintf("SELECT 1, %s FROM %s GROUP BY 1", strings.Join(metricExprs, ","), olap.Dialect().EscapeTable(t.Database, t.DatabaseSchema, t.Name))
} else if len(metricExprs) == 0 {
// No metrics
query = fmt.Sprintf("SELECT (%s) FROM %s GROUP BY %s", strings.Join(dimExprs, "),("), olap.Dialect().EscapeTable(t.Database, t.DatabaseSchema, t.Name), strings.Join(groupIndexes, ","))
query = fmt.Sprintf("SELECT %s FROM %s GROUP BY %s", strings.Join(dimExprs, ","), olap.Dialect().EscapeTable(t.Database, t.DatabaseSchema, t.Name), strings.Join(groupIndexes, ","))
} else {
query = fmt.Sprintf("SELECT (%s), %s FROM %s GROUP BY %s", strings.Join(dimExprs, "),("), strings.Join(metricExprs, ","), olap.Dialect().EscapeTable(t.Database, t.DatabaseSchema, t.Name), strings.Join(groupIndexes, ","))
query = fmt.Sprintf("SELECT %s, %s FROM %s GROUP BY %s", strings.Join(dimExprs, ","), strings.Join(metricExprs, ","), olap.Dialect().EscapeTable(t.Database, t.DatabaseSchema, t.Name), strings.Join(groupIndexes, ","))
}
err := olap.Exec(ctx, &drivers.Statement{
Query: query,
Expand All @@ -179,9 +152,71 @@ func validateAllDimensionAndMeasureExpr(ctx context.Context, olap drivers.OLAPSt
return nil
}

func validateDimensionExpr(ctx context.Context, olap drivers.OLAPStore, t *drivers.Table, d *runtimev1.MetricsViewSpec_DimensionV2) error {
// validateIndividualDimensionsAndMeasures validates each dimension and measure individually.
// It adds validation errors to the provided res.
func validateIndividualDimensionsAndMeasures(ctx context.Context, olap drivers.OLAPStore, t *drivers.Table, mv *runtimev1.MetricsViewSpec, fields map[string]*runtimev1.StructType_Field, res *ValidateMetricsViewResult) {
// Validate dimensions and measures concurrently with a limit of 10 concurrent validations
var mu sync.Mutex
var grp errgroup.Group
grp.SetLimit(validateConcurrencyLimit)

// Check dimension expressions are valid
for idx, d := range mv.Dimensions {
idx := idx
grp.Go(func() error {
err := validateDimension(ctx, olap, t, d, fields)
if err != nil {
mu.Lock()
defer mu.Unlock()

res.DimensionErrs = append(res.DimensionErrs, IndexErr{
Idx: idx,
Err: err,
})
}
return nil
})
}

// Check measure expressions are valid
for idx, m := range mv.Measures {
idx := idx
grp.Go(func() error {
err := validateMeasure(ctx, olap, t, m)
if err != nil {
mu.Lock()
defer mu.Unlock()

res.MeasureErrs = append(res.MeasureErrs, IndexErr{
Idx: idx,
Err: fmt.Errorf("invalid expression for measure %q: %w", m.Name, err),
})
}
return nil
})
}

// Wait for all validations to complete
_ = grp.Wait()

// Sort errors by index (for stable output)
slices.SortFunc(res.DimensionErrs, func(a, b IndexErr) int { return a.Idx - b.Idx })
slices.SortFunc(res.MeasureErrs, func(a, b IndexErr) int { return a.Idx - b.Idx })
}

// validateDimension validates a metrics view dimension.
func validateDimension(ctx context.Context, olap drivers.OLAPStore, t *drivers.Table, d *runtimev1.MetricsViewSpec_DimensionV2, fields map[string]*runtimev1.StructType_Field) error {
// Validate with a simple check if it's a column
if d.Column != "" {
if _, isColumn := fields[strings.ToLower(d.Column)]; !isColumn {
return fmt.Errorf("failed to validate dimension %q: column %q not found in table", d.Name, d.Column)
}
return nil
}

// Validate with a query if it's an expression
err := olap.Exec(ctx, &drivers.Statement{
Query: fmt.Sprintf("SELECT (%s) FROM %s GROUP BY 1", extractDimExpr(d), olap.Dialect().EscapeTable(t.Database, t.DatabaseSchema, t.Name)),
Query: fmt.Sprintf("SELECT (%s) FROM %s GROUP BY 1", d.Expression, olap.Dialect().EscapeTable(t.Database, t.DatabaseSchema, t.Name)),
DryRun: true,
})
if err != nil {
Expand All @@ -190,17 +225,11 @@ func validateDimensionExpr(ctx context.Context, olap drivers.OLAPStore, t *drive
return nil
}

// validateMeasure validates a metrics view measure.
func validateMeasure(ctx context.Context, olap drivers.OLAPStore, t *drivers.Table, m *runtimev1.MetricsViewSpec_MeasureV2) error {
err := olap.Exec(ctx, &drivers.Statement{
Query: fmt.Sprintf("SELECT 1, %s FROM %s GROUP BY 1", m.Expression, olap.Dialect().EscapeTable(t.Database, t.DatabaseSchema, t.Name)),
Query: fmt.Sprintf("SELECT 1, (%s) FROM %s GROUP BY 1", m.Expression, olap.Dialect().EscapeTable(t.Database, t.DatabaseSchema, t.Name)),
DryRun: true,
})
return err
}

func extractDimExpr(d *runtimev1.MetricsViewSpec_DimensionV2) string {
if d.Column != "" {
return "\"" + d.Column + "\""
}
return d.Expression
}

0 comments on commit 82edec5

Please sign in to comment.