Skip to content

Commit

Permalink
Implement multi-app scaling (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson authored Apr 30, 2024
1 parent 3adaa1e commit 5125fb6
Show file tree
Hide file tree
Showing 16 changed files with 923 additions and 427 deletions.
3 changes: 2 additions & 1 deletion cmd/fly-autoscaler/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func (c *EvalCommand) Run(ctx context.Context, args []string) (err error) {
}

// Instantiate reconciler and evaluate once.
r := fas.NewReconciler(nil)
r := fas.NewReconciler()
r.AppName = c.Config.AppName
r.MinCreatedMachineN = c.Config.GetMinCreatedMachineN()
r.MaxCreatedMachineN = c.Config.GetMaxCreatedMachineN()
r.MinStartedMachineN = c.Config.GetMinStartedMachineN()
Expand Down
85 changes: 64 additions & 21 deletions cmd/fly-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"log/slog"
"os"
"os/signal"
"strconv"
"strings"
"time"

fas "github.com/superfly/fly-autoscaler"
fasprom "github.com/superfly/fly-autoscaler/prometheus"
"github.com/superfly/fly-autoscaler/temporal"
fly "github.com/superfly/fly-go"
"github.com/superfly/fly-go/flaps"
"github.com/superfly/fly-go/tokens"
"gopkg.in/yaml.v3"
Expand All @@ -26,7 +28,13 @@ var (
Commit = ""
)

// APIBaseURL is the base URL of the Fly.io API. main passes it to
// fly.SetBaseURL before running any command.
const (
APIBaseURL = "https://api.fly.io"
)

func main() {
fly.SetBaseURL(APIBaseURL)

if err := run(context.Background(), os.Args[1:]); err == flag.ErrHelp {
os.Exit(2)
} else if err != nil {
Expand Down Expand Up @@ -106,30 +114,38 @@ func registerConfigPathFlag(fs *flag.FlagSet) *string {
}

type Config struct {
AppName string `yaml:"app-name"`
Regions []string `yaml:"regions"`
CreatedMachineN string `yaml:"created-machine-count"`
MinCreatedMachineN string `yaml:"min-created-machine-count"`
MaxCreatedMachineN string `yaml:"max-created-machine-count"`
StartedMachineN string `yaml:"started-machine-count"`
MinStartedMachineN string `yaml:"min-started-machine-count"`
MaxStartedMachineN string `yaml:"max-started-machine-count"`
Interval time.Duration `yaml:"interval"`
APIToken string `yaml:"api-token"`
Verbose bool `yaml:"verbose"`
AppName string `yaml:"app-name"`
Org string `yaml:"org"`
Regions []string `yaml:"regions"`
CreatedMachineN string `yaml:"created-machine-count"`
MinCreatedMachineN string `yaml:"min-created-machine-count"`
MaxCreatedMachineN string `yaml:"max-created-machine-count"`
StartedMachineN string `yaml:"started-machine-count"`
MinStartedMachineN string `yaml:"min-started-machine-count"`
MaxStartedMachineN string `yaml:"max-started-machine-count"`
Concurrency int `yaml:"concurrency"`
Interval time.Duration `yaml:"interval"`
Timeout time.Duration `yaml:"timeout"`
AppListRefreshInterval time.Duration `yaml:"app-list-refresh-interval"`
APIToken string `yaml:"api-token"`
Verbose bool `yaml:"verbose"`

MetricCollectors []*MetricCollectorConfig `yaml:"metric-collectors"`
}

func NewConfig() *Config {
return &Config{
Interval: fas.DefaultReconcilerInterval,
Concurrency: fas.DefaultConcurrency,
Interval: fas.DefaultReconcileInterval,
Timeout: fas.DefaultReconcileTimeout,
AppListRefreshInterval: fas.DefaultAppListRefreshInterval,
}
}

func NewConfigFromEnv() (*Config, error) {
func NewConfigFromEnv() (_ *Config, err error) {
c := NewConfig()
c.AppName = os.Getenv("FAS_APP_NAME")
c.Org = os.Getenv("FAS_ORG")
c.CreatedMachineN = os.Getenv("FAS_CREATED_MACHINE_COUNT")
c.MinCreatedMachineN = os.Getenv("FAS_MIN_CREATED_MACHINE_COUNT")
c.MaxCreatedMachineN = os.Getenv("FAS_MAX_CREATED_MACHINE_COUNT")
Expand All @@ -142,12 +158,26 @@ func NewConfigFromEnv() (*Config, error) {
c.Regions = strings.Split(s, ",")
}

if s := os.Getenv("FAS_CONCURRENCY"); s != "" {
if c.Concurrency, err = strconv.Atoi(s); err != nil {
return nil, fmt.Errorf("cannot parse FAS_CONCURRENCY as integer: %q", s)
}
}

if s := os.Getenv("FAS_INTERVAL"); s != "" {
d, err := time.ParseDuration(s)
if err != nil {
if c.Interval, err = time.ParseDuration(s); err != nil {
return nil, fmt.Errorf("cannot parse FAS_INTERVAL as duration: %q", s)
}
c.Interval = d
}
if s := os.Getenv("FAS_TIMEOUT"); s != "" {
if c.Timeout, err = time.ParseDuration(s); err != nil {
return nil, fmt.Errorf("cannot parse FAS_TIMEOUT as duration: %q", s)
}
}
if s := os.Getenv("FAS_APP_LIST_REFRESH_INTERVAL"); s != "" {
if c.AppListRefreshInterval, err = time.ParseDuration(s); err != nil {
return nil, fmt.Errorf("cannot parse FAS_APP_LIST_REFRESH_INTERVAL as duration: %q", s)
}
}

if addr := os.Getenv("FAS_PROMETHEUS_ADDRESS"); addr != "" {
Expand Down Expand Up @@ -278,15 +308,28 @@ func (c *Config) validateStartedMachineCount() error {
return nil
}

func (c *Config) NewFlapsClient(ctx context.Context) (*flaps.Client, error) {
func (c *Config) NewFlyClient(ctx context.Context) (*fly.Client, error) {
if c.APIToken == "" {
return nil, fmt.Errorf("api token required")
}

return flaps.NewWithOptions(ctx, flaps.NewClientOpts{
AppName: c.AppName,
Tokens: tokens.Parse(c.APIToken),
})
return fly.NewClientFromOptions(fly.ClientOptions{
Tokens: tokens.Parse(c.APIToken),
}), nil
}

// NewFlapsClient returns a constructor that builds a flaps client for a given
// app name using the configured API token.
//
// The token is parsed once, up front, and shared by every client the returned
// function creates. An error is returned if no API token has been configured.
func (c *Config) NewFlapsClient() (fas.NewFlapsClientFunc, error) {
	if c.APIToken == "" {
		return nil, fmt.Errorf("api token required")
	}
	tok := tokens.Parse(c.APIToken)

	return func(ctx context.Context, appName string) (fas.FlapsClient, error) {
		client, err := flaps.NewWithOptions(ctx, flaps.NewClientOpts{
			AppName: appName,
			Tokens:  tok,
		})
		if err != nil {
			// Return an untyped nil so a failed construction never yields a
			// non-nil interface value wrapping a nil concrete client.
			return nil, err
		}
		return client, nil
	}, nil
}

func (c *Config) NewMetricCollectors() ([]fas.MetricCollector, error) {
Expand Down
92 changes: 57 additions & 35 deletions cmd/fly-autoscaler/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,19 @@ import (

// ServeCommand represents a command to run the autoscaler server process.
type ServeCommand struct {
reconciler *fas.Reconciler
Config *Config
pool *fas.ReconcilerPool
Config *Config
}

// NewServeCommand returns a new, zero-valued ServeCommand.
func NewServeCommand() *ServeCommand {
	cmd := new(ServeCommand)
	return cmd
}

func (c *ServeCommand) Close() (err error) {
if c.reconciler != nil {
c.reconciler.Stop()
if c.pool != nil {
if err := c.pool.Close(); err != nil {
slog.Warn("failed to close reconciler pool", slog.Any("err", err))
}
}
return nil
}
Expand All @@ -43,12 +45,12 @@ func (c *ServeCommand) Run(ctx context.Context, args []string) (err error) {
return err
}

// Instantiate client to scale up machines.
client, err := c.Config.NewFlapsClient(ctx)
// Instantiate clients for accessing org/apps & for scaling machines.
flyClient, err := c.Config.NewFlyClient(ctx)
if err != nil {
return fmt.Errorf("cannot create flaps client: %w", err)
return fmt.Errorf("cannot create fly client: %w", err)
}
slog.Info("connected to flaps", slog.String("app_name", c.Config.AppName))
slog.Info("connected to fly")

// Instantiate prometheus collector.
collectors, err := c.Config.NewMetricCollectors()
Expand All @@ -57,47 +59,67 @@ func (c *ServeCommand) Run(ctx context.Context, args []string) (err error) {
}
slog.Info("metrics collectors initialized", slog.Int("n", len(collectors)))

// Instantiate and start reconciliation.
r := fas.NewReconciler(client)
r.MinCreatedMachineN = c.Config.GetMinCreatedMachineN()
r.MaxCreatedMachineN = c.Config.GetMaxCreatedMachineN()
r.MinStartedMachineN = c.Config.GetMinStartedMachineN()
r.MaxStartedMachineN = c.Config.GetMaxStartedMachineN()
r.Regions = c.Config.Regions
r.Interval = c.Config.Interval
r.Collectors = collectors
r.RegisterPromMetrics(prometheus.DefaultRegisterer)
c.reconciler = r
minCreatedMachineN := c.Config.GetMinCreatedMachineN()
maxCreatedMachineN := c.Config.GetMaxCreatedMachineN()
minStartedMachineN := c.Config.GetMinStartedMachineN()
maxStartedMachineN := c.Config.GetMaxStartedMachineN()

// Instantiate pool.
p := fas.NewReconcilerPool(flyClient, c.Config.Concurrency)
if p.NewFlapsClient, err = c.Config.NewFlapsClient(); err != nil {
return fmt.Errorf("cannot initialize flaps client constructor: %w", err)
}
p.NewReconciler = func() *fas.Reconciler {
r := fas.NewReconciler()
r.MinCreatedMachineN = minCreatedMachineN
r.MaxCreatedMachineN = maxCreatedMachineN
r.MinStartedMachineN = minStartedMachineN
r.MaxStartedMachineN = maxStartedMachineN
r.Regions = c.Config.Regions
r.Collectors = collectors
return r
}
p.AppName = c.Config.AppName
p.OrganizationSlug = c.Config.Org
p.ReconcileInterval = c.Config.Interval
p.ReconcileTimeout = c.Config.Timeout
p.AppListRefreshInterval = c.Config.AppListRefreshInterval
p.RegisterPromMetrics(prometheus.DefaultRegisterer)
c.pool = p

attrs := []any{
slog.Duration("interval", r.Interval),
slog.Int("collectors", len(r.Collectors)),
slog.String("interval", p.ReconcileInterval.String()),
slog.String("timeout", p.ReconcileTimeout.String()),
slog.String("appListRefreshInterval", p.AppListRefreshInterval.String()),
slog.Int("collectors", len(collectors)),
}

if len(r.Regions) > 0 {
attrs = append(attrs, slog.Any("regions", r.Regions))
if regions := c.Config.Regions; len(regions) > 0 {
attrs = append(attrs, slog.Any("regions", regions))
}

if r.MinCreatedMachineN == r.MaxCreatedMachineN {
attrs = append(attrs, slog.String("created", r.MinCreatedMachineN))
} else if r.MinCreatedMachineN != "" || r.MaxCreatedMachineN != "" {
if minCreatedMachineN == maxCreatedMachineN {
attrs = append(attrs, slog.String("created", minCreatedMachineN))
} else if minCreatedMachineN != "" || maxCreatedMachineN != "" {
attrs = append(attrs, slog.Group("created",
slog.String("min", r.MinCreatedMachineN),
slog.String("max", r.MaxCreatedMachineN),
slog.String("min", minCreatedMachineN),
slog.String("max", maxCreatedMachineN),
))
}

if r.MinStartedMachineN == r.MaxStartedMachineN {
attrs = append(attrs, slog.String("started", r.MinStartedMachineN))
} else if r.MinStartedMachineN != "" || r.MaxStartedMachineN != "" {
if minStartedMachineN == maxStartedMachineN {
attrs = append(attrs, slog.String("started", minStartedMachineN))
} else if minStartedMachineN != "" || maxStartedMachineN != "" {
attrs = append(attrs, slog.Group("started",
slog.String("min", r.MinStartedMachineN),
slog.String("max", r.MaxStartedMachineN),
slog.String("min", minStartedMachineN),
slog.String("max", maxStartedMachineN),
))
}

slog.Info("reconciler initialized, beginning loop", attrs...)
r.Start()
slog.Info("reconciler pool initialized, beginning loop", attrs...)
if err := p.Open(); err != nil {
return fmt.Errorf("cannot initialize reconciler pool: %w", err)
}

go c.serveMetricsServer(ctx)

Expand Down
11 changes: 10 additions & 1 deletion fas.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,21 @@ var (
ErrExprInf = errors.New("expression returned Inf")
)

var _ FlyClient = (*flaps.Client)(nil)
// Compile-time check that *fly.Client satisfies FlyClient.
var _ FlyClient = (*fly.Client)(nil)

// FlyClient declares the organization-level API calls this package needs:
// looking up an organization by its slug and listing the apps that belong to
// an organization.
type FlyClient interface {
// GetOrganizationBySlug returns the organization identified by slug.
GetOrganizationBySlug(ctx context.Context, slug string) (*fly.Organization, error)
// GetAppsForOrganization returns the apps for the organization with the given ID.
GetAppsForOrganization(ctx context.Context, orgID string) ([]fly.App, error)
}

// Compile-time check that *flaps.Client satisfies FlapsClient.
var _ FlapsClient = (*flaps.Client)(nil)

// FlapsClient declares the machine operations this package needs: list,
// launch, destroy, start and stop.
//
// NOTE(review): the meaning of the state filter and the nonce arguments is
// defined by the flaps client, not here; confirm against its documentation.
type FlapsClient interface {
List(ctx context.Context, state string) ([]*fly.Machine, error)
Launch(ctx context.Context, input fly.LaunchMachineInput) (*fly.Machine, error)
Destroy(ctx context.Context, input fly.RemoveMachineInput, nonce string) error
Start(ctx context.Context, id, nonce string) (*fly.MachineStartResponse, error)
Stop(ctx context.Context, in fly.StopMachineInput, nonce string) error
}

// NewFlapsClientFunc is a constructor that returns a FlapsClient for the app
// named appName.
type NewFlapsClientFunc func(ctx context.Context, appName string) (FlapsClient, error)
19 changes: 17 additions & 2 deletions metric_collector.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
package fas

import "context"
import (
"context"
"os"
)

// MetricCollector represents a client for collecting metrics from an external source.
type MetricCollector interface {
Name() string
CollectMetric(ctx context.Context) (float64, error)
CollectMetric(ctx context.Context, app string) (float64, error)
}

// ExpandMetricQuery returns query with its $VAR and ${VAR} references
// substituted. APP_NAME expands to app; every other variable name expands to
// the empty string. The ctx argument is currently unused.
func ExpandMetricQuery(ctx context.Context, query, app string) string {
	lookup := func(name string) string {
		if name == "APP_NAME" {
			return app
		}
		// Unrecognized variables are dropped from the query.
		return ""
	}
	return os.Expand(query, lookup)
}
31 changes: 31 additions & 0 deletions metric_collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package fas_test

import (
"context"
"testing"

fas "github.com/superfly/fly-autoscaler"
)

// TestExpandMetricQuery checks that ExpandMetricQuery leaves plain text alone
// and substitutes APP_NAME in both its bare ($APP_NAME) and wrapped
// (${APP_NAME}) forms.
func TestExpandMetricQuery(t *testing.T) {
	ctx := context.Background()

	for _, tt := range []struct {
		name  string
		query string
		want  string
	}{
		{name: "Static", query: "foo", want: `foo`},
		{name: "Bare", query: "foo $APP_NAME bar", want: `foo my-app bar`},
		{name: "Wrapped", query: "foo${APP_NAME}bar", want: `foomy-appbar`},
	} {
		t.Run(tt.name, func(t *testing.T) {
			if got := fas.ExpandMetricQuery(ctx, tt.query, "my-app"); got != tt.want {
				t.Fatalf("got %q, want %q", got, tt.want)
			}
		})
	}
}
Loading

0 comments on commit 5125fb6

Please sign in to comment.