diff --git a/backend/controller/controller.go b/backend/controller/controller.go
index 563750bed4..d72261ea69 100644
--- a/backend/controller/controller.go
+++ b/backend/controller/controller.go
@@ -27,6 +27,7 @@ import (
 	"google.golang.org/protobuf/types/known/structpb"
 	"google.golang.org/protobuf/types/known/timestamppb"

+	"github.com/TBD54566975/ftl/backend/controller/cronjobs"
 	"github.com/TBD54566975/ftl/backend/controller/dal"
 	"github.com/TBD54566975/ftl/backend/controller/ingress"
 	"github.com/TBD54566975/ftl/backend/controller/scaling"
@@ -49,10 +50,11 @@ import (

 // CommonConfig between the production controller and development server.
 type CommonConfig struct {
-	AllowOrigins []*url.URL `help:"Allow CORS requests to ingress endpoints from these origins." env:"FTL_CONTROLLER_ALLOW_ORIGIN"`
-	NoConsole    bool       `help:"Disable the console."`
-	IdleRunners  int        `help:"Number of idle runners to keep around (not supported in production)." default:"3"`
-	WaitFor      []string   `help:"Wait for these modules to be deployed before becoming ready." placeholder:"MODULE"`
+	AllowOrigins   []*url.URL    `help:"Allow CORS requests to ingress endpoints from these origins." env:"FTL_CONTROLLER_ALLOW_ORIGIN"`
+	NoConsole      bool          `help:"Disable the console."`
+	IdleRunners    int           `help:"Number of idle runners to keep around (not supported in production)." default:"3"`
+	WaitFor        []string      `help:"Wait for these modules to be deployed before becoming ready." placeholder:"MODULE"`
+	CronJobTimeout time.Duration `help:"Timeout for cron jobs." default:"5m"`
 }

 type Config struct {
@@ -138,12 +140,20 @@ type clients struct {
 	runner ftlv1connect.RunnerServiceClient
 }

+// ControllerListListener is regularly notified of the current list of controllers.
+// This is often used to update a hash ring to distribute work.
+type ControllerListListener interface {
+	UpdatedControllerList(ctx context.Context, controllers []dal.Controller)
+}
+
 type Service struct {
 	dal                *dal.DAL
 	key                model.ControllerKey
 	deploymentLogsSink *deploymentLogsSink

-	tasks *scheduledtask.Scheduler
+	tasks                   *scheduledtask.Scheduler
+	cronJobs                *cronjobs.Service
+	controllerListListeners []ControllerListListener

 	// Map from endpoint to client.
 	clients *ttlcache.Cache[string, clients]
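The new listener interface is the seam between cluster membership and everything that shards work by it. A minimal sketch of an implementation — hypothetical, for illustration only; in this change the concrete listeners are the scheduled task scheduler and the cron job service:

    type loggingListener struct{}

    func (l *loggingListener) UpdatedControllerList(ctx context.Context, controllers []dal.Controller) {
        log.FromContext(ctx).Tracef("controller list updated: %d controllers", len(controllers))
    }

Anything appended to svc.controllerListListeners below receives the fresh list each time updateControllersList polls the database.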
@@ -163,7 +173,7 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
 	}
 	config.SetDefaults()
 	svc := &Service{
-		tasks:              scheduledtask.New(ctx, key, db),
+		tasks:              scheduledtask.New(ctx, key),
 		dal:                db,
 		key:                key,
 		deploymentLogsSink: newDeploymentLogsSink(ctx, db),
@@ -174,8 +184,13 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
 		increaseReplicaFailures: map[string]int{},
 	}

+	cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, cronjobs.Config{Timeout: config.CronJobTimeout}, db, svc.tasks, svc.callWithRequest)
+	svc.cronJobs = cronSvc
+	svc.controllerListListeners = append(svc.controllerListListeners, svc.tasks, cronSvc)
+
 	svc.tasks.Parallel(backoff.Backoff{Min: time.Second, Max: time.Second * 5}, svc.syncRoutes)
 	svc.tasks.Parallel(backoff.Backoff{Min: time.Second * 3, Max: time.Second * 3}, svc.heartbeatController)
+	svc.tasks.Parallel(backoff.Backoff{Min: time.Second * 5, Max: time.Second * 5}, svc.updateControllersList)
 	svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 10}, svc.reapStaleRunners)
 	svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 20}, svc.releaseExpiredReservations)
 	svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 5}, svc.reconcileDeployments)
@@ -422,6 +437,9 @@ func (s *Service) ReplaceDeploy(ctx context.Context, c *connect.Request[ftlv1.Re
 			return nil, fmt.Errorf("could not replace deployment: %w", err)
 		}
 	}
+
+	s.cronJobs.CreatedOrReplacedDeployment(ctx, newDeploymentKey)
+
 	return connect.NewResponse(&ftlv1.ReplaceDeployResponse{}), nil
 }

@@ -732,11 +750,18 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
 	}

 	ingressRoutes := extractIngressRoutingEntries(req.Msg)
-	dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes, nil)
+	cronJobs, err := s.cronJobs.NewCronJobsForModule(ctx, req.Msg.Schema)
+	if err != nil {
+		logger.Errorf(err, "Could not generate cron jobs for new deployment")
+		return nil, fmt.Errorf("could not generate cron jobs for new deployment: %w", err)
+	}
+
+	dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes, cronJobs)
 	if err != nil {
 		logger.Errorf(err, "Could not create deployment")
 		return nil, fmt.Errorf("could not create deployment: %w", err)
 	}
+
 	deploymentLogger := s.getDeploymentLogger(ctx, dkey)
 	deploymentLogger.Debugf("Created deployment %s", dkey)
 	return connect.NewResponse(&ftlv1.CreateDeploymentResponse{DeploymentKey: dkey.String()}), nil
@@ -999,7 +1024,17 @@ func (s *Service) heartbeatController(ctx context.Context) (time.Duration, error
 		return 0, fmt.Errorf("failed to heartbeat controller: %w", err)
 	}
 	return time.Second * 3, nil
+}
+func (s *Service) updateControllersList(ctx context.Context) (time.Duration, error) {
+	controllers, err := s.dal.GetControllers(ctx, false)
+	if err != nil {
+		return 0, err
+	}
+	for _, listener := range s.controllerListListeners {
+		listener.UpdatedControllerList(ctx, controllers)
+	}
+	return time.Second * 5, nil
 }

 func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(response *ftlv1.PullSchemaResponse) error) error {
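A note on the scheduling contract updateControllersList relies on: a scheduledtask.Job returns the delay until its next run, and the backoff argument appears to govern retries on error only. A minimal sketch of a conforming task (hypothetical, for illustration):

    func pollSomething(ctx context.Context) (time.Duration, error) {
        // ... do one round of periodic work ...
        return 10 * time.Second, nil // ask to be run again in ten seconds
    }

    // registered like the tasks above:
    svc.tasks.Parallel(backoff.Backoff{Min: time.Second, Max: time.Second * 10}, pollSomething)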
"github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/slices" + "github.com/alecthomas/atomic" + "github.com/alecthomas/types/optional" + "github.com/alecthomas/types/pubsub" + "github.com/benbjohnson/clock" + "github.com/jpillora/backoff" + "github.com/serialx/hashring" +) + +const ( + controllersPerJob = 2 + jobResetInterval = time.Minute + newJobHashRingOverrideInterval = time.Minute + time.Second*20 +) + +type Config struct { + Timeout time.Duration +} + +//sumtype:decl +type event interface { + // cronJobEvent is a marker to ensure that all events implement the interface. + cronJobEvent() +} + +type syncEvent struct { + jobs []model.CronJob + addedDeploymentKey optional.Option[model.DeploymentKey] +} + +func (syncEvent) cronJobEvent() {} + +type endedJobsEvent struct { + jobs []model.CronJob +} + +func (endedJobsEvent) cronJobEvent() {} + +type updatedHashRingEvent struct{} + +func (updatedHashRingEvent) cronJobEvent() {} + +type hashRingState struct { + hashRing *hashring.HashRing + controllers []dal.Controller + idx int +} + +type DAL interface { + GetCronJobs(ctx context.Context) ([]model.CronJob, error) + StartCronJobs(ctx context.Context, jobs []model.CronJob) (attemptedJobs []dal.AttemptedCronJob, err error) + EndCronJob(ctx context.Context, job model.CronJob, next time.Time) (model.CronJob, error) + GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]model.CronJob, error) +} + +type Scheduler interface { + Singleton(retry backoff.Backoff, job scheduledtask.Job) + Parallel(retry backoff.Backoff, job scheduledtask.Job) +} + +type ExecuteCallFunc func(context.Context, *connect.Request[ftlv1.CallRequest], optional.Option[model.RequestKey], string) (*connect.Response[ftlv1.CallResponse], error) + +type Service struct { + config Config + key model.ControllerKey + requestSource string + + dal DAL + scheduler Scheduler + call ExecuteCallFunc + + clock clock.Clock + events *pubsub.Topic[event] + + hashRingState atomic.Value[*hashRingState] +} + +func New(ctx context.Context, key model.ControllerKey, requestSource string, config Config, dal DAL, scheduler Scheduler, call ExecuteCallFunc) *Service { + return NewForTesting(ctx, key, requestSource, config, dal, scheduler, call, clock.New()) +} + +func NewForTesting(ctx context.Context, key model.ControllerKey, requestSource string, config Config, dal DAL, scheduler Scheduler, call ExecuteCallFunc, clock clock.Clock) *Service { + svc := &Service{ + config: config, + key: key, + requestSource: requestSource, + dal: dal, + scheduler: scheduler, + call: call, + clock: clock, + events: pubsub.New[event](), + } + svc.UpdatedControllerList(ctx, nil) + + svc.scheduler.Parallel(backoff.Backoff{Min: time.Second, Max: jobResetInterval}, svc.syncJobs) + svc.scheduler.Singleton(backoff.Backoff{Min: time.Second, Max: time.Minute}, svc.killOldJobs) + + go svc.watchForUpdates(ctx) + + return svc +} + +func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Module) ([]model.CronJob, error) { + start := s.clock.Now().UTC() + newJobs := []model.CronJob{} + merr := []error{} + for _, decl := range module.Decls { + verb, ok := decl.Value.(*schemapb.Decl_Verb) + if !ok { + continue + } + for _, metadata := range verb.Verb.Metadata { + cronMetadata, ok := metadata.Value.(*schemapb.Metadata_CronJob) + if !ok { + continue + } + cronStr := cronMetadata.CronJob.Cron + schedule, err := cron.Parse(cronStr) + if err != nil { + merr = append(merr, fmt.Errorf("failed to parse cron schedule %q: %w", cronStr, 
+func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Module) ([]model.CronJob, error) {
+	start := s.clock.Now().UTC()
+	newJobs := []model.CronJob{}
+	merr := []error{}
+	for _, decl := range module.Decls {
+		verb, ok := decl.Value.(*schemapb.Decl_Verb)
+		if !ok {
+			continue
+		}
+		for _, metadata := range verb.Verb.Metadata {
+			cronMetadata, ok := metadata.Value.(*schemapb.Metadata_CronJob)
+			if !ok {
+				continue
+			}
+			cronStr := cronMetadata.CronJob.Cron
+			schedule, err := cron.Parse(cronStr)
+			if err != nil {
+				merr = append(merr, fmt.Errorf("failed to parse cron schedule %q: %w", cronStr, err))
+				continue
+			}
+			next, err := cron.NextAfter(schedule, start, false)
+			if err != nil {
+				merr = append(merr, fmt.Errorf("failed to calculate next execution for cron job %v:%v with schedule %q: %w", module.Name, verb.Verb.Name, schedule, err))
+				continue
+			}
+			newJobs = append(newJobs, model.CronJob{
+				Key:           model.NewCronJobKey(module.Name, verb.Verb.Name),
+				Verb:          model.VerbRef{Module: module.Name, Name: verb.Verb.Name},
+				Schedule:      cronStr,
+				StartTime:     start,
+				NextExecution: next,
+				State:         model.CronJobStateIdle,
+				// DeploymentKey: filled in by the DAL
+			})
+		}
+	}
+	if len(merr) > 0 {
+		return nil, errors.Join(merr...)
+	}
+	return newJobs, nil
+}
+
+// CreatedOrReplacedDeployment is called only on the controller that created or replaced the deployment;
+// the other controllers' cron services are not notified. When a controller creates or replaces a
+// deployment, its cron job service is responsible for the newly created cron jobs until other
+// controllers have a chance to resync their job lists and start sharing responsibility for them.
+func (s *Service) CreatedOrReplacedDeployment(ctx context.Context, newDeploymentKey model.DeploymentKey) {
+	// Rather than finding old/new cron jobs and updating our state, we can just resync the list of jobs.
+	_ = s.syncJobsWithNewDeploymentKey(ctx, optional.Some(newDeploymentKey))
+}
+
+// syncJobs is run periodically via a scheduled task.
+func (s *Service) syncJobs(ctx context.Context) (time.Duration, error) {
+	err := s.syncJobsWithNewDeploymentKey(ctx, optional.None[model.DeploymentKey]())
+	if err != nil {
+		return 0, err
+	}
+	return jobResetInterval, nil
+}
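As a reference for the schedule strings handled above: the internal cron package is used with seven-field patterns that include a seconds column, and NextAfter with allowsNow=false yields the next matching instant strictly after the given time. A small sketch under those assumptions (the field order is an assumption, not confirmed by this diff):

    func nextRun(now time.Time) (time.Time, error) {
        // "*/10 * * * * * *": assumed sec/min/hour/day-of-month/month/day-of-week/year
        pattern, err := cron.Parse("*/10 * * * * * *")
        if err != nil {
            return time.Time{}, err
        }
        return cron.NextAfter(pattern, now.UTC(), false)
    }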
+// syncJobsWithNewDeploymentKey resyncs the list of jobs and marks the deployment key as added
+// so that it can overrule the hash ring for a short time.
+func (s *Service) syncJobsWithNewDeploymentKey(ctx context.Context, deploymentKey optional.Option[model.DeploymentKey]) error {
+	logger := log.FromContext(ctx)
+
+	jobs, err := s.dal.GetCronJobs(ctx)
+	if err != nil {
+		logger.Errorf(err, "failed to get cron jobs")
+		return fmt.Errorf("failed to get cron jobs: %w", err)
+	}
+	s.events.Publish(syncEvent{
+		jobs:               jobs,
+		addedDeploymentKey: deploymentKey,
+	})
+	return nil
+}
+
+func (s *Service) executeJob(ctx context.Context, job model.CronJob) {
+	logger := log.FromContext(ctx)
+	requestBody := map[string]any{}
+	requestJSON, err := json.Marshal(requestBody)
+	if err != nil {
+		logger.Errorf(err, "could not build body for cron job: %v", job.Key)
+		return
+	}
+
+	req := connect.NewRequest(&ftlv1.CallRequest{
+		Verb: &schemapb.Ref{Module: job.Verb.Module, Name: job.Verb.Name},
+		Body: requestJSON,
+	})
+
+	requestKey := model.NewRequestKey(model.OriginCron, fmt.Sprintf("%s-%s", job.Verb.Module, job.Verb.Name))
+
+	callCtx, cancel := context.WithTimeout(ctx, s.config.Timeout)
+	defer cancel()
+	_, err = s.call(callCtx, req, optional.Some(requestKey), s.requestSource)
+	if err != nil {
+		logger.Errorf(err, "failed to execute cron job %v", job.Key)
+		// Do not return; continue to end the job and schedule the next execution.
+	}
+
+	schedule, err := cron.Parse(job.Schedule)
+	if err != nil {
+		logger.Errorf(err, "failed to parse cron schedule %q", job.Schedule)
+		return
+	}
+	next, err := cron.NextAfter(schedule, s.clock.Now().UTC(), false)
+	if err != nil {
+		logger.Errorf(err, "failed to calculate next execution for cron job %v with schedule %q", job.Key, job.Schedule)
+		return
+	}
+
+	updatedJob, err := s.dal.EndCronJob(ctx, job, next)
+	if err != nil {
+		logger.Errorf(err, "failed to end cron job %v", job.Key)
+	} else {
+		s.events.Publish(endedJobsEvent{
+			jobs: []model.CronJob{updatedJob},
+		})
+	}
+}
+
+// killOldJobs looks for jobs that have been executing for too long.
+// A soft timeout should normally occur from the job's context timing out, but there are cases where
+// this does not happen (e.g. an unresponsive or dead controller). In these cases we need a hard
+// timeout after an additional grace period. To do this, this function resets these jobs' state to
+// idle and updates the next execution time in the db so the job can be picked up again next time.
+func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) {
+	logger := log.FromContext(ctx)
+	staleJobs, err := s.dal.GetStaleCronJobs(ctx, s.config.Timeout+time.Minute)
+	if err != nil {
+		return 0, err
+	} else if len(staleJobs) == 0 {
+		return time.Minute, nil
+	}
+
+	updatedJobs := []model.CronJob{}
+	for _, stale := range staleJobs {
+		start := s.clock.Now().UTC()
+		pattern, err := cron.Parse(stale.Schedule)
+		if err != nil {
+			logger.Errorf(err, "Could not kill stale cron job %q because schedule could not be parsed: %q", stale.Key, stale.Schedule)
+			continue
+		}
+		next, err := cron.NextAfter(pattern, start, false)
+		if err != nil {
+			logger.Errorf(err, "Could not kill stale cron job %q because next date could not be calculated: %q", stale.Key, stale.Schedule)
+			continue
+		}
+
+		updated, err := s.dal.EndCronJob(ctx, stale, next)
+		if err != nil {
+			logger.Errorf(err, "Could not kill stale cron job %q", stale.Key)
+			continue
+		}
+		logger.Warnf("Killed stale cron job %s", stale.Key)
+		updatedJobs = append(updatedJobs, updated)
+	}
+
+	s.events.Publish(endedJobsEvent{
+		jobs: updatedJobs,
+	})
+
+	return time.Minute, nil
+}
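The coordination below rides on a typed pub/sub topic consumed through a caller-owned channel. A stripped-down sketch of the subscription pattern, assuming only the Topic methods exercised in this file (Publish, Subscribe, Unsubscribe):

    func consumeEvents(ctx context.Context, topic *pubsub.Topic[event]) {
        ch := make(chan event, 16)
        topic.Subscribe(ch)
        defer topic.Unsubscribe(ch)
        for {
            select {
            case <-ctx.Done():
                return
            case e := <-ch:
                switch e.(type) {
                case syncEvent, endedJobsEvent, updatedHashRingEvent:
                    // fold the event into local state, as watchForUpdates does below
                }
            }
        }
    }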
+// watchForUpdates is the centralized place that handles:
+// - the list of known jobs and their state
+// - executing jobs when they are due
+// - reacting to events that change the list of jobs, deployments or hash ring
+//
+// State is private to this function to ensure thread safety.
+func (s *Service) watchForUpdates(ctx context.Context) {
+	logger := log.FromContext(ctx).Scope("cron")
+
+	events := make(chan event, 128)
+	s.events.Subscribe(events)
+	defer s.events.Unsubscribe(events)
+
+	state := &state{
+		executing:    map[string]bool{},
+		newJobs:      map[string]time.Time{},
+		blockedUntil: s.clock.Now(),
+	}
+
+	for {
+		now := s.clock.Now()
+		next := now.Add(time.Hour) // should never be reached; we expect a different signal long beforehand
+		for _, j := range state.jobs {
+			if possibleNext, err := s.nextAttemptForJob(j, state, false); err == nil && possibleNext.Before(next) {
+				next = possibleNext
+			}
+		}
+
+		if next.Before(state.blockedUntil) {
+			next = state.blockedUntil
+			logger.Tracef("loop blocked for %v", next.Sub(now))
+		} else if next.Sub(now) < time.Second {
+			next = now.Add(time.Second)
+			logger.Tracef("loop while gated for 1s")
+		} else if next.Sub(now) > time.Minute*59 {
+			logger.Tracef("loop while idling")
+		} else {
+			logger.Tracef("loop with next %v, %d jobs", next.Sub(now), len(state.jobs))
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+		case <-s.clock.After(next.Sub(now)):
+			// Try starting jobs in the db.
+			jobsToAttempt := slices.Filter(state.jobs, func(j model.CronJob) bool {
+				if n, err := s.nextAttemptForJob(j, state, true); err == nil {
+					return !n.After(s.clock.Now().UTC())
+				}
+				return false
+			})
+			jobResults, err := s.dal.StartCronJobs(ctx, jobsToAttempt)
+			if err != nil {
+				logger.Errorf(err, "failed to start cron jobs in db")
+				state.blockedUntil = s.clock.Now().Add(time.Second * 5)
+				continue
+			}
+
+			// Start jobs that were successfully updated.
+			updatedJobs := []model.CronJob{}
+			removedDeploymentKeys := map[string]model.DeploymentKey{}
+
+			for _, job := range jobResults {
+				updatedJobs = append(updatedJobs, job.CronJob)
+				if !job.DidStartExecution {
+					continue
+				}
+				if !job.HasMinReplicas {
+					// We successfully updated the db to start this job, but the deployment has min replicas set to 0.
+					// We need to update the db to end this job.
+					removedDeploymentKeys[job.DeploymentKey.String()] = job.DeploymentKey
+					_, err := s.dal.EndCronJob(ctx, job.CronJob, next)
+					if err != nil {
+						logger.Errorf(err, "failed to end cron job %s", job.Key.String())
+					}
+					continue
+				}
+				logger.Infof("executing job %v", job.Key)
+				state.startedExecutingJob(job.CronJob)
+				go s.executeJob(ctx, job.CronJob)
+			}
+
+			// Update the job list.
+			state.updateJobs(updatedJobs)
+			for _, key := range removedDeploymentKeys {
+				state.removeDeploymentKey(key)
+			}
+		case e := <-events:
+			switch event := e.(type) {
+			case syncEvent:
+				logger.Tracef("syncing job list: %d jobs", len(event.jobs))
+				state.sync(event.jobs, event.addedDeploymentKey)
+			case endedJobsEvent:
+				logger.Tracef("updating %d jobs", len(event.jobs))
+				state.updateJobs(event.jobs)
+			case updatedHashRingEvent:
+				// Do another cycle through the loop to see if new jobs need to be scheduled.
+			}
+		}
+	}
+}
+
+func (s *Service) nextAttemptForJob(job model.CronJob, state *state, allowsNow bool) (time.Time, error) {
+	if !s.isResponsibleForJob(job, state) {
+		return s.clock.Now(), fmt.Errorf("controller is not responsible for job")
+	}
+	if job.State == model.CronJobStateExecuting {
+		if state.isExecutingInCurrentController(job) {
+			// No need to schedule this job until it finishes.
+			return s.clock.Now(), fmt.Errorf("controller is already waiting for job to finish")
+		}
+		// We don't know when the other controller that is executing this job will finish it,
+		// so we optimistically attempt it when the next execution date is due, assuming the job finishes by then.
+		pattern, err := cron.Parse(job.Schedule)
+		if err != nil {
+			return s.clock.Now(), fmt.Errorf("failed to parse cron schedule %q", job.Schedule)
+		}
+		next, err := cron.NextAfter(pattern, s.clock.Now().UTC(), allowsNow)
+		if err == nil {
+			return next, nil
+		}
+	}
+	return job.NextExecution, nil
+}
+
+// UpdatedControllerList synchronises the hash ring with the active controllers.
+func (s *Service) UpdatedControllerList(ctx context.Context, controllers []dal.Controller) {
+	logger := log.FromContext(ctx).Scope("cron")
+	controllerIdx := -1
+	for idx, controller := range controllers {
+		if controller.Key.String() == s.key.String() {
+			controllerIdx = idx
+			break
+		}
+	}
+	if controllerIdx == -1 {
+		logger.Tracef("controller %q not found in list of controllers", s.key)
+	}
+
+	oldState := s.hashRingState.Load()
+	if oldState != nil && len(oldState.controllers) == len(controllers) {
+		hasChanged := false
+		for idx, c := range controllers {
+			if c.Key.String() != oldState.controllers[idx].Key.String() {
+				hasChanged = true
+				break
+			}
+		}
+		if !hasChanged {
+			return
+		}
+	}
+
+	hashRing := hashring.New(slices.Map(controllers, func(c dal.Controller) string { return c.Key.String() }))
+	s.hashRingState.Store(&hashRingState{
+		hashRing:    hashRing,
+		controllers: controllers,
+		idx:         controllerIdx,
+	})
+
+	s.events.Publish(updatedHashRingEvent{})
+}
+// isResponsibleForJob indicates whether this service should attempt a job,
+// or whether enough other controllers will handle it. This allows us to spread the job load across controllers.
+func (s *Service) isResponsibleForJob(job model.CronJob, state *state) bool {
+	if state.isJobTooNewForHashRing(job) {
+		return true
+	}
+	hashringState := s.hashRingState.Load()
+	if hashringState == nil {
+		return true
+	}
+
+	initialKey, ok := hashringState.hashRing.GetNode(job.Key.String())
+	if !ok {
+		return true
+	}
+
+	initialIdx := -1
+	for idx, controller := range hashringState.controllers {
+		if controller.Key.String() == initialKey {
+			initialIdx = idx
+			break
+		}
+	}
+	if initialIdx == -1 {
+		return true
+	}
+
+	if initialIdx+controllersPerJob > len(hashringState.controllers) {
+		// The window wraps around the end of the controller list.
+		return hashringState.idx >= initialIdx || hashringState.idx < (initialIdx+controllersPerJob)-len(hashringState.controllers)
+	}
+	return hashringState.idx >= initialIdx && hashringState.idx < initialIdx+controllersPerJob
+}
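To make the wrap-around arithmetic at the end of isResponsibleForJob concrete: the job key is hashed onto the ring, the owning controller's index is found, and the window of controllersPerJob controllers starting there (wrapping past the end of the list) shares the job. A standalone sketch — only hashring.New and GetNode are taken from the code above; the rest is illustrative:

    func responsibleFor(jobKey string, controllers []string) []string {
        ring := hashring.New(controllers)
        owner, ok := ring.GetNode(jobKey)
        if !ok {
            return nil
        }
        ownerIdx := -1
        for i, c := range controllers {
            if c == owner {
                ownerIdx = i
                break
            }
        }
        if ownerIdx == -1 {
            return nil
        }
        out := make([]string, 0, controllersPerJob)
        for i := 0; i < controllersPerJob; i++ {
            out = append(out, controllers[(ownerIdx+i)%len(controllers)])
        }
        return out
    }

With controllersPerJob = 2, a job is attempted by its owner and the next controller in list order — exactly the window the modular comparison above encodes.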
diff --git a/backend/controller/cronjobs/cronjobs_integration_test.go b/backend/controller/cronjobs/cronjobs_integration_test.go
new file mode 100644
index 0000000000..cb07d02fd4
--- /dev/null
+++ b/backend/controller/cronjobs/cronjobs_integration_test.go
@@ -0,0 +1,130 @@
+//go:build integration
+
+package cronjobs
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"testing"
+	"time"
+
+	"connectrpc.com/connect"
+	db "github.com/TBD54566975/ftl/backend/controller/dal"
+	"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
+	ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
+	"github.com/TBD54566975/ftl/backend/schema"
+	"github.com/TBD54566975/ftl/internal/cron"
+	"github.com/TBD54566975/ftl/internal/log"
+	"github.com/TBD54566975/ftl/internal/model"
+	"github.com/TBD54566975/ftl/internal/slices"
+	"github.com/alecthomas/assert/v2"
+	"github.com/alecthomas/types/optional"
+	"github.com/benbjohnson/clock"
+)
+
+// TestServiceWithRealDAL mirrors TestService in cronjobs_test.go but runs against a
+// database-backed DAL. It reuses the controller and mockScheduler helpers from that
+// file, which compile under both build modes.
+func TestServiceWithRealDAL(t *testing.T) {
+	t.Parallel()
+	ctx := log.ContextWithNewDefaultLogger(context.Background())
+	ctx, cancel := context.WithCancel(ctx)
+	t.Cleanup(cancel)
+
+	conn := sqltest.OpenForTesting(ctx, t)
+	dal, err := db.New(ctx, conn)
+	assert.NoError(t, err)
+
+	config := Config{Timeout: time.Minute * 5}
+	clock := clock.NewMock()
+	scheduler := &mockScheduler{}
+
+	verbCallCount := map[string]int{}
+	verbCallCountLock := sync.Mutex{}
+
+	// initial jobs
+	jobsToCreate := []model.CronJob{}
+	for i := range 20 {
+		now := clock.Now()
+		cronStr := "*/10 * * * * * *"
+		pattern, err := cron.Parse(cronStr)
+		assert.NoError(t, err)
+		next, err := cron.NextAfter(pattern, now, false)
+		assert.NoError(t, err)
+		jobsToCreate = append(jobsToCreate, model.CronJob{
+			Key:           model.NewCronJobKey("initial", fmt.Sprintf("verb%d", i)),
+			Verb:          model.VerbRef{Module: "initial", Name: fmt.Sprintf("verb%d", i)},
+			Schedule:      pattern.String(),
+			StartTime:     now,
+			NextExecution: next,
+			State:         model.CronJobStateIdle,
+		})
+	}
+
+	_, err = dal.CreateDeployment(ctx, "go", &schema.Module{
+		Name: "initial",
+	}, []db.DeploymentArtefact{}, []db.IngressRoutingEntry{}, jobsToCreate)
+	assert.NoError(t, err)
+
+	controllers := []*controller{}
+	for i := range 5 {
+		key := model.NewControllerKey("localhost", strconv.Itoa(8080+i))
+		controller := &controller{
+			key:   key,
+			DAL:   dal,
+			clock: clock,
+			cronJobs: NewForTesting(ctx, key, "test.com", config, dal, scheduler, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) {
+				verbRef := schema.RefFromProto(r.Msg.Verb)
+
+				verbCallCountLock.Lock()
+				verbCallCount[verbRef.Name]++
+				verbCallCountLock.Unlock()
+
+				return &connect.Response[ftlv1.CallResponse]{}, nil
+			}, clock),
+		}
+		controllers = append(controllers, controller)
+	}
+
+	time.Sleep(time.Millisecond * 100)
+
+	for _, c := range controllers {
+		go func() {
+			c.cronJobs.UpdatedControllerList(ctx, slices.Map(controllers, func(ctrl *controller) db.Controller {
+				return db.Controller{
+					Key: ctrl.key,
+				}
+			}))
+			_, _ = c.cronJobs.syncJobs(ctx)
+		}()
+	}
+
+	clock.Add(time.Second * 5)
+	time.Sleep(time.Millisecond * 100)
+	for range 3 {
+		clock.Add(time.Second * 10)
+		time.Sleep(time.Millisecond * 100)
+	}
+
+	for _, j := range jobsToCreate {
+		count := verbCallCount[j.Verb.Name]
+		assert.Equal(t, count, 3, "expected verb %s to be called 3 times", j.Verb.Name)
+	}
+}
diff --git a/backend/controller/cronjobs/cronjobs_test.go b/backend/controller/cronjobs/cronjobs_test.go
new file mode 100644
index 0000000000..04e045099b
--- /dev/null
+++ b/backend/controller/cronjobs/cronjobs_test.go
@@ -0,0 +1,333 @@
+package cronjobs
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"testing"
+	"time"
+
+	"connectrpc.com/connect"
+	"github.com/TBD54566975/ftl/backend/controller/dal"
+	"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
+	ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
+	"github.com/TBD54566975/ftl/backend/schema"
+	"github.com/TBD54566975/ftl/internal/cron"
+	"github.com/TBD54566975/ftl/internal/log"
+	"github.com/TBD54566975/ftl/internal/model"
+	"github.com/TBD54566975/ftl/internal/slices"
+	"github.com/alecthomas/assert/v2"
+	"github.com/alecthomas/types/optional"
+	"github.com/benbjohnson/clock"
+	"github.com/jpillora/backoff"
+	xslices "golang.org/x/exp/slices"
+)
+
+type mockDAL struct {
+	lock            sync.Mutex
+	clock           *clock.Mock
+	jobs            []model.CronJob
+	attemptCountMap map[string]int
+}
+
+func (d *mockDAL) GetCronJobs(ctx context.Context) ([]model.CronJob, error) {
+	d.lock.Lock()
+	defer d.lock.Unlock()
+
+	return d.jobs, nil
+}
+
+func (d *mockDAL) createCronJob(deploymentKey model.DeploymentKey, module string, verb string, schedule string, startTime time.Time, nextExecution time.Time) {
+	d.lock.Lock()
+	defer d.lock.Unlock()
+
+	job := model.CronJob{
+		Key:           model.NewCronJobKey(module, verb),
+		DeploymentKey: deploymentKey,
+		Verb:          model.VerbRef{Module: module, Name: verb},
+		Schedule:      schedule,
+		StartTime:     startTime,
+		NextExecution: nextExecution,
+		State:         model.CronJobStateIdle,
+	}
+	d.jobs = append(d.jobs, job)
+}
+
+func (d *mockDAL) indexForJob(job model.CronJob) (int, error) {
+	for i, j := range d.jobs {
+		if j.Key.String() == job.Key.String() {
+			return i, nil
+		}
+	}
+	return -1, fmt.Errorf("job not found")
+}
+func (d *mockDAL) StartCronJobs(ctx context.Context, jobs []model.CronJob) (attemptedJobs []dal.AttemptedCronJob, err error) {
+	d.lock.Lock()
+	defer d.lock.Unlock()
+
+	attemptedJobs = []dal.AttemptedCronJob{}
+	now := (*d.clock).Now()
+
+	for _, inputJob := range jobs {
+		i, err := d.indexForJob(inputJob)
+		if err != nil {
+			return nil, err
+		}
+		job := d.jobs[i]
+		if !job.NextExecution.After(now) && job.State == model.CronJobStateIdle {
+			job.State = model.CronJobStateExecuting
+			job.StartTime = (*d.clock).Now()
+			d.jobs[i] = job
+			attemptedJobs = append(attemptedJobs, dal.AttemptedCronJob{
+				CronJob:           job,
+				DidStartExecution: true,
+				HasMinReplicas:    true,
+			})
+		} else {
+			attemptedJobs = append(attemptedJobs, dal.AttemptedCronJob{
+				CronJob:           job,
+				DidStartExecution: false,
+				HasMinReplicas:    true,
+			})
+		}
+		d.attemptCountMap[job.Key.String()]++
+	}
+	return attemptedJobs, nil
+}
+
+func (d *mockDAL) EndCronJob(ctx context.Context, job model.CronJob, next time.Time) (model.CronJob, error) {
+	d.lock.Lock()
+	defer d.lock.Unlock()
+
+	i, err := d.indexForJob(job)
+	if err != nil {
+		return model.CronJob{}, err
+	}
+	internalJob := d.jobs[i]
+	if internalJob.State != model.CronJobStateExecuting {
+		return model.CronJob{}, fmt.Errorf("job cannot be stopped: it isn't running")
+	}
+	if internalJob.StartTime != job.StartTime {
+		return model.CronJob{}, fmt.Errorf("job cannot be stopped: start time does not match")
+	}
+	internalJob.State = model.CronJobStateIdle
+	internalJob.NextExecution = next
+	d.jobs[i] = internalJob
+	return internalJob, nil
+}
+
+func (d *mockDAL) GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]model.CronJob, error) {
+	d.lock.Lock()
+	defer d.lock.Unlock()
+
+	return slices.Filter(d.jobs, func(job model.CronJob) bool {
+		return (*d.clock).Now().After(job.StartTime.Add(duration))
+	}), nil
+}
+
+type mockScheduler struct {
+}
+
+func (s *mockScheduler) Singleton(retry backoff.Backoff, job scheduledtask.Job) {
+	// do nothing
+}
+
+func (s *mockScheduler) Parallel(retry backoff.Backoff, job scheduledtask.Job) {
+	// do nothing
+}
+
+type controller struct {
+	key      model.ControllerKey
+	DAL      DAL
+	clock    *clock.Mock
+	cronJobs *Service
+}
+
+func TestService(t *testing.T) {
+	t.Parallel()
+	ctx := log.ContextWithNewDefaultLogger(context.Background())
+	ctx, cancel := context.WithCancel(ctx)
+	t.Cleanup(cancel)
+
+	config := Config{Timeout: time.Minute * 5}
+	clock := clock.NewMock()
+	mockDal := &mockDAL{
+		clock:           clock,
+		lock:            sync.Mutex{},
+		attemptCountMap: map[string]int{},
+	}
+	scheduler := &mockScheduler{}
+
+	verbCallCount := map[string]int{}
+	verbCallCountLock := sync.Mutex{}
+
+	// initial jobs
+	for i := range 20 {
+		deploymentKey := model.NewDeploymentKey("initial")
+		now := clock.Now()
+		cronStr := "*/10 * * * * * *"
+		pattern, err := cron.Parse(cronStr)
+		assert.NoError(t, err)
+		next, err := cron.NextAfter(pattern, now, false)
+		assert.NoError(t, err)
+		mockDal.createCronJob(deploymentKey, "initial", fmt.Sprintf("verb%d", i), cronStr, now, next)
+	}
+
+	controllers := []*controller{}
+	for i := range 5 {
+		key := model.NewControllerKey("localhost", strconv.Itoa(8080+i))
+		controller := &controller{
+			key:   key,
+			DAL:   mockDal,
+			clock: clock,
+			cronJobs: NewForTesting(ctx, key, "test.com", config, mockDal, scheduler, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) {
+				verbRef := schema.RefFromProto(r.Msg.Verb)
+
+				verbCallCountLock.Lock()
+				verbCallCount[verbRef.Name]++
+				verbCallCountLock.Unlock()
+
+				return &connect.Response[ftlv1.CallResponse]{}, nil
+			}, clock),
+		}
+		controllers = append(controllers, controller)
+	}
+
+	time.Sleep(time.Millisecond * 100)
+
+	for _, c := range controllers {
+		go func() {
+			c.cronJobs.UpdatedControllerList(ctx, slices.Map(controllers, func(ctrl *controller) dal.Controller {
+				return dal.Controller{
+					Key: ctrl.key,
+				}
+			}))
+			_, _ = c.cronJobs.syncJobs(ctx)
+		}()
+	}
+
+	clock.Add(time.Second * 5)
+	time.Sleep(time.Millisecond * 100)
+	for range 3 {
+		clock.Add(time.Second * 10)
+		time.Sleep(time.Millisecond * 100)
+	}
+
+	for _, j := range mockDal.jobs {
+		count := verbCallCount[j.Verb.Name]
+		assert.Equal(t, count, 3, "expected verb %s to be called 3 times", j.Verb.Name)
+	}
+}
+
+func TestHashRing(t *testing.T) {
+	// This test uses multiple mock clocks to progress time for each controller individually.
+	// This allows us to compare attempts for each cron job and know which controller attempted it.
+	t.Parallel()
+	ctx := log.ContextWithNewDefaultLogger(context.Background())
+	ctx, cancel := context.WithCancel(ctx)
+	t.Cleanup(cancel)
+
+	config := Config{Timeout: time.Minute * 5}
+	mockDal := &mockDAL{
+		clock:           clock.NewMock(),
+		lock:            sync.Mutex{},
+		attemptCountMap: map[string]int{},
+	}
+	scheduler := &mockScheduler{}
+
+	verbCallCount := map[string]int{}
+	verbCallCountLock := sync.Mutex{}
+
+	// initial jobs
+	for i := range 100 {
+		deploymentKey := model.NewDeploymentKey("initial")
+		now := mockDal.clock.Now()
+		cronStr := "*/10 * * * * * *"
+		pattern, err := cron.Parse(cronStr)
+		assert.NoError(t, err)
+		next, err := cron.NextAfter(pattern, now, false)
+		assert.NoError(t, err)
+		mockDal.createCronJob(deploymentKey, "initial", fmt.Sprintf("verb%d", i), cronStr, now, next)
+	}
+
+	controllers := []*controller{}
+	for i := range 20 {
+		key := model.NewControllerKey("localhost", strconv.Itoa(8080+i))
+		clock := clock.NewMock()
+		controller := &controller{
+			key:   key,
+			DAL:   mockDal,
+			clock: clock,
+			cronJobs: NewForTesting(ctx, key, "test.com", config, mockDal, scheduler, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) {
+				verbRef := schema.RefFromProto(r.Msg.Verb)
+
+				verbCallCountLock.Lock()
+				verbCallCount[verbRef.Name]++
+				verbCallCountLock.Unlock()
+
+				return &connect.Response[ftlv1.CallResponse]{}, nil
+			}, clock),
+		}
+		controllers = append(controllers, controller)
+	}
+
+	time.Sleep(time.Millisecond * 100)
+
+	for _, c := range controllers {
+		go func() {
+			c.cronJobs.UpdatedControllerList(ctx, slices.Map(controllers, func(ctrl *controller) dal.Controller {
+				return dal.Controller{
+					Key: ctrl.key,
+				}
+			}))
+			_, _ = c.cronJobs.syncJobs(ctx)
+		}()
+	}
+	time.Sleep(time.Millisecond * 100)
+
+	// Progress time for each controller one at a time, noting which verbs got attempted each time,
+	// to build a map of verb to controller keys.
+	controllersForVerbs := map[string][]model.ControllerKey{}
+	for _, c := range controllers {
+		beforeAttemptCount := map[string]int{}
+		for k, v := range mockDal.attemptCountMap {
+			beforeAttemptCount[k] = v
+		}
+
+		c.clock.Add(time.Second * 15)
+		time.Sleep(time.Millisecond * 100)
+
+		for k, v := range mockDal.attemptCountMap {
+			if beforeAttemptCount[k] == v {
+				continue
+			}
+			controllersForVerbs[k] = append(controllersForVerbs[k], c.key)
+		}
+	}
+
+	// Check that each job has the same list of responsible controllers.
+	// Theoretically it is possible for all jobs to have the same assigned controllers, but with 100 jobs and 20 controllers this is unlikely.
+	keys := []string{}
+	hasFoundNonMatchingKeys := false
+	for v, k := range controllersForVerbs {
+		assert.Equal(t, len(k), 2, "expected verb %s to be attempted by 2 controllers", v)
+
+		kStrs := slices.Map(k, func(k model.ControllerKey) string { return k.String() })
+		xslices.Sort(kStrs)
+		if len(keys) == 0 {
+			keys = kStrs
+			continue
+		}
+
+		if !hasFoundNonMatchingKeys {
+			for keyIdx, keyStr := range kStrs {
+				if keys[keyIdx] != keyStr {
+					hasFoundNonMatchingKeys = true
+				}
+			}
+		}
+	}
+	assert.True(t, hasFoundNonMatchingKeys, "expected at least one verb to have different controllers assigned")
+}
diff --git a/backend/controller/cronjobs/state.go b/backend/controller/cronjobs/state.go
new file mode 100644
index 0000000000..0620efe753
--- /dev/null
+++ b/backend/controller/cronjobs/state.go
@@ -0,0 +1,82 @@
+package cronjobs
+
+import (
+	"time"
+
+	"github.com/TBD54566975/ftl/internal/model"
+	"github.com/TBD54566975/ftl/internal/slices"
+	"github.com/alecthomas/types/optional"
+)
+
+// state models the cron job service's private state for scheduling jobs and reacting to events.
+type state struct {
+	jobs []model.CronJob
+
+	// Used to determine whether this controller is currently executing a job.
+	executing map[string]bool
+
+	// Newly created jobs should be attempted by the controller that created them until other controllers
+	// have a chance to resync their job lists and share responsibilities through the hash ring.
+	newJobs map[string]time.Time
+
+	// We delay any job attempts in case of db errors to avoid hammering the db in a tight loop.
+	blockedUntil time.Time
+}
+
+func (s *state) isExecutingInCurrentController(job model.CronJob) bool {
+	return s.executing[job.Key.String()]
+}
+
+func (s *state) startedExecutingJob(job model.CronJob) {
+	s.executing[job.Key.String()] = true
+}
+
+func (s *state) isJobTooNewForHashRing(job model.CronJob) bool {
+	if t, ok := s.newJobs[job.Key.String()]; ok {
+		if time.Since(t) < newJobHashRingOverrideInterval {
+			return true
+		}
+		delete(s.newJobs, job.Key.String())
+	}
+	return false
+}
+
+func (s *state) sync(jobs []model.CronJob, newDeploymentKey optional.Option[model.DeploymentKey]) {
+	s.jobs = make([]model.CronJob, len(jobs))
+	copy(s.jobs, jobs)
+	for _, job := range s.jobs {
+		if job.State != model.CronJobStateExecuting {
+			delete(s.executing, job.Key.String())
+		}
+		if newKey, ok := newDeploymentKey.Get(); ok && job.DeploymentKey.String() == newKey.String() {
+			// This job is new and should be attempted by the current controller.
+			s.newJobs[job.Key.String()] = time.Now()
+		}
+	}
+}
+
+func (s *state) updateJobs(jobs []model.CronJob) {
+	updatedJobMap := jobMap(jobs)
+	for idx, old := range s.jobs {
+		if updated, exists := updatedJobMap[old.Key.String()]; exists {
+			s.jobs[idx] = updated
+			if updated.State != model.CronJobStateExecuting {
+				delete(s.executing, updated.Key.String())
+			}
+		}
+	}
+}
+
+func (s *state) removeDeploymentKey(key model.DeploymentKey) {
+	s.jobs = slices.Filter(s.jobs, func(j model.CronJob) bool {
+		return j.DeploymentKey.String() != key.String()
+	})
+}
+
+func jobMap(jobs []model.CronJob) map[string]model.CronJob {
+	m := map[string]model.CronJob{}
+	for _, job := range jobs {
+		m[job.Key.String()] = job
+	}
+	return m
+}
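One timing relationship in state.go worth spelling out: newJobHashRingOverrideInterval (1m20s) is deliberately longer than jobResetInterval (1m), so the creating controller special-cases a new job for at least one full sync cycle. A worked timeline under those constants: at T+0 a controller deploys and records the job in newJobs; until T+60s peers may not yet have resynced, so the creator attempts the job unconditionally; by T+80s every peer has had a syncJobs pass, isJobTooNewForHashRing starts returning false, and responsibility collapses to the hash ring's two assignees.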
diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go
index 7d611de642..b1586f3aec 100644
--- a/backend/controller/dal/dal.go
+++ b/backend/controller/dal/dal.go
@@ -411,7 +411,7 @@ type IngressRoutingEntry struct {
 // previously created artefacts with it.
 //
 // If an existing deployment with identical artefacts exists, it is returned.
-func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []DeploymentArtefact, ingressRoutes []IngressRoutingEntry, cronJobs []CronJob) (key model.DeploymentKey, err error) {
+func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []DeploymentArtefact, ingressRoutes []IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error) {
 	logger := log.FromContext(ctx)

 	// Start the transaction
@@ -496,8 +496,8 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem
 		err := tx.CreateCronJob(ctx, sql.CreateCronJobParams{
 			Key:           job.Key,
 			DeploymentKey: deploymentKey,
-			ModuleName:    job.Ref.Module,
-			Verb:          job.Ref.Name,
+			ModuleName:    job.Verb.Module,
+			Verb:          job.Verb.Name,
 			StartTime:     job.StartTime,
 			Schedule:      job.Schedule,
 			NextExecution: job.NextExecution,
@@ -916,43 +916,20 @@ func (d *DAL) ExpireRunnerClaims(ctx context.Context) (int64, error) {
 	return count, translatePGError(err)
 }

-type JobState string
-
-const (
-	JobStateIdle      = JobState(sql.CronJobStateIdle)
-	JobStateExecuting = JobState(sql.CronJobStateExecuting)
-)
-
-type CronJob struct {
-	Key           model.CronJobKey
-	DeploymentKey model.DeploymentKey
-	Ref           schema.Ref
-	Schedule      string
-	StartTime     time.Time
-	NextExecution time.Time
-	State         JobState
-}
-
-type AttemptedCronJob struct {
-	DidStartExecution bool
-	HasMinReplicas    bool
-	CronJob
-}
-
-func cronJobFromRow(row sql.GetCronJobsRow) CronJob {
-	return CronJob{
+func cronJobFromRow(row sql.GetCronJobsRow) model.CronJob {
+	return model.CronJob{
 		Key:           row.Key,
 		DeploymentKey: row.DeploymentKey,
-		Ref:           schema.Ref{Module: row.Module, Name: row.Verb},
+		Verb:          model.VerbRef{Module: row.Module, Name: row.Verb},
 		Schedule:      row.Schedule,
 		StartTime:     row.StartTime,
 		NextExecution: row.NextExecution,
-		State:         JobState(row.State),
+		State:         row.State,
 	}
 }

 // GetCronJobs returns all cron jobs for deployments with min replicas > 0
-func (d *DAL) GetCronJobs(ctx context.Context) ([]CronJob, error) {
+func (d *DAL) GetCronJobs(ctx context.Context) ([]model.CronJob, error) {
 	rows, err := d.db.GetCronJobs(ctx)
 	if err != nil {
 		return nil, translatePGError(err)
@@ -960,12 +937,18 @@ func (d *DAL) GetCronJobs(ctx context.Context) ([]CronJob, error) {
 	return slices.Map(rows, cronJobFromRow), nil
 }

+type AttemptedCronJob struct {
+	DidStartExecution bool
+	HasMinReplicas    bool
+	model.CronJob
+}
+
 // StartCronJobs returns a full list of results so that the caller can update their list of jobs whether or not they successfully updated the row
-func (d *DAL) StartCronJobs(ctx context.Context, jobs []CronJob) (attemptedJobs []AttemptedCronJob, err error) {
+func (d *DAL) StartCronJobs(ctx context.Context, jobs []model.CronJob) (attemptedJobs []AttemptedCronJob, err error) {
 	if len(jobs) == 0 {
 		return nil, nil
 	}
-	rows, err := d.db.StartCronJobs(ctx, slices.Map(jobs, func(job CronJob) string { return job.Key.String() }))
+	rows, err := d.db.StartCronJobs(ctx, slices.Map(jobs, func(job model.CronJob) string { return job.Key.String() }))
 	if err != nil {
 		return nil, translatePGError(err)
 	}
@@ -973,14 +956,14 @@ func (d *DAL) StartCronJobs(ctx context.Context, jobs []CronJob) (attemptedJobs
 	attemptedJobs = []AttemptedCronJob{}
 	for _, row := range rows {
 		job := AttemptedCronJob{
-			CronJob: CronJob{
+			CronJob: model.CronJob{
 				Key:           row.Key,
 				DeploymentKey: row.DeploymentKey,
-				Ref:           schema.Ref{Module: row.Module, Name: row.Verb},
+				Verb:          model.VerbRef{Module: row.Module, Name: row.Verb},
 				Schedule:      row.Schedule,
 				StartTime:     row.StartTime,
 				NextExecution: row.NextExecution,
-				State:         JobState(row.State),
+				State:         row.State,
 			},
 			DidStartExecution: row.Updated,
 			HasMinReplicas:    row.HasMinReplicas,
@@ -992,21 +975,21 @@ func (d *DAL) StartCronJobs(ctx context.Context, jobs []CronJob) (attemptedJobs

 // EndCronJob sets the status from executing to idle and updates the next execution time
 // Can be called on the successful completion of a job, or if the job failed to execute (error or timeout)
-func (d *DAL) EndCronJob(ctx context.Context, job CronJob, next time.Time) (CronJob, error) {
+func (d *DAL) EndCronJob(ctx context.Context, job model.CronJob, next time.Time) (model.CronJob, error) {
 	row, err := d.db.EndCronJob(ctx, next, job.Key, job.StartTime)
 	if err != nil {
-		return CronJob{}, translatePGError(err)
+		return model.CronJob{}, translatePGError(err)
 	}
 	return cronJobFromRow(sql.GetCronJobsRow(row)), nil
 }

 // GetStaleCronJobs returns a list of cron jobs that have been executing longer than the duration
-func (d *DAL) GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]CronJob, error) {
+func (d *DAL) GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]model.CronJob, error) {
 	rows, err := d.db.GetStaleCronJobs(ctx, duration)
 	if err != nil {
 		return nil, translatePGError(err)
 	}
-	return slices.Map(rows, func(row sql.GetStaleCronJobsRow) CronJob {
+	return slices.Map(rows, func(row sql.GetStaleCronJobsRow) model.CronJob {
 		return cronJobFromRow(sql.GetCronJobsRow(row))
 	}), nil
 }
diff --git a/backend/controller/scheduledtask/scheduledtask.go b/backend/controller/scheduledtask/scheduledtask.go
index e770197ba4..e3f3a049be 100644
--- a/backend/controller/scheduledtask/scheduledtask.go
+++ b/backend/controller/scheduledtask/scheduledtask.go
@@ -35,16 +35,8 @@ type descriptor struct {
 // run.
 type Job func(ctx context.Context) (time.Duration, error)

-type DAL interface {
-	GetControllers(ctx context.Context, all bool) ([]dal.Controller, error)
-}
-
 type DALFunc func(ctx context.Context, all bool) ([]dal.Controller, error)

-func (f DALFunc) GetControllers(ctx context.Context, all bool) ([]dal.Controller, error) {
-	return f(ctx, all)
-}
-
 // Scheduler is a task scheduler for the controller.
 //
 // Each job runs in its own goroutine.
@@ -54,28 +46,25 @@ func (f DALFunc) GetControllers(ctx context.Context, all bool) ([]dal.Controller
 // as the hash ring is only updated periodically and controllers may have
 // inconsistent views of the hash ring.
 type Scheduler struct {
-	controller DAL
-	key        model.ControllerKey
-	jobs       chan *descriptor
-	clock      clock.Clock
+	key   model.ControllerKey
+	jobs  chan *descriptor
+	clock clock.Clock

 	hashring atomic.Value[*hashring.HashRing]
 }

 // New creates a new [Scheduler].
-func New(ctx context.Context, id model.ControllerKey, controller DAL) *Scheduler {
-	return NewForTesting(ctx, id, controller, clock.New())
+func New(ctx context.Context, id model.ControllerKey) *Scheduler {
+	return NewForTesting(ctx, id, clock.New())
 }

-func NewForTesting(ctx context.Context, id model.ControllerKey, controller DAL, clock clock.Clock) *Scheduler {
+func NewForTesting(ctx context.Context, id model.ControllerKey, clock clock.Clock) *Scheduler {
 	s := &Scheduler{
-		controller: controller,
-		key:        id,
-		jobs:       make(chan *descriptor),
-		clock:      clock,
+		key:   id,
+		jobs:  make(chan *descriptor),
+		clock: clock,
 	}
-	_ = s.updateHashring(ctx)
-	go s.syncHashRing(ctx)
+	s.UpdatedControllerList(ctx, nil)
 	go s.run(ctx)
 	return s
 }
@@ -107,7 +96,7 @@ func (s *Scheduler) schedule(retry backoff.Backoff, job Job, singlyHomed bool) {
 }

 func (s *Scheduler) run(ctx context.Context) {
-	logger := log.FromContext(ctx).Scope("cron")
+	logger := log.FromContext(ctx).Scope("scheduler")
 	// List of jobs to run.
 	// For singleton jobs running on a different host, this can include jobs
 	// scheduled in the past. These are skipped on each run.
@@ -147,7 +136,7 @@ func (s *Scheduler) run(ctx context.Context) {
 			}
 		}
 		jobs[i] = nil // Zero out scheduled jobs.
-		logger.Scope(job.name).Tracef("Running cron job")
+		logger.Scope(job.name).Tracef("Running scheduled task")
 		go func() {
 			if delay, err := job.job(ctx); err != nil {
 				logger.Scope(job.name).Warnf("%s", err)
@@ -168,28 +157,8 @@ func (s *Scheduler) run(ctx context.Context) {
 	}
 }

-// Synchronise the hash ring with the active controllers.
-func (s *Scheduler) syncHashRing(ctx context.Context) {
-	logger := log.FromContext(ctx).Scope("cron")
-	for {
-		select {
-		case <-ctx.Done():
-			return
-
-		case <-s.clock.After(time.Second * 5):
-			if err := s.updateHashring(ctx); err != nil {
-				logger.Warnf("Failed to get controllers: %s", err)
-			}
-		}
-	}
-}
-
-func (s *Scheduler) updateHashring(ctx context.Context) error {
-	controllers, err := s.controller.GetControllers(ctx, false)
-	if err != nil {
-		return err
-	}
+// UpdatedControllerList synchronises the hash ring with the active controllers.
+func (s *Scheduler) UpdatedControllerList(ctx context.Context, controllers []dal.Controller) {
 	hashring := hashring.New(slices.Map(controllers, func(c dal.Controller) string { return c.Key.String() }))
 	s.hashring.Store(hashring)
-	return nil
 }
diff --git a/backend/controller/scheduledtask/scheduledtask_test.go b/backend/controller/scheduledtask/scheduledtask_test.go
index 6f759a68fb..a9c36a953e 100644
--- a/backend/controller/scheduledtask/scheduledtask_test.go
+++ b/backend/controller/scheduledtask/scheduledtask_test.go
@@ -40,9 +40,7 @@ func TestCron(t *testing.T) {
 	clock := clock.NewMock()

 	for _, c := range controllers {
-		c.cron = NewForTesting(ctx, c.controller.Key, DALFunc(func(ctx context.Context, all bool) ([]dal.Controller, error) {
-			return slices.Map(controllers, func(c *controller) dal.Controller { return c.controller }), nil
-		}), clock)
+		c.cron = NewForTesting(ctx, c.controller.Key, clock)
 		c.cron.Singleton(backoff.Backoff{}, func(ctx context.Context) (time.Duration, error) {
 			singletonCount.Add(1)
 			return time.Second, nil
@@ -51,6 +49,11 @@ func TestCron(t *testing.T) {
 			multiCount.Add(1)
 			return time.Second, nil
 		})
+		c.cron.UpdatedControllerList(ctx, slices.Map(controllers, func(ctrl *controller) dal.Controller {
+			return dal.Controller{
+				Key: ctrl.controller.Key,
+			}
+		}))
 	}

 	clock.Add(time.Second * 6)
diff --git a/backend/controller/sql/models.go b/backend/controller/sql/models.go
index 6626f34cf9..5687d9abb1 100644
--- a/backend/controller/sql/models.go
+++ b/backend/controller/sql/models.go
@@ -254,7 +254,7 @@ type CronJob struct {
 	Schedule      string
 	StartTime     time.Time
 	NextExecution time.Time
-	State         CronJobState
+	State         model.CronJobState
 	ModuleName    string
 }

diff --git a/backend/controller/sql/querier.go b/backend/controller/sql/querier.go
index 85256f2dd0..eac52ac819 100644
--- a/backend/controller/sql/querier.go
+++ b/backend/controller/sql/querier.go
@@ -17,7 +17,6 @@ type Querier interface {
 	// Create a new artefact and return the artefact ID.
 	CreateArtefact(ctx context.Context, digest []byte, content []byte) (int64, error)
 	CreateCronJob(ctx context.Context, arg CreateCronJobParams) error
-	CreateCronRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error
 	CreateDeployment(ctx context.Context, moduleName string, schema []byte, key model.DeploymentKey) error
 	CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error
 	CreateRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error
diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql
index eb7e0c2421..9d728deedd 100644
--- a/backend/controller/sql/queries.sql
+++ b/backend/controller/sql/queries.sql
@@ -409,10 +409,6 @@ VALUES ((SELECT id FROM deployments WHERE deployments.key = sqlc.arg('deployment
 INSERT INTO requests (origin, "key", source_addr)
 VALUES ($1, $2, $3);

--- name: CreateCronRequest :exec
-INSERT INTO requests (origin, "key", source_addr)
-VALUES ($1, $2, $3);
-
 -- name: UpsertController :one
 INSERT INTO controller (key, endpoint)
 VALUES ($1, $2)
diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go
index 4bbce650bf..aad9cafe90 100644
--- a/backend/controller/sql/queries.sql.go
+++ b/backend/controller/sql/queries.sql.go
@@ -86,16 +86,6 @@ func (q *Queries) CreateCronJob(ctx context.Context, arg CreateCronJobParams) er
 	return err
 }

-const createCronRequest = `-- name: CreateCronRequest :exec
-INSERT INTO requests (origin, "key", source_addr)
-VALUES ($1, $2, $3)
-`
-
-func (q *Queries) CreateCronRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error {
-	_, err := q.db.Exec(ctx, createCronRequest, origin, key, sourceAddr)
-	return err
-}
-
 const createDeployment = `-- name: CreateDeployment :exec
 INSERT INTO deployments (module_id, "schema", "key")
 VALUES ((SELECT id FROM modules WHERE name = $1::TEXT LIMIT 1), $2::BYTEA, $3::deployment_key)
@@ -182,7 +172,7 @@ type EndCronJobRow struct {
 	Schedule      string
 	StartTime     time.Time
 	NextExecution time.Time
-	State         CronJobState
+	State         model.CronJobState
 }

 func (q *Queries) EndCronJob(ctx context.Context, nextExecution time.Time, key model.CronJobKey, startTime time.Time) (EndCronJobRow, error) {
@@ -490,7 +480,7 @@ type GetCronJobsRow struct {
 	Schedule      string
 	StartTime     time.Time
 	NextExecution time.Time
-	State         CronJobState
+	State         model.CronJobState
 }

 func (q *Queries) GetCronJobs(ctx context.Context) ([]GetCronJobsRow, error) {
@@ -1178,7 +1168,7 @@ type GetStaleCronJobsRow struct {
 	Schedule      string
 	StartTime     time.Time
 	NextExecution time.Time
-	State         CronJobState
+	State         model.CronJobState
 }

 func (q *Queries) GetStaleCronJobs(ctx context.Context, dollar_1 time.Duration) ([]GetStaleCronJobsRow, error) {
@@ -1543,7 +1533,7 @@ type StartCronJobsRow struct {
 	Schedule       string
 	StartTime      time.Time
 	NextExecution  time.Time
-	State          CronJobState
+	State          model.CronJobState
 	HasMinReplicas bool
 	Updated        bool
 }
diff --git a/backend/schema/validate.go b/backend/schema/validate.go
index 61cabfddd1..45af0cecae 100644
--- a/backend/schema/validate.go
+++ b/backend/schema/validate.go
@@ -480,6 +480,12 @@ func validateVerbMetadata(scopes Scopes, n *Verb) (merr []error) {
 			if err != nil {
 				merr = append(merr, err)
 			}
+			if _, ok := n.Request.(*Unit); !ok {
+				merr = append(merr, errorf(md, "verb %s: cron job can not have a request type", n.Name))
+			}
+			if _, ok := n.Response.(*Unit); !ok {
+				merr = append(merr, errorf(md, "verb %s: cron job can not have a response type", n.Name))
+			}
 		case *MetadataCalls, *MetadataDatabases, *MetadataAlias:
 		}
 	}
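Read together with the metadata switch above, the new rule makes cron-triggered verbs strictly Unit to Unit. Mirroring the test fixture syntax below (the exact schedule string is illustrative), a conforming declaration looks like:

    module example {
      verb cleanup(Unit) Unit
        +cron */10 * * * * * *
    }

Giving the verb any other request or response type now produces the errors exercised in the test below.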
diff --git a/backend/schema/validate_test.go b/backend/schema/validate_test.go
index 92d8cd8377..a159bf5209 100644
--- a/backend/schema/validate_test.go
+++ b/backend/schema/validate_test.go
@@ -171,6 +171,21 @@ func TestValidate(t *testing.T) {
 				"6:10-10: verb can not have multiple instances of ingress",
 			},
 		},
+
+		{name: "CronOnNonEmptyVerb",
+			schema: `
+				module one {
+					verb verbWithWrongInput(Empty) Unit
+						+cron * * * * * * *
+					verb verbWithWrongOutput(Unit) Empty
+						+cron * * * * * * *
+				}
+			`,
+			errs: []string{
+				"4:7-7: verb verbWithWrongInput: cron job can not have a request type",
+				"6:7-7: verb verbWithWrongOutput: cron job can not have a response type",
+			},
+		},
 	}

 	for _, test := range tests {
diff --git a/internal/model/cron_job.go b/internal/model/cron_job.go
new file mode 100644
index 0000000000..42c5bf8347
--- /dev/null
+++ b/internal/model/cron_job.go
@@ -0,0 +1,22 @@
+package model
+
+import (
+	"time"
+)
+
+type CronJobState string
+
+const (
+	CronJobStateIdle      CronJobState = "idle"
+	CronJobStateExecuting CronJobState = "executing"
+)
+
+type CronJob struct {
+	Key           CronJobKey
+	DeploymentKey DeploymentKey
+	Verb          VerbRef
+	Schedule      string
+	StartTime     time.Time
+	NextExecution time.Time
+	State         CronJobState
+}
diff --git a/internal/model/verb_ref.go b/internal/model/verb_ref.go
new file mode 100644
index 0000000000..3f4e507e33
--- /dev/null
+++ b/internal/model/verb_ref.go
@@ -0,0 +1,12 @@
+package model
+
+import "fmt"
+
+type VerbRef struct {
+	Module string
+	Name   string
+}
+
+func (v VerbRef) String() string {
+	return fmt.Sprintf("%s.%s", v.Module, v.Name)
+}
diff --git a/sqlc.yaml b/sqlc.yaml
index efaabc9c0a..48891026de 100644
--- a/sqlc.yaml
+++ b/sqlc.yaml
@@ -44,6 +44,8 @@ sql:
             type: "NullCronJobKey"
         - db_type: "deployment_key"
          go_type: "github.com/TBD54566975/ftl/internal/model.DeploymentKey"
+        - db_type: "cron_job_state"
+          go_type: "github.com/TBD54566975/ftl/internal/model.CronJobState"
        - db_type: "deployment_key"
          nullable: true
          go_type: