From 8dea9850d7699a3e5cecb901131a670cd419930c Mon Sep 17 00:00:00 2001
From: Josh Deprez
Date: Tue, 19 Nov 2024 12:47:59 +1100
Subject: [PATCH] Parallel job creation

---
 charts/agent-stack-k8s/values.schema.json |  22 +++-
 cmd/controller/controller_test.go         |   1 +
 examples/config.yaml                      |   1 +
 internal/controller/config/config.go      |  28 +++--
 internal/controller/controller.go         |  19 +--
 internal/controller/monitor/monitor.go    | 144 ++++++++++++++--------
 6 files changed, 139 insertions(+), 76 deletions(-)

diff --git a/charts/agent-stack-k8s/values.schema.json b/charts/agent-stack-k8s/values.schema.json
index 446ee14c..2fe23be9 100644
--- a/charts/agent-stack-k8s/values.schema.json
+++ b/charts/agent-stack-k8s/values.schema.json
@@ -192,26 +192,38 @@
     "max-in-flight": {
       "type": "integer",
       "default": 0,
-      "title": "The max-in-flight Schema",
+      "title": "Sets an upper limit on the number of Kubernetes jobs that the controller will run",
       "examples": [100]
     },
     "poll-interval": {
       "type": "string",
-      "default": "",
-      "title": "The poll-interval Schema",
+      "default": "1s",
+      "title": "Interval between polling Buildkite for jobs. Values below 1 second will be ignored and 1 second will be used instead",
+      "examples": ["1s", "1m"]
+    },
+    "stale-job-data-timeout": {
+      "type": "string",
+      "default": "10s",
+      "title": "After polling Buildkite for jobs, the job data is considered valid up to this timeout",
       "examples": ["1s", "1m"]
+    },
+    "job-creation-concurrency": {
+      "type": "integer",
+      "default": 5,
+      "title": "Sets a limit on the number of Kubernetes jobs that the controller will attempt to create in parallel",
+      "examples": [1, 2, 5, 10]
     },
     "org": {
       "type": "string",
       "default": "",
       "minLength": 1,
-      "title": "The org Schema",
+      "title": "Buildkite organization slug",
       "examples": [""]
     },
     "tags": {
       "type": "array",
       "default": [],
-      "title": "The tags Schema",
+      "title": "Buildkite agent tags used for acquiring jobs - 'queue' is required",
       "items": {
         "type": "string"
       },
diff --git a/cmd/controller/controller_test.go b/cmd/controller/controller_test.go
index d083b15b..e95ebb75 100644
--- a/cmd/controller/controller_test.go
+++ b/cmd/controller/controller_test.go
@@ -28,6 +28,7 @@ func TestReadAndParseConfig(t *testing.T) {
         JobCancelCheckerPollInterval: 10 * time.Second,
         PollInterval:                 5 * time.Second,
         StaleJobDataTimeout:          10 * time.Second,
+        JobCreationConcurrency:       5,
         MaxInFlight:                  100,
         Namespace:                    "my-buildkite-ns",
         Org:                          "my-buildkite-org",
diff --git a/examples/config.yaml b/examples/config.yaml
index 1b640685..57231642 100644
--- a/examples/config.yaml
+++ b/examples/config.yaml
@@ -6,6 +6,7 @@ image-pull-backoff-grace-period: 60s
 job-cancel-checker-poll-interval: 10s
 poll-interval: 5s
 stale-job-data-timeout: 10s
+job-creation-concurrency: 5
 max-in-flight: 100
 namespace: my-buildkite-ns
 org: my-buildkite-org
diff --git a/internal/controller/config/config.go b/internal/controller/config/config.go
index e9a51eef..a6365656 100644
--- a/internal/controller/config/config.go
+++ b/internal/controller/config/config.go
@@ -25,19 +25,20 @@ var DefaultAgentImage = "ghcr.io/buildkite/agent:" + version.Version()
 // mapstructure (the module) supports switching the struct tag to "json", viper does not. So we have
 // to have the `mapstructure` tag for viper and the `json` tag is used by the mapstructure!
 type Config struct {
-    Debug               bool          `json:"debug"`
-    JobTTL              time.Duration `json:"job-ttl"`
-    PollInterval        time.Duration `json:"poll-interval"`
-    StaleJobDataTimeout time.Duration `json:"stale-job-data-timeout" validate:"omitempty"`
-    AgentTokenSecret    string        `json:"agent-token-secret" validate:"required"`
-    BuildkiteToken      string        `json:"buildkite-token" validate:"required"`
-    Image               string        `json:"image" validate:"required"`
-    MaxInFlight         int           `json:"max-in-flight" validate:"min=0"`
-    Namespace           string        `json:"namespace" validate:"required"`
-    Org                 string        `json:"org" validate:"required"`
-    Tags                stringSlice   `json:"tags" validate:"min=1"`
-    ProfilerAddress     string        `json:"profiler-address" validate:"omitempty,hostname_port"`
-    GraphQLEndpoint     string        `json:"graphql-endpoint" validate:"omitempty"`
+    Debug                  bool          `json:"debug"`
+    JobTTL                 time.Duration `json:"job-ttl"`
+    PollInterval           time.Duration `json:"poll-interval"`
+    StaleJobDataTimeout    time.Duration `json:"stale-job-data-timeout" validate:"omitempty"`
+    JobCreationConcurrency int           `json:"job-creation-concurrency" validate:"omitempty"`
+    AgentTokenSecret       string        `json:"agent-token-secret" validate:"required"`
+    BuildkiteToken         string        `json:"buildkite-token" validate:"required"`
+    Image                  string        `json:"image" validate:"required"`
+    MaxInFlight            int           `json:"max-in-flight" validate:"min=0"`
+    Namespace              string        `json:"namespace" validate:"required"`
+    Org                    string        `json:"org" validate:"required"`
+    Tags                   stringSlice   `json:"tags" validate:"min=1"`
+    ProfilerAddress        string        `json:"profiler-address" validate:"omitempty,hostname_port"`
+    GraphQLEndpoint        string        `json:"graphql-endpoint" validate:"omitempty"`

     // Agent endpoint is set in agent-config.
     // ClusterUUID field is mandatory for most new orgs.
@@ -76,6 +77,7 @@ func (c Config) MarshalLogObject(enc zapcore.ObjectEncoder) error {
     enc.AddDuration("job-ttl", c.JobTTL)
     enc.AddDuration("poll-interval", c.PollInterval)
     enc.AddDuration("stale-job-data-timeout", c.StaleJobDataTimeout)
+    enc.AddInt("job-creation-concurrency", c.JobCreationConcurrency)
     enc.AddInt("max-in-flight", c.MaxInFlight)
     enc.AddString("namespace", c.Namespace)
     enc.AddString("org", c.Org)
diff --git a/internal/controller/controller.go b/internal/controller/controller.go
index 8f8b1ad8..3b25ae9f 100644
--- a/internal/controller/controller.go
+++ b/internal/controller/controller.go
@@ -43,15 +43,16 @@ func Run(
     // Monitor polls Buildkite GraphQL for jobs. It passes them to Deduper.
     // Job flow: monitor -> deduper -> limiter -> scheduler.
     m, err := monitor.New(logger.Named("monitor"), k8sClient, monitor.Config{
-        GraphQLEndpoint:     cfg.GraphQLEndpoint,
-        Namespace:           cfg.Namespace,
-        Org:                 cfg.Org,
-        ClusterUUID:         cfg.ClusterUUID,
-        MaxInFlight:         cfg.MaxInFlight,
-        PollInterval:        cfg.PollInterval,
-        StaleJobDataTimeout: cfg.StaleJobDataTimeout,
-        Tags:                cfg.Tags,
-        Token:               cfg.BuildkiteToken,
+        GraphQLEndpoint:        cfg.GraphQLEndpoint,
+        Namespace:              cfg.Namespace,
+        Org:                    cfg.Org,
+        ClusterUUID:            cfg.ClusterUUID,
+        MaxInFlight:            cfg.MaxInFlight,
+        PollInterval:           cfg.PollInterval,
+        StaleJobDataTimeout:    cfg.StaleJobDataTimeout,
+        JobCreationConcurrency: cfg.JobCreationConcurrency,
+        Tags:                   cfg.Tags,
+        Token:                  cfg.BuildkiteToken,
     })
     if err != nil {
         logger.Fatal("failed to create monitor", zap.Error(err))
diff --git a/internal/controller/monitor/monitor.go b/internal/controller/monitor/monitor.go
index 823e1ef0..5fc867e4 100644
--- a/internal/controller/monitor/monitor.go
+++ b/internal/controller/monitor/monitor.go
@@ -5,8 +5,8 @@ import (
     "encoding/base64"
     "errors"
     "fmt"
+    "math/rand/v2"
     "reflect"
-    "sort"
     "time"

     "github.com/Khan/genqlient/graphql"
@@ -24,15 +24,16 @@ type Monitor struct {
 }

 type Config struct {
-    GraphQLEndpoint     string
-    Namespace           string
-    Token               string
-    ClusterUUID         string
-    MaxInFlight         int
-    PollInterval        time.Duration
-    StaleJobDataTimeout time.Duration
-    Org                 string
-    Tags                []string
+    GraphQLEndpoint        string
+    Namespace              string
+    Token                  string
+    ClusterUUID            string
+    MaxInFlight            int
+    JobCreationConcurrency int
+    PollInterval           time.Duration
+    StaleJobDataTimeout    time.Duration
+    Org                    string
+    Tags                   []string
 }

 func New(logger *zap.Logger, k8s kubernetes.Interface, cfg Config) (*Monitor, error) {
@@ -46,6 +47,11 @@ func New(logger *zap.Logger, k8s kubernetes.Interface, cfg Config) (*Monitor, er
         cfg.StaleJobDataTimeout = 10 * time.Second
     }

+    // Default JobCreationConcurrency to 5.
+    if cfg.JobCreationConcurrency <= 0 {
+        cfg.JobCreationConcurrency = 5
+    }
+
     return &Monitor{
         gql:    graphqlClient,
         logger: logger,
@@ -164,9 +170,14 @@ func (m *Monitor) Start(ctx context.Context, handler model.JobHandler) <-chan er
                 return
             }

+            jobs := resp.CommandJobs()
+            if len(jobs) == 0 {
+                continue
+            }
+
             // The next handler should be the Limiter (except in some tests).
             // Limiter handles deduplicating jobs before passing to the scheduler.
-            m.passJobsToNextHandler(ctx, logger, handler, agentTags, resp.CommandJobs())
+            m.passJobsToNextHandler(ctx, logger, handler, agentTags, jobs)
         }
     }()

@@ -181,54 +192,89 @@ func (m *Monitor) passJobsToNextHandler(ctx context.Context, logger *zap.Logger,
     staleCtx, staleCancel := context.WithTimeout(ctx, m.cfg.StaleJobDataTimeout)
     defer staleCancel()

-    // TODO: sort by ScheduledAt in the API
-    sort.Slice(jobs, func(i, j int) bool {
-        return jobs[i].ScheduledAt.Before(jobs[j].ScheduledAt)
+    // Why shuffle the jobs? Suppose we sort the jobs to prefer, say, oldest.
+    // The first job we'll always try to schedule will then be the oldest, which
+    // sounds reasonable. But if that job is not able to be accepted by the
+    // cluster for some reason (e.g. there are multiple stack controllers on the
+    // same BK queue, and the job is already created by another controller),
+    // and the k8s API is slow, then we'll live-lock between grabbing jobs,
+    // trying to run the same oldest one, failing, then timing out (staleness).
+    // Shuffling increases the odds of making progress.
+    rand.Shuffle(len(jobs), func(i, j int) {
+        jobs[i], jobs[j] = jobs[j], jobs[i]
     })

-    for _, j := range jobs {
-        if staleCtx.Err() != nil {
-            // Results already stale; try again later.
-            return
-        }
+    // We also try to get more jobs to the API by processing them in parallel.
+    jobsCh := make(chan *api.JobJobTypeCommand)
+    defer close(jobsCh)

-        jobTags := toMapAndLogErrors(logger, j.AgentQueryRules)
-
-        // The api returns jobs that match ANY agent tags (the agent query rules)
-        // However, we can only acquire jobs that match ALL agent tags
-        if !agenttags.JobTagsMatchAgentTags(jobTags, agentTags) {
-            logger.Debug("skipping job because it did not match all tags", zap.Any("job", j))
-            continue
-        }
+    for range min(m.cfg.JobCreationConcurrency, len(jobs)) {
+        go jobHandlerWorker(ctx, staleCtx, logger, handler, agentTags, jobsCh)
+    }

-        job := model.Job{
-            CommandJob: &j.CommandJob,
-            StaleCh:    staleCtx.Done(),
+    for _, job := range jobs {
+        select {
+        case <-ctx.Done():
+            return
+        case <-staleCtx.Done():
+            return
+        case jobsCh <- job:
         }
+    }
+}

-        // The next handler should be the Limiter (except in some tests).
-        // Limiter handles deduplicating jobs before passing to the scheduler.
-        logger.Debug("passing job to next handler",
-            zap.Stringer("handler", reflect.TypeOf(handler)),
-            zap.String("uuid", j.Uuid),
-        )
-        switch err := handler.Handle(ctx, job); {
-        case errors.Is(err, model.ErrDuplicateJob):
-            // Job wasn't scheduled because it's already scheduled.
-
-        case errors.Is(err, model.ErrStaleJob):
-            // Job wasn't scheduled because the data has become stale.
-            // Staleness is set within this function, so we can return early.
+func jobHandlerWorker(ctx, staleCtx context.Context, logger *zap.Logger, handler model.JobHandler, agentTags map[string]string, jobsCh <-chan *api.JobJobTypeCommand) {
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case <-staleCtx.Done():
             return
+        case j := <-jobsCh:
+            if j == nil {
+                return
+            }
+            jobTags := toMapAndLogErrors(logger, j.AgentQueryRules)
+
+            // The api returns jobs that match ANY agent tags (the agent query rules)
+            // However, we can only acquire jobs that match ALL agent tags
+            if !agenttags.JobTagsMatchAgentTags(jobTags, agentTags) {
+                logger.Debug("skipping job because it did not match all tags", zap.Any("job", j))
+                continue
+            }

-        case err != nil:
-            // Note: this check is for the original context, not staleCtx,
-            // in order to avoid the log when the context is cancelled
-            // (particularly during tests).
-            if ctx.Err() != nil {
+            job := model.Job{
+                CommandJob: &j.CommandJob,
+                StaleCh:    staleCtx.Done(),
+            }
+
+            // The next handler should be the deduper (except in some tests).
+            // Deduper handles deduplicating jobs before passing to the scheduler.
+            logger.Debug("passing job to next handler",
+                zap.Stringer("handler", reflect.TypeOf(handler)),
+                zap.String("uuid", j.Uuid),
+            )
+            // The next handler operates under the main ctx, but can optionally
+            // use staleCtx.Done() (stored in job) to skip work. (Only Limiter
+            // does this.)
+            switch err := handler.Handle(ctx, job); {
+            case errors.Is(err, model.ErrDuplicateJob):
+                // Job wasn't scheduled because it's already scheduled.
+
+            case errors.Is(err, model.ErrStaleJob):
+                // Job wasn't scheduled because the data has become stale.
+                // Staleness is set within this function, so we can return early.
                 return
+
+            case err != nil:
+                // Note: this check is for the original context, not staleCtx,
+                // in order to avoid the log when the context is cancelled
+                // (particularly during tests).
+                if ctx.Err() != nil {
+                    return
+                }
+                logger.Error("failed to create job", zap.Error(err))
             }
-            logger.Error("failed to create job", zap.Error(err))
         }
     }
 }
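
The heart of this change is the bounded fan-out in passJobsToNextHandler: an unbuffered channel feeds a fixed number of worker goroutines, and both the feeding loop and the workers give up once the stale-data deadline passes. Below is a minimal, self-contained sketch of the same pattern, assuming Go 1.22+ (which the patch itself relies on for math/rand/v2 and range-over-int). The handle function, the string "jobs", and the timings are illustrative placeholders, not code from this patch, and the sketch uses a WaitGroup plus range-over-channel instead of the patch's nil-check on a closed channel; the shape is otherwise the same.

```go
package main

import (
	"context"
	"fmt"
	"math/rand/v2"
	"sync"
	"time"
)

// handle stands in for the next JobHandler in the pipeline: it "creates" one job.
func handle(ctx context.Context, job string) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(50 * time.Millisecond): // pretend API latency
		fmt.Println("created", job)
		return nil
	}
}

// passJobs hands a freshly polled batch to handle, at most concurrency at a
// time, and stops handing out work once the batch is older than staleTimeout.
func passJobs(ctx context.Context, jobs []string, concurrency int, staleTimeout time.Duration) {
	staleCtx, cancel := context.WithTimeout(ctx, staleTimeout)
	defer cancel()

	// Shuffle so that one problematic job at the front cannot block progress
	// on every poll (the same reasoning as the comment in the patch).
	rand.Shuffle(len(jobs), func(i, j int) { jobs[i], jobs[j] = jobs[j], jobs[i] })

	jobsCh := make(chan string) // unbuffered: a send succeeds only when a worker is free
	var wg sync.WaitGroup
	for range min(concurrency, len(jobs)) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobsCh { // exits when jobsCh is closed
				if err := handle(ctx, job); err != nil {
					fmt.Println("failed to create job:", job, err)
				}
			}
		}()
	}

	// Feed the workers until the batch runs out, the controller stops, or the
	// data goes stale. Stale leftovers are simply picked up by the next poll.
feed:
	for _, job := range jobs {
		select {
		case <-ctx.Done():
			break feed
		case <-staleCtx.Done():
			break feed
		case jobsCh <- job:
		}
	}
	close(jobsCh)
	wg.Wait()
}

func main() {
	jobs := []string{"job-1", "job-2", "job-3", "job-4", "job-5", "job-6"}
	passJobs(context.Background(), jobs, 3, 2*time.Second)
}
```

Because the channel is unbuffered, the staleness check bounds how long a single slow or unacceptable job can hold up the rest of the batch, which is the live-lock that the shuffle-plus-timeout comment in the patch guards against.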