Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: move runner scaling to the provisioner #3529

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 50 additions & 44 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"github.com/TBD54566975/ftl/backend/controller/leases/dbleaser"
"github.com/TBD54566975/ftl/backend/controller/observability"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/scaling"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
"github.com/TBD54566975/ftl/backend/controller/timeline"
"github.com/TBD54566975/ftl/backend/libdal"
Expand Down Expand Up @@ -120,7 +119,6 @@ func (c *Config) OpenDBAndInstrument() (*sql.DB, error) {
func Start(
ctx context.Context,
config Config,
runnerScaling scaling.RunnerScaling,
cm *cf.Manager[configuration.Configuration],
sm *cf.Manager[configuration.Secrets],
conn *sql.DB,
Expand All @@ -146,7 +144,7 @@ func Start(
logger.Infof("Web console available at: %s", config.Bind)
}

svc, err := New(ctx, conn, cm, sm, config, devel, runnerScaling)
svc, err := New(ctx, conn, cm, sm, config, devel)
if err != nil {
return err
}
Expand All @@ -170,9 +168,6 @@ func Start(
rpc.PProf(),
)
})
g.Go(func() error {
return runnerScaling.Start(ctx, *config.Bind, svc.dbleaser)
})

go svc.dal.PollDeployments(ctx)

Expand Down Expand Up @@ -220,7 +215,6 @@ type Service struct {

increaseReplicaFailures map[string]int
asyncCallsLock sync.Mutex
runnerScaling scaling.RunnerScaling

clientLock sync.Mutex
}
Expand All @@ -232,7 +226,6 @@ func New(
sm *cf.Manager[configuration.Secrets],
config Config,
devel bool,
runnerScaling scaling.RunnerScaling,
) (*Service, error) {
key := config.Key
if config.Key.IsZero() {
Expand Down Expand Up @@ -264,7 +257,6 @@ func New(
clients: ttlcache.New(ttlcache.WithTTL[string, clients](time.Minute)),
config: config,
increaseReplicaFailures: map[string]int{},
runnerScaling: runnerScaling,
}
svc.schemaState.Store(schemaState{routes: map[string]Route{}, schema: &schema.Schema{}})

Expand Down Expand Up @@ -506,15 +498,27 @@ func (s *Service) UpdateDeploy(ctx context.Context, req *connect.Request[ftlv1.U

logger := s.getDeploymentLogger(ctx, deploymentKey)
logger.Debugf("Update deployment for: %s", deploymentKey)

err = s.dal.SetDeploymentReplicas(ctx, deploymentKey, int(req.Msg.MinReplicas))
if err != nil {
if errors.Is(err, libdal.ErrNotFound) {
logger.Errorf(err, "Deployment not found: %s", deploymentKey)
return nil, connect.NewError(connect.CodeNotFound, errors.New("deployment not found"))
if req.Msg.MinReplicas != nil {
err = s.dal.SetDeploymentReplicas(ctx, deploymentKey, int(*req.Msg.MinReplicas))
if err != nil {
if errors.Is(err, libdal.ErrNotFound) {
logger.Errorf(err, "Deployment not found: %s", deploymentKey)
return nil, connect.NewError(connect.CodeNotFound, errors.New("deployment not found"))
}
logger.Errorf(err, "Could not set deployment replicas: %s", deploymentKey)
return nil, fmt.Errorf("could not set deployment replicas: %w", err)
}
}
if req.Msg.Endpoint != nil {
err = s.dal.SetDeploymentEndpoint(ctx, deploymentKey, *req.Msg.Endpoint)
if err != nil {
if errors.Is(err, libdal.ErrNotFound) {
logger.Errorf(err, "Deployment not found: %s", deploymentKey)
return nil, connect.NewError(connect.CodeNotFound, errors.New("deployment not found"))
}
logger.Errorf(err, "Could not set deployment endpoint: %s", deploymentKey)
return nil, fmt.Errorf("could not set deployment endpoint: %w", err)
}
logger.Errorf(err, "Could not set deployment replicas: %s", deploymentKey)
return nil, fmt.Errorf("could not set deployment replicas: %w", err)
}

return connect.NewResponse(&ftlv1.UpdateDeployResponse{}), nil
Expand Down Expand Up @@ -1658,38 +1662,40 @@ func (s *Service) syncRoutesAndSchema(ctx context.Context) (ret time.Duration, e
// And we set its replicas to zero
// It may seem a bit odd to do this here but this is where we are actually updating the routing table
// Which is what makes a deployment 'live' from a client's POV
optURI, err := s.runnerScaling.GetEndpointForDeployment(ctx, v.Module, v.Key.String())
if err != nil {
if v.Schema.Runtime == nil {
deploymentLogger.Debugf("Deployment %s has no runtime metadata", v.Key.String())
continue
}
targetEndpoint, ok := v.Endpoint.Get()
if !ok {
deploymentLogger.Debugf("Failed to get updated endpoint for deployment %s", v.Key.String())
continue
} else if uri, ok := optURI.Get(); ok {
// Check if this is a new route
targetEndpoint := uri.String()
if oldRoute, oldRouteExists := old[v.Module]; !oldRouteExists || oldRoute.Deployment.String() != v.Key.String() {
// If it is a new route we only add it if we can ping it
// Kube deployments can take a while to come up, so we don't want to add them to the routing table until they are ready.
_, err := s.clientsForEndpoint(targetEndpoint).verb.Ping(ctx, connect.NewRequest(&ftlv1.PingRequest{}))
if err != nil {
deploymentLogger.Tracef("Unable to ping %s, not adding to route table", v.Key.String())
continue
}
deploymentLogger.Infof("Deployed %s", v.Key.String())
status.UpdateModuleState(ctx, v.Module, status.BuildStateDeployed)
}
// Check if this is a new route
if oldRoute, oldRouteExists := old[v.Module]; !oldRouteExists || oldRoute.Deployment.String() != v.Key.String() {
// If it is a new route we only add it if we can ping it
// Kube deployments can take a while to come up, so we don't want to add them to the routing table until they are ready.
_, err := s.clientsForEndpoint(targetEndpoint).verb.Ping(ctx, connect.NewRequest(&ftlv1.PingRequest{}))
if err != nil {
deploymentLogger.Tracef("Unable to ping %s, not adding to route table", v.Key.String())
continue
}
if prev, ok := newRoutes[v.Module]; ok {
// We have already seen a route for this module, the existing route must be an old one
// as the deployments are in order
// We have a new route ready to go, so we can just set the old one to 0 replicas
// Do this in a TX so it doesn't happen until the route table is updated
deploymentLogger.Debugf("Setting %s to zero replicas", prev.Deployment)
err := tx.SetDeploymentReplicas(ctx, prev.Deployment, 0)
if err != nil {
deploymentLogger.Errorf(err, "Failed to set replicas to 0 for deployment %s", prev.Deployment.String())
}
deploymentLogger.Infof("Deployed %s", v.Key.String())
status.UpdateModuleState(ctx, v.Module, status.BuildStateDeployed)
}
if prev, ok := newRoutes[v.Module]; ok {
// We have already seen a route for this module, the existing route must be an old one
// as the deployments are in order
// We have a new route ready to go, so we can just set the old one to 0 replicas
// Do this in a TX so it doesn't happen until the route table is updated
deploymentLogger.Debugf("Setting %s to zero replicas", prev.Deployment)
err := tx.SetDeploymentReplicas(ctx, prev.Deployment, 0)
if err != nil {
deploymentLogger.Errorf(err, "Failed to set replicas to 0 for deployment %s", prev.Deployment.String())
}
newRoutes[v.Module] = Route{Module: v.Module, Deployment: v.Key, Endpoint: targetEndpoint}
modulesByName[v.Module] = v.Schema
}
newRoutes[v.Module] = Route{Module: v.Module, Deployment: v.Key, Endpoint: targetEndpoint}
modulesByName[v.Module] = v.Schema
}

orderedModules := maps.Values(modulesByName)
Expand Down
17 changes: 17 additions & 0 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,22 @@ func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey
return nil
}

// SetDeploymentEndpoint stores endpoint as the endpoint of the deployment
// identified by key.
//
// The update runs inside its own transaction: the deferred CommitOrRollback
// is handed a pointer to the named result err, so it observes whatever error
// this method ends up returning. Errors from beginning the transaction and
// from the update are passed through libdal.TranslatePGError before being
// returned.
func (d *DAL) SetDeploymentEndpoint(ctx context.Context, key model.DeploymentKey, endpoint string) (err error) {
	txn, err := d.Begin(ctx)
	if err != nil {
		return libdal.TranslatePGError(err)
	}
	defer txn.CommitOrRollback(ctx, &err)

	// The query takes an optional endpoint; this method always supplies one.
	wrapped := optional.Some(endpoint)
	if err = txn.db.SetDeploymentEndpoint(ctx, key, wrapped); err != nil {
		return libdal.TranslatePGError(err)
	}
	return nil
}

var ErrReplaceDeploymentAlreadyActive = errors.New("deployment already active")

// ReplaceDeployment replaces an old deployment of a module with a new deployment.
Expand Down Expand Up @@ -520,6 +536,7 @@ func (d *DAL) GetActiveDeployments(ctx context.Context) ([]dalmodel.Deployment,
Replicas: optional.Some(int(in.Replicas)),
Schema: in.Deployment.Schema,
CreatedAt: in.Deployment.CreatedAt,
Endpoint: in.Deployment.Endpoint,
}
}), nil
}
Expand Down
1 change: 1 addition & 0 deletions backend/controller/dal/internal/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/controller/dal/internal/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion backend/controller/dal/internal/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ FROM runners r
ORDER BY r.key;

-- name: GetActiveDeployments :many
SELECT sqlc.embed(d), m.name AS module_name, m.language, COUNT(r.id) AS replicas
SELECT sqlc.embed(d), m.name AS module_name, m.language, COUNT(r.id) AS replicas, d.endpoint
FROM deployments d
JOIN modules m ON d.module_id = m.id
LEFT JOIN runners r ON d.id = r.deployment_id
Expand Down Expand Up @@ -138,6 +138,12 @@ SET min_replicas = $2, last_activated_at = CASE WHEN min_replicas = 0 THEN (NOW(
WHERE key = sqlc.arg('key')::deployment_key
RETURNING 1;

-- name: SetDeploymentEndpoint :exec
-- Sets the endpoint column ($2) on the deployments row whose key equals the
-- 'key' argument (cast to deployment_key).
-- NOTE(review): the query is annotated :exec, so the RETURNING 1 value is
-- presumably not surfaced to the caller - confirm whether a missing key is
-- meant to be reported as an error.
UPDATE deployments
SET endpoint = $2
WHERE key = sqlc.arg('key')::deployment_key
RETURNING 1;

-- name: GetExistingDeploymentForModule :one
SELECT *
FROM deployments d
Expand Down
32 changes: 26 additions & 6 deletions backend/controller/dal/internal/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/controller/dal/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type Deployment struct {
Schema *schema.Module
CreatedAt time.Time
Labels model.Labels
Endpoint optional.Option[string]
}

func (d Deployment) String() string { return d.Key.String() }
Expand Down
1 change: 1 addition & 0 deletions backend/controller/leases/testdata/go/leases/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ require (
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/puzpuzpuz/xsync/v3 v3.4.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
Expand Down
3 changes: 2 additions & 1 deletion backend/controller/leases/testdata/go/leases/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading