Skip to content

Commit

Permalink
comparison aggregation - Druid no dim values fix (#5086)
Browse files Browse the repository at this point in the history
Co-authored-by: Egor Ryashin <egor.ryashin@rilldata.com>
  • Loading branch information
egor-ryashin and Egor Ryashin authored Jun 17, 2024
1 parent dd7199d commit a8b416e
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 15 deletions.
36 changes: 22 additions & 14 deletions runtime/queries/metricsview_aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2505,13 +2505,15 @@ func (q *MetricsViewAggregation) buildMetricsComparisonAggregationSQL(ctx contex
outerGroupClause := ""
if len(whereDimConditions) > 0 {
whereDimClause = fmt.Sprintf(" AND (%s) ", strings.Join(whereDimConditions, " OR "))
}
if len(q.Dimensions) > 0 {
outerGroupClause = " GROUP BY " + strings.Join(outerGroupCols, ",")
}

sql = fmt.Sprintf(`
SELECT * from (
-- SELECT base.d1 d1, base.d2 d2, base.timed1 td1, base.m1 m1, comparison.m2 m2 ... , comparison.timed1 td1__previous, ...
SELECT %[2]s %[20]s FROM
SELECT %[2]s %[9]s FROM
(
-- SELECT t_offset, d1, d2, d3, td1, td2, m1, m2 ...
SELECT %[1]s FROM %[3]s %[14]s WHERE %[4]s GROUP BY %[10]s %[12]s
Expand All @@ -2523,12 +2525,11 @@ func (q *MetricsViewAggregation) buildMetricsComparisonAggregationSQL(ctx contex
ON
-- base.d1 IS NOT DISTINCT FROM comparison.d1 AND base.d2 IS NOT DISTINCT FROM comparison.d2 AND ...
%[17]s
%[19]s -- GROUP BY ...
%[11]s -- GROUP BY ...
%[6]s -- ORDER BY ...
) WHERE 1=1 AND %[15]s
%[7]s -- LIMIT ...
OFFSET %[8]d
`,
baseSelectClause, // 1
strings.Join(slices.Concat(finalDims, []string{finalSelectClause}), ","), // 2
Expand All @@ -2538,9 +2539,9 @@ func (q *MetricsViewAggregation) buildMetricsComparisonAggregationSQL(ctx contex
orderByClause, // 6
limitClause, // 7
q.Offset, // 8
finalSelectClause, // 9
finalTimeDimsClause, // 9
strings.Join(innerGroupCols, ","), // 10
joinType, // 11
outerGroupClause, // 11
baseLimitClause, // 12
comparisonLimitClause, // 13
strings.Join(unnestClauses, ""), // 14
Expand All @@ -2549,7 +2550,6 @@ func (q *MetricsViewAggregation) buildMetricsComparisonAggregationSQL(ctx contex
strings.Join(joinConditions, " AND "), // 17
whereDimClause, // 18
outerGroupClause, // 19
finalTimeDimsClause, // 20
)
} else {
limit := 0
Expand Down Expand Up @@ -2651,19 +2651,29 @@ func (q *MetricsViewAggregation) buildMetricsComparisonAggregationSQL(ctx contex
if len(finalComparisonTimeDims) > 0 {
finalTimeDimsClause = fmt.Sprintf(", %s", strings.Join(finalComparisonTimeDims, ", "))
}

whereDimClause := ""
outerGroupClause := ""
if len(whereDimConditions) > 0 {
whereDimClause = fmt.Sprintf(" AND (%s) ", strings.Join(whereDimConditions, " OR "))
}
if len(q.Dimensions) > 0 {
outerGroupClause = " GROUP BY " + strings.Join(outerGroupCols, ",")
}

sql = fmt.Sprintf(`
SELECT * from (
SELECT %[2]s, %[9]s %[20]s FROM
SELECT %[2]s %[9]s FROM
(
SELECT %[1]s FROM %[3]s %[14]s WHERE %[4]s AND (%[18]s) GROUP BY %[10]s %[12]s
SELECT %[1]s FROM %[3]s %[14]s WHERE %[4]s %[18]s GROUP BY %[10]s %[12]s
) base
LEFT JOIN
(
SELECT %[16]s FROM %[3]s %[14]s WHERE %[5]s GROUP BY %[10]s %[13]s
) comparison
ON
%[17]s
GROUP BY %[19]s
%[11]s
%[6]s
%[7]s
OFFSET
Expand All @@ -2678,18 +2688,16 @@ func (q *MetricsViewAggregation) buildMetricsComparisonAggregationSQL(ctx contex
orderByClause, // 6
limitClause, // 7
q.Offset, // 8
finalSelectClause, // 9
finalTimeDimsClause, // 9
strings.Join(innerGroupCols, ","), // 10
joinType, // 11
outerGroupClause, // 11
baseLimitClause, // 12
comparisonLimitClause, // 13
strings.Join(unnestClauses, ""), // 14
havingClause, // 15
comparisonSelectClause, // 16
strings.Join(joinConditions, " AND "), // 17
strings.Join(whereDimConditions, " OR "), // 18
strings.Join(outerGroupCols, ","), // 19
finalTimeDimsClause, // 20
whereDimClause, // 18
)
}
}
Expand Down
158 changes: 157 additions & 1 deletion runtime/queries/metricsview_aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3873,7 +3873,163 @@ func TestMetricsViewsAggregation_comparison_Druid_one_dim_comparison_order(t *te
require.Equal(t, "Google,2022-01-01T00:00:00Z,3.17,3.18,-0.02,-0.00,2022-01-02T00:00:00Z", fieldsToString2digits(rows[i], "pub", "timestamp_day", "m1", "m1__previous", "m1__delta_abs", "m1__delta_rel", "timestamp_day__previous"))
}

func TestMetricsViewsAggregation_comparison_Druid(t *testing.T) {
func TestMetricsViewsAggregation_Druid_comparison_empty_set_previous_sorted(t *testing.T) {
if os.Getenv("LOCALDRUID") == "" {
t.Skip("skipping the test in non-local Druid environment")
}
rt, instanceID := testruntime.NewInstanceForDruidProject(t)

limit := int64(10)
q := &queries.MetricsViewAggregation{
MetricsViewName: "ad_bids_metrics",
Dimensions: []*runtimev1.MetricsViewAggregationDimension{
{
Name: "pub",
},
{
Name: "__time",
TimeGrain: runtimev1.TimeGrain_TIME_GRAIN_DAY,
},
},
Measures: []*runtimev1.MetricsViewAggregationMeasure{
{
Name: "m1",
},
{
Name: "m1_p",
Compute: &runtimev1.MetricsViewAggregationMeasure_ComparisonValue{
ComparisonValue: &runtimev1.MetricsViewAggregationMeasureComputeComparisonValue{
Measure: "m1",
},
},
},
},
Where: expressionpb.OrAll(
expressionpb.Eq("pub", "nothing"),
),
Having: expressionpb.Gt("m1", 0.0),
Sort: []*runtimev1.MetricsViewAggregationSort{
{
Name: "pub",
},
{
Name: "__time",
},
{
Name: "m1_p",
},
},
TimeRange: &runtimev1.TimeRange{
Start: timestamppb.New(time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)),
End: timestamppb.New(time.Date(2022, 1, 2, 0, 0, 0, 0, time.UTC)),
},
ComparisonTimeRange: &runtimev1.TimeRange{
Start: timestamppb.New(time.Date(2022, 1, 2, 0, 0, 0, 0, time.UTC)),
End: timestamppb.New(time.Date(2022, 1, 3, 0, 0, 0, 0, time.UTC)),
},
Limit: &limit,
}
err := q.Resolve(context.Background(), rt, instanceID, 0)
require.NoError(t, err)
require.NotEmpty(t, q.Result)
fields := q.Result.Schema.Fields
require.Equal(t, "pub,__time,m1,m1_p,__time__previous", columnNames(fields))

for _, sf := range q.Result.Schema.Fields {
fmt.Printf("%v ", sf.Name)
}
fmt.Printf("\n")

for i, row := range q.Result.Data {
for _, sf := range q.Result.Schema.Fields {
fmt.Printf("%v ", row.Fields[sf.Name].AsInterface())
}
fmt.Printf(" %d \n", i)

}
rows := q.Result.Data
require.Equal(t, 0, len(rows))
}

func TestMetricsViewsAggregation_Druid_comparison_empty_set(t *testing.T) {
if os.Getenv("LOCALDRUID") == "" {
t.Skip("skipping the test in non-local Druid environment")
}
rt, instanceID := testruntime.NewInstanceForDruidProject(t)

limit := int64(10)
q := &queries.MetricsViewAggregation{
MetricsViewName: "ad_bids_metrics",
Dimensions: []*runtimev1.MetricsViewAggregationDimension{
{
Name: "pub",
},
{
Name: "__time",
TimeGrain: runtimev1.TimeGrain_TIME_GRAIN_DAY,
},
},
Measures: []*runtimev1.MetricsViewAggregationMeasure{
{
Name: "m1",
},
{
Name: "m1_p",
Compute: &runtimev1.MetricsViewAggregationMeasure_ComparisonValue{
ComparisonValue: &runtimev1.MetricsViewAggregationMeasureComputeComparisonValue{
Measure: "m1",
},
},
},
},
Where: expressionpb.OrAll(
expressionpb.Eq("pub", "nothing"),
),
Having: expressionpb.Gt("m1", 0.0),
Sort: []*runtimev1.MetricsViewAggregationSort{
{
Name: "pub",
},
{
Name: "__time",
},
{
Name: "m1",
},
},
TimeRange: &runtimev1.TimeRange{
Start: timestamppb.New(time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)),
End: timestamppb.New(time.Date(2022, 1, 2, 0, 0, 0, 0, time.UTC)),
},
ComparisonTimeRange: &runtimev1.TimeRange{
Start: timestamppb.New(time.Date(2022, 1, 2, 0, 0, 0, 0, time.UTC)),
End: timestamppb.New(time.Date(2022, 1, 3, 0, 0, 0, 0, time.UTC)),
},
Limit: &limit,
}
err := q.Resolve(context.Background(), rt, instanceID, 0)
require.NoError(t, err)
require.NotEmpty(t, q.Result)
fields := q.Result.Schema.Fields
require.Equal(t, "pub,__time,m1,m1_p,__time__previous", columnNames(fields))

for _, sf := range q.Result.Schema.Fields {
fmt.Printf("%v ", sf.Name)
}
fmt.Printf("\n")

for i, row := range q.Result.Data {
for _, sf := range q.Result.Schema.Fields {
fmt.Printf("%v ", row.Fields[sf.Name].AsInterface())
}
fmt.Printf(" %d \n", i)

}
rows := q.Result.Data
require.Equal(t, 0, len(rows))
}

func TestMetricsViewsAggregation_Druid_comparison(t *testing.T) {
if os.Getenv("LOCALDRUID") == "" {
t.Skip("skipping the test in non-local Druid environment")
}
Expand Down

0 comments on commit a8b416e

Please sign in to comment.