Skip to content

Commit

Permalink
Add query evaluation with app name
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Apr 29, 2024
1 parent 4fdf6a9 commit 32ea276
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 11 deletions.
1 change: 1 addition & 0 deletions cmd/fly-autoscaler/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func (c *EvalCommand) Run(ctx context.Context, args []string) (err error) {

// Instantiate reconciler and evaluate once.
r := fas.NewReconciler()
r.AppName = c.Config.AppName
r.MinCreatedMachineN = c.Config.GetMinCreatedMachineN()
r.MaxCreatedMachineN = c.Config.GetMaxCreatedMachineN()
r.MinStartedMachineN = c.Config.GetMinStartedMachineN()
Expand Down
19 changes: 17 additions & 2 deletions metric_collector.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
package fas

import "context"
import (
"context"
"os"
)

// MetricCollector represents a client for collecting metrics from an external source.
type MetricCollector interface {
Name() string
CollectMetric(ctx context.Context) (float64, error)
CollectMetric(ctx context.Context, app string) (float64, error)
}

// ExpandMetricQuery replaces variables in query with their values.
func ExpandMetricQuery(ctx context.Context, query, app string) string {
return os.Expand(query, func(key string) string {
switch key {
case "APP_NAME":
return app
default:
return ""
}
})
}
31 changes: 31 additions & 0 deletions metric_collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package fas_test

import (
"context"
"testing"

fas "github.com/superfly/fly-autoscaler"
)

func TestExpandMetricQuery(t *testing.T) {
t.Run("Static", func(t *testing.T) {
result := fas.ExpandMetricQuery(context.Background(), "foo", "my-app")
if got, want := result, `foo`; got != want {
t.Fatalf("got %q, want %q", got, want)
}
})

t.Run("Bare", func(t *testing.T) {
result := fas.ExpandMetricQuery(context.Background(), "foo $APP_NAME bar", "my-app")
if got, want := result, `foo my-app bar`; got != want {
t.Fatalf("got %q, want %q", got, want)
}
})

t.Run("Wrapped", func(t *testing.T) {
result := fas.ExpandMetricQuery(context.Background(), "foo${APP_NAME}bar", "my-app")
if got, want := result, `foomy-appbar`; got != want {
t.Fatalf("got %q, want %q", got, want)
}
})
}
6 changes: 3 additions & 3 deletions mock/metric_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ var _ fas.MetricCollector = (*MetricCollector)(nil)

type MetricCollector struct {
name string
CollectMetricFunc func(ctx context.Context) (float64, error)
CollectMetricFunc func(ctx context.Context, app string) (float64, error)
}

func NewMetricCollector(name string) *MetricCollector {
Expand All @@ -19,6 +19,6 @@ func NewMetricCollector(name string) *MetricCollector {

func (c *MetricCollector) Name() string { return c.name }

func (c *MetricCollector) CollectMetric(ctx context.Context) (float64, error) {
return c.CollectMetricFunc(ctx)
func (c *MetricCollector) CollectMetric(ctx context.Context, app string) (float64, error) {
return c.CollectMetricFunc(ctx, app)
}
6 changes: 4 additions & 2 deletions prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ func (c *MetricCollector) Name() string {
return c.name
}

func (c *MetricCollector) CollectMetric(ctx context.Context) (float64, error) {
result, warnings, err := c.api.Query(context.Background(), c.query, time.Now())
func (c *MetricCollector) CollectMetric(ctx context.Context, app string) (float64, error) {
query := fas.ExpandMetricQuery(ctx, c.query, app)

result, warnings, err := c.api.Query(context.Background(), query, time.Now())
if err != nil {
return 0, err
} else if len(warnings) > 0 {
Expand Down
5 changes: 4 additions & 1 deletion reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ type Reconciler struct {
// Client to connect to Machines API to scale app. Required.
Client FlapsClient

// The name of the app currently being reconciled.
AppName string

// List of regions that machines can be created in.
// The reconciler uses a round-robin approach to choosing next region.
Regions []string
Expand Down Expand Up @@ -79,7 +82,7 @@ func (r *Reconciler) CollectMetrics(ctx context.Context) error {
r.metrics = make(map[string]float64)

for _, c := range r.Collectors {
value, err := c.CollectMetric(ctx)
value, err := c.CollectMetric(ctx, r.AppName)
if err != nil {
return fmt.Errorf("collect metric (%q): %w", c.Name(), err)
}
Expand Down
1 change: 1 addition & 0 deletions reconciler_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ func (p *ReconcilerPool) monitorReconciler(ctx context.Context, r *Reconciler) {
ctx, cancel := context.WithTimeoutCause(p.ctx, p.ReconcileTimeout, errReconciliationTimeout)
defer cancel()

r.AppName = info.name
r.Client = info.client

if err := r.CollectMetrics(ctx); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion reconciler_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestReconcilerPool_Run_SingleApp(t *testing.T) {
// Collector will simply mirror the target value.
var target atomic.Int64
collector := mock.NewMetricCollector("target")
collector.CollectMetricFunc = func(ctx context.Context) (float64, error) {
collector.CollectMetricFunc = func(ctx context.Context, app string) (float64, error) {
return float64(target.Load()), nil
}

Expand Down
4 changes: 3 additions & 1 deletion temporal/temporal.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,15 @@ func (c *MetricCollector) Name() string {
return c.name
}

func (c *MetricCollector) CollectMetric(ctx context.Context) (float64, error) {
func (c *MetricCollector) CollectMetric(ctx context.Context, app string) (float64, error) {
// Append additional query filter, if specified.
query := `ExecutionStatus="Running"`
if c.Query != "" {
query += " AND (" + c.Query + ")"
}

query = fas.ExpandMetricQuery(ctx, query, app)

resp, err := c.client.CountWorkflow(ctx, &workflowservice.CountWorkflowExecutionsRequest{
Query: query,
})
Expand Down
2 changes: 1 addition & 1 deletion temporal/temporal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestMetricCollector_CollectMetric(t *testing.T) {
}

t.Log("querying metric")
if v, err := c.CollectMetric(context.Background()); err != nil {
if v, err := c.CollectMetric(context.Background(), "myapp"); err != nil {
t.Fatal(err)
} else if got, want := v, 2.0; got != want {
t.Fatalf("metric=%v, want %v", got, want)
Expand Down

0 comments on commit 32ea276

Please sign in to comment.