Skip to content

Commit

Permalink
feat(query/stdlib): promote schema and fill optimizations from featur…
Browse files Browse the repository at this point in the history
…e flags
  • Loading branch information
jsternberg committed Jan 28, 2021
1 parent 7f65b71 commit 8e70904
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 117 deletions.
43 changes: 0 additions & 43 deletions query/stdlib/influxdata/influxdb/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ func init() {
PushDownBareAggregateRule{},
GroupWindowAggregateTransposeRule{},
// PushDownGroupAggregateRule{},
SwitchFillImplRule{},
SwitchSchemaMutationImplRule{},
)
plan.RegisterLogicalRules(
MergeFiltersRule{},
Expand Down Expand Up @@ -1064,47 +1062,6 @@ func canPushGroupedAggregate(ctx context.Context, pn plan.Node) bool {
return false
}

type SwitchFillImplRule struct{}

func (SwitchFillImplRule) Name() string {
return "SwitchFillImplRule"
}

func (SwitchFillImplRule) Pattern() plan.Pattern {
return plan.Pat(universe.FillKind, plan.Any())
}

func (r SwitchFillImplRule) Rewrite(ctx context.Context, pn plan.Node) (plan.Node, bool, error) {
if !feature.MemoryOptimizedFill().Enabled(ctx) {
spec := pn.ProcedureSpec().Copy()
universe.UseDeprecatedImpl(spec)
if err := pn.ReplaceSpec(spec); err != nil {
return nil, false, err
}
}
return pn, false, nil
}

type SwitchSchemaMutationImplRule struct{}

func (SwitchSchemaMutationImplRule) Name() string {
return "SwitchSchemaMutationImplRule"
}

func (SwitchSchemaMutationImplRule) Pattern() plan.Pattern {
return plan.Pat(universe.SchemaMutationKind, plan.Any())
}

func (r SwitchSchemaMutationImplRule) Rewrite(ctx context.Context, pn plan.Node) (plan.Node, bool, error) {
spec, ok := pn.ProcedureSpec().(*universe.DualImplProcedureSpec)
if !ok || spec.UseDeprecated {
return pn, false, nil
}

spec.UseDeprecated = !feature.MemoryOptimizedSchemaMutation().Enabled(ctx)
return pn, spec.UseDeprecated, nil
}

func asSchemaMutationProcedureSpec(spec plan.ProcedureSpec) *universe.SchemaMutationProcedureSpec {
if s, ok := spec.(*universe.DualImplProcedureSpec); ok {
spec = s.ProcedureSpec
Expand Down
74 changes: 0 additions & 74 deletions query/stdlib/influxdata/influxdb/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2878,80 +2878,6 @@ func TestPushDownGroupAggregateRule(t *testing.T) {
}
}

func TestSwitchFillImplRule(t *testing.T) {
flagger := mock.NewFlagger(map[feature.Flag]interface{}{
feature.MemoryOptimizedFill(): true,
})
withFlagger, _ := feature.Annotate(context.Background(), flagger)
readRange := &influxdb.ReadRangePhysSpec{
Bucket: "my-bucket",
Bounds: flux.Bounds{
Start: fluxTime(5),
Stop: fluxTime(10),
},
}
sourceSpec := &universe.DualImplProcedureSpec{
ProcedureSpec: &universe.FillProcedureSpec{
DefaultCost: plan.DefaultCost{},
Column: "_value",
Value: values.NewFloat(0),
UsePrevious: false,
},
UseDeprecated: false,
}
targetSpec := sourceSpec.Copy().(*universe.DualImplProcedureSpec)
universe.UseDeprecatedImpl(targetSpec)

testcases := []plantest.RuleTestCase{
{
Context: withFlagger,
Name: "enable memory optimized fill",
Rules: []plan.Rule{influxdb.SwitchFillImplRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadRange", readRange),
plan.CreatePhysicalNode("fill", sourceSpec),
},
Edges: [][2]int{
{0, 1},
},
},
NoChange: true,
},
{
Context: context.Background(),
Name: "disable memory optimized fill",
Rules: []plan.Rule{influxdb.SwitchFillImplRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadRange", readRange),
plan.CreatePhysicalNode("fill", sourceSpec),
},
Edges: [][2]int{
{0, 1},
},
},
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadRange", readRange),
plan.CreatePhysicalNode("fill", targetSpec),
},
Edges: [][2]int{
{0, 1},
},
},
},
}

for _, tc := range testcases {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
t.Parallel()
plantest.PhysicalRuleTestHelper(t, &tc)
})
}
}

func TestMergeFilterRule(t *testing.T) {
from := &fluxinfluxdb.FromProcedureSpec{}
filter0 := func() *universe.FilterProcedureSpec {
Expand Down

0 comments on commit 8e70904

Please sign in to comment.