From 32ea2764dc009424ab0731c3f89f98239e0fac59 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 29 Apr 2024 10:15:23 -0600 Subject: [PATCH] Add query evaluation with app name --- cmd/fly-autoscaler/eval.go | 1 + metric_collector.go | 19 +++++++++++++++++-- metric_collector_test.go | 31 +++++++++++++++++++++++++++++++ mock/metric_collector.go | 6 +++--- prometheus/prometheus.go | 6 ++++-- reconciler.go | 5 ++++- reconciler_pool.go | 1 + reconciler_pool_test.go | 2 +- temporal/temporal.go | 4 +++- temporal/temporal_test.go | 2 +- 10 files changed, 66 insertions(+), 11 deletions(-) create mode 100644 metric_collector_test.go diff --git a/cmd/fly-autoscaler/eval.go b/cmd/fly-autoscaler/eval.go index 36ce62f..4927f8c 100644 --- a/cmd/fly-autoscaler/eval.go +++ b/cmd/fly-autoscaler/eval.go @@ -34,6 +34,7 @@ func (c *EvalCommand) Run(ctx context.Context, args []string) (err error) { // Instantiate reconciler and evaluate once. r := fas.NewReconciler() + r.AppName = c.Config.AppName r.MinCreatedMachineN = c.Config.GetMinCreatedMachineN() r.MaxCreatedMachineN = c.Config.GetMaxCreatedMachineN() r.MinStartedMachineN = c.Config.GetMinStartedMachineN() diff --git a/metric_collector.go b/metric_collector.go index 2df5164..016e09f 100644 --- a/metric_collector.go +++ b/metric_collector.go @@ -1,9 +1,24 @@ package fas -import "context" +import ( + "context" + "os" +) // MetricCollector represents a client for collecting metrics from an external source. type MetricCollector interface { Name() string - CollectMetric(ctx context.Context) (float64, error) + CollectMetric(ctx context.Context, app string) (float64, error) +} + +// ExpandMetricQuery replaces variables in query with their values. +func ExpandMetricQuery(ctx context.Context, query, app string) string { + return os.Expand(query, func(key string) string { + switch key { + case "APP_NAME": + return app + default: + return "" + } + }) } diff --git a/metric_collector_test.go b/metric_collector_test.go new file mode 100644 index 0000000..44a7349 --- /dev/null +++ b/metric_collector_test.go @@ -0,0 +1,31 @@ +package fas_test + +import ( + "context" + "testing" + + fas "github.com/superfly/fly-autoscaler" +) + +func TestExpandMetricQuery(t *testing.T) { + t.Run("Static", func(t *testing.T) { + result := fas.ExpandMetricQuery(context.Background(), "foo", "my-app") + if got, want := result, `foo`; got != want { + t.Fatalf("got %q, want %q", got, want) + } + }) + + t.Run("Bare", func(t *testing.T) { + result := fas.ExpandMetricQuery(context.Background(), "foo $APP_NAME bar", "my-app") + if got, want := result, `foo my-app bar`; got != want { + t.Fatalf("got %q, want %q", got, want) + } + }) + + t.Run("Wrapped", func(t *testing.T) { + result := fas.ExpandMetricQuery(context.Background(), "foo${APP_NAME}bar", "my-app") + if got, want := result, `foomy-appbar`; got != want { + t.Fatalf("got %q, want %q", got, want) + } + }) +} diff --git a/mock/metric_collector.go b/mock/metric_collector.go index d98e515..29a5e12 100644 --- a/mock/metric_collector.go +++ b/mock/metric_collector.go @@ -10,7 +10,7 @@ var _ fas.MetricCollector = (*MetricCollector)(nil) type MetricCollector struct { name string - CollectMetricFunc func(ctx context.Context) (float64, error) + CollectMetricFunc func(ctx context.Context, app string) (float64, error) } func NewMetricCollector(name string) *MetricCollector { @@ -19,6 +19,6 @@ func NewMetricCollector(name string) *MetricCollector { func (c *MetricCollector) Name() string { return c.name } -func (c *MetricCollector) CollectMetric(ctx context.Context) (float64, error) { - return c.CollectMetricFunc(ctx) +func (c *MetricCollector) CollectMetric(ctx context.Context, app string) (float64, error) { + return c.CollectMetricFunc(ctx, app) } diff --git a/prometheus/prometheus.go b/prometheus/prometheus.go index 8320bce..5b330ed 100644 --- a/prometheus/prometheus.go +++ b/prometheus/prometheus.go @@ -42,8 +42,10 @@ func (c *MetricCollector) Name() string { return c.name } -func (c *MetricCollector) CollectMetric(ctx context.Context) (float64, error) { - result, warnings, err := c.api.Query(context.Background(), c.query, time.Now()) +func (c *MetricCollector) CollectMetric(ctx context.Context, app string) (float64, error) { + query := fas.ExpandMetricQuery(ctx, c.query, app) + + result, warnings, err := c.api.Query(context.Background(), query, time.Now()) if err != nil { return 0, err } else if len(warnings) > 0 { diff --git a/reconciler.go b/reconciler.go index 652c415..f525b95 100644 --- a/reconciler.go +++ b/reconciler.go @@ -21,6 +21,9 @@ type Reconciler struct { // Client to connect to Machines API to scale app. Required. Client FlapsClient + // The name of the app currently being reconciled. + AppName string + // List of regions that machines can be created in. // The reconciler uses a round-robin approach to choosing next region. Regions []string @@ -79,7 +82,7 @@ func (r *Reconciler) CollectMetrics(ctx context.Context) error { r.metrics = make(map[string]float64) for _, c := range r.Collectors { - value, err := c.CollectMetric(ctx) + value, err := c.CollectMetric(ctx, r.AppName) if err != nil { return fmt.Errorf("collect metric (%q): %w", c.Name(), err) } diff --git a/reconciler_pool.go b/reconciler_pool.go index 6d13f1c..afdab7d 100644 --- a/reconciler_pool.go +++ b/reconciler_pool.go @@ -278,6 +278,7 @@ func (p *ReconcilerPool) monitorReconciler(ctx context.Context, r *Reconciler) { ctx, cancel := context.WithTimeoutCause(p.ctx, p.ReconcileTimeout, errReconciliationTimeout) defer cancel() + r.AppName = info.name r.Client = info.client if err := r.CollectMetrics(ctx); err != nil { diff --git a/reconciler_pool_test.go b/reconciler_pool_test.go index 043cc6f..9c98032 100644 --- a/reconciler_pool_test.go +++ b/reconciler_pool_test.go @@ -113,7 +113,7 @@ func TestReconcilerPool_Run_SingleApp(t *testing.T) { // Collector will simply mirror the target value. var target atomic.Int64 collector := mock.NewMetricCollector("target") - collector.CollectMetricFunc = func(ctx context.Context) (float64, error) { + collector.CollectMetricFunc = func(ctx context.Context, app string) (float64, error) { return float64(target.Load()), nil } diff --git a/temporal/temporal.go b/temporal/temporal.go index 33b42a2..24dc132 100644 --- a/temporal/temporal.go +++ b/temporal/temporal.go @@ -70,13 +70,15 @@ func (c *MetricCollector) Name() string { return c.name } -func (c *MetricCollector) CollectMetric(ctx context.Context) (float64, error) { +func (c *MetricCollector) CollectMetric(ctx context.Context, app string) (float64, error) { // Append additional query filter, if specified. query := `ExecutionStatus="Running"` if c.Query != "" { query += " AND (" + c.Query + ")" } + query = fas.ExpandMetricQuery(ctx, query, app) + resp, err := c.client.CountWorkflow(ctx, &workflowservice.CountWorkflowExecutionsRequest{ Query: query, }) diff --git a/temporal/temporal_test.go b/temporal/temporal_test.go index d8fa248..01c613c 100644 --- a/temporal/temporal_test.go +++ b/temporal/temporal_test.go @@ -34,7 +34,7 @@ func TestMetricCollector_CollectMetric(t *testing.T) { } t.Log("querying metric") - if v, err := c.CollectMetric(context.Background()); err != nil { + if v, err := c.CollectMetric(context.Background(), "myapp"); err != nil { t.Fatal(err) } else if got, want := v, 2.0; got != want { t.Fatalf("metric=%v, want %v", got, want)