Parallel job creation #427

Merged 1 commit on Nov 19, 2024
22 changes: 17 additions & 5 deletions charts/agent-stack-k8s/values.schema.json
@@ -192,26 +192,38 @@
"max-in-flight": {
"type": "integer",
"default": 0,
"title": "The max-in-flight Schema",
"title": "Sets an upper limit on the number of Kubernetes jobs that the controller will run",
"examples": [100]
},
"poll-interval": {
"type": "string",
"default": "",
"title": "The poll-interval Schema",
"default": "1s",
"title": "Interval between polling Buildkite for jobs. Values below 1 second will be ignored and 1 second will be used instead",
"examples": ["1s", "1m"]
},
"stale-job-data-timeout": {
"type": "string",
"default": "10s",
"title": "After polling Buildkite for jobs, the job data is considered valid up to this timeout",
"examples": ["1s", "1m"]
},
"job-creation-concurrency": {
"type": "integer",
"default": 5,
"title": "Sets a limit on the number of Kubernetes jobs that will be attempted to be created simultaneously in parallel",
"examples": [1, 2, 5, 10]
},
"org": {
"type": "string",
"default": "",
"minLength": 1,
"title": "The org Schema",
"title": "Buildkite organization slug",
"examples": [""]
},
"tags": {
"type": "array",
"default": [],
"title": "The tags Schema",
"title": "Buildkite agent tags used for acquiring jobs - 'queue' is required",
"items": {
"type": "string"
},
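
For reference, here is how the new option sits alongside the existing ones in a chart values file. This is a minimal sketch, assuming the chart's convention of nesting controller settings under a top-level config key; the org and tag values are placeholders:

config:
  org: my-buildkite-org           # Buildkite organization slug
  tags:
    - queue=kubernetes            # a 'queue' tag is required
  max-in-flight: 100              # upper limit on running Kubernetes jobs
  poll-interval: 5s               # values below 1s are clamped to 1s
  stale-job-data-timeout: 10s     # how long polled job data stays valid
  job-creation-concurrency: 5     # parallel creation attempts (added by this PR)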
1 change: 1 addition & 0 deletions cmd/controller/controller_test.go
@@ -28,6 +28,7 @@ func TestReadAndParseConfig(t *testing.T) {
JobCancelCheckerPollInterval: 10 * time.Second,
PollInterval: 5 * time.Second,
StaleJobDataTimeout: 10 * time.Second,
JobCreationConcurrency: 5,
MaxInFlight: 100,
Namespace: "my-buildkite-ns",
Org: "my-buildkite-org",
1 change: 1 addition & 0 deletions examples/config.yaml
@@ -6,6 +6,7 @@ image-pull-backoff-grace-period: 60s
job-cancel-checker-poll-interval: 10s
poll-interval: 5s
stale-job-data-timeout: 10s
job-creation-concurrency: 5
max-in-flight: 100
namespace: my-buildkite-ns
org: my-buildkite-org
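
When running the controller outside the Helm chart, this example file is meant to be handed to the binary directly. The flag name below is an assumption (verify with the binary's --help); it is shown only to make the file's role concrete:

# Hypothetical invocation; the --config flag name is assumed, not confirmed by this diff.
agent-stack-k8s --config examples/config.yaml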
28 changes: 15 additions & 13 deletions internal/controller/config/config.go
@@ -25,19 +25,20 @@ var DefaultAgentImage = "ghcr.io/buildkite/agent:" + version.Version()
// mapstructure (the module) supports switching the struct tag to "json", viper does not. So we have
// to have the `mapstructure` tag for viper and the `json` tag is used by the mapstructure!
type Config struct {
Debug bool `json:"debug"`
JobTTL time.Duration `json:"job-ttl"`
PollInterval time.Duration `json:"poll-interval"`
StaleJobDataTimeout time.Duration `json:"stale-job-data-timeout" validate:"omitempty"`
AgentTokenSecret string `json:"agent-token-secret" validate:"required"`
BuildkiteToken string `json:"buildkite-token" validate:"required"`
Image string `json:"image" validate:"required"`
MaxInFlight int `json:"max-in-flight" validate:"min=0"`
Namespace string `json:"namespace" validate:"required"`
Org string `json:"org" validate:"required"`
Tags stringSlice `json:"tags" validate:"min=1"`
ProfilerAddress string `json:"profiler-address" validate:"omitempty,hostname_port"`
GraphQLEndpoint string `json:"graphql-endpoint" validate:"omitempty"`
Debug bool `json:"debug"`
JobTTL time.Duration `json:"job-ttl"`
PollInterval time.Duration `json:"poll-interval"`
StaleJobDataTimeout time.Duration `json:"stale-job-data-timeout" validate:"omitempty"`
JobCreationConcurrency int `json:"job-creation-concurrency" validate:"omitempty"`
AgentTokenSecret string `json:"agent-token-secret" validate:"required"`
BuildkiteToken string `json:"buildkite-token" validate:"required"`
Image string `json:"image" validate:"required"`
MaxInFlight int `json:"max-in-flight" validate:"min=0"`
Namespace string `json:"namespace" validate:"required"`
Org string `json:"org" validate:"required"`
Tags stringSlice `json:"tags" validate:"min=1"`
ProfilerAddress string `json:"profiler-address" validate:"omitempty,hostname_port"`
GraphQLEndpoint string `json:"graphql-endpoint" validate:"omitempty"`
// Agent endpoint is set in agent-config.

// ClusterUUID field is mandatory for most new orgs.
@@ -76,6 +77,7 @@ func (c Config) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddDuration("job-ttl", c.JobTTL)
enc.AddDuration("poll-interval", c.PollInterval)
enc.AddDuration("stale-job-data-timeout", c.StaleJobDataTimeout)
enc.AddInt("job-creation-concurrency", c.JobCreationConcurrency)
enc.AddInt("max-in-flight", c.MaxInFlight)
enc.AddString("namespace", c.Namespace)
enc.AddString("org", c.Org)
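
The tag comment at the top of this struct is easier to see in action than to parse. Here is a minimal standalone sketch, not the controller's actual loader, of decoding a kebab-case config key through the `json` tags; the mapstructure import path is an assumption (the repo may use a fork such as github.com/go-viper/mapstructure/v2):

package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure" // assumed import path; see note above
)

type config struct {
	JobCreationConcurrency int `json:"job-creation-concurrency"`
	MaxInFlight            int `json:"max-in-flight"`
}

func main() {
	// Roughly what viper would hand over after reading examples/config.yaml.
	raw := map[string]any{
		"job-creation-concurrency": 5,
		"max-in-flight":            100,
	}

	var cfg config
	dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
		TagName: "json", // point mapstructure at the json tags
		Result:  &cfg,
	})
	if err != nil {
		panic(err)
	}
	if err := dec.Decode(raw); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // {JobCreationConcurrency:5 MaxInFlight:100}
}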
19 changes: 10 additions & 9 deletions internal/controller/controller.go
@@ -43,15 +43,16 @@ func Run(
// Monitor polls Buildkite GraphQL for jobs. It passes them to Deduper.
// Job flow: monitor -> deduper -> limiter -> scheduler.
m, err := monitor.New(logger.Named("monitor"), k8sClient, monitor.Config{
GraphQLEndpoint: cfg.GraphQLEndpoint,
Namespace: cfg.Namespace,
Org: cfg.Org,
ClusterUUID: cfg.ClusterUUID,
MaxInFlight: cfg.MaxInFlight,
PollInterval: cfg.PollInterval,
StaleJobDataTimeout: cfg.StaleJobDataTimeout,
Tags: cfg.Tags,
Token: cfg.BuildkiteToken,
GraphQLEndpoint: cfg.GraphQLEndpoint,
Namespace: cfg.Namespace,
Org: cfg.Org,
ClusterUUID: cfg.ClusterUUID,
MaxInFlight: cfg.MaxInFlight,
PollInterval: cfg.PollInterval,
StaleJobDataTimeout: cfg.StaleJobDataTimeout,
JobCreationConcurrency: cfg.JobCreationConcurrency,
Tags: cfg.Tags,
Token: cfg.BuildkiteToken,
})
if err != nil {
logger.Fatal("failed to create monitor", zap.Error(err))
144 changes: 95 additions & 49 deletions internal/controller/monitor/monitor.go
@@ -5,8 +5,8 @@ import (
"encoding/base64"
"errors"
"fmt"
"math/rand/v2"
"reflect"
"sort"
"time"

"github.com/Khan/genqlient/graphql"
@@ -24,15 +24,16 @@ type Monitor struct {
}

type Config struct {
GraphQLEndpoint string
Namespace string
Token string
ClusterUUID string
MaxInFlight int
PollInterval time.Duration
StaleJobDataTimeout time.Duration
Org string
Tags []string
GraphQLEndpoint string
Namespace string
Token string
ClusterUUID string
MaxInFlight int
JobCreationConcurrency int
PollInterval time.Duration
StaleJobDataTimeout time.Duration
Org string
Tags []string
}

func New(logger *zap.Logger, k8s kubernetes.Interface, cfg Config) (*Monitor, error) {
@@ -46,6 +47,11 @@ func New(logger *zap.Logger, k8s kubernetes.Interface, cfg Config) (*Monitor, er
cfg.StaleJobDataTimeout = 10 * time.Second
}

// Default JobCreationConcurrency to 5.
if cfg.JobCreationConcurrency <= 0 {
cfg.JobCreationConcurrency = 5
}

return &Monitor{
gql: graphqlClient,
logger: logger,
@@ -164,9 +170,14 @@ func (m *Monitor) Start(ctx context.Context, handler model.JobHandler) <-chan er
return
}

jobs := resp.CommandJobs()
if len(jobs) == 0 {
continue
}

// The next handler should be the deduper (except in some tests).
// Deduper handles deduplicating jobs before passing to the scheduler.
m.passJobsToNextHandler(ctx, logger, handler, agentTags, resp.CommandJobs())
m.passJobsToNextHandler(ctx, logger, handler, agentTags, jobs)
}
}()

@@ -181,54 +192,89 @@ func (m *Monitor) passJobsToNextHandler(ctx context.Context, logger *zap.Logger,
staleCtx, staleCancel := context.WithTimeout(ctx, m.cfg.StaleJobDataTimeout)
defer staleCancel()

// TODO: sort by ScheduledAt in the API
sort.Slice(jobs, func(i, j int) bool {
return jobs[i].ScheduledAt.Before(jobs[j].ScheduledAt)
// Why shuffle the jobs? Suppose we sort the jobs to prefer, say, oldest.
// The first job we'll always try to schedule will then be the oldest, which
// sounds reasonable. But if that job is not able to be accepted by the
// cluster for some reason (e.g. there are multiple stack controllers on the
// same BK queue, and the job is already created by another controller),
// and the k8s API is slow, then we'll live-lock between grabbing jobs,
// trying to run the same oldest one, failing, then timing out (staleness).
// Shuffling increases the odds of making progress.
rand.Shuffle(len(jobs), func(i, j int) {
jobs[i], jobs[j] = jobs[j], jobs[i]
})

for _, j := range jobs {
if staleCtx.Err() != nil {
// Results already stale; try again later.
return
}
// We also try to get more jobs to the API by processing them in parallel.
jobsCh := make(chan *api.JobJobTypeCommand)
defer close(jobsCh)

jobTags := toMapAndLogErrors(logger, j.AgentQueryRules)

// The api returns jobs that match ANY agent tags (the agent query rules)
// However, we can only acquire jobs that match ALL agent tags
if !agenttags.JobTagsMatchAgentTags(jobTags, agentTags) {
logger.Debug("skipping job because it did not match all tags", zap.Any("job", j))
continue
}
for range min(m.cfg.JobCreationConcurrency, len(jobs)) {
go jobHandlerWorker(ctx, staleCtx, logger, handler, agentTags, jobsCh)
}

job := model.Job{
CommandJob: &j.CommandJob,
StaleCh: staleCtx.Done(),
for _, job := range jobs {
select {
case <-ctx.Done():
return
case <-staleCtx.Done():
return
case jobsCh <- job:
}
}
}

// The next handler should be the Limiter (except in some tests).
// Limiter handles deduplicating jobs before passing to the scheduler.
logger.Debug("passing job to next handler",
zap.Stringer("handler", reflect.TypeOf(handler)),
zap.String("uuid", j.Uuid),
)
switch err := handler.Handle(ctx, job); {
case errors.Is(err, model.ErrDuplicateJob):
// Job wasn't scheduled because it's already scheduled.

case errors.Is(err, model.ErrStaleJob):
// Job wasn't scheduled because the data has become stale.
// Staleness is set within this function, so we can return early.
func jobHandlerWorker(ctx, staleCtx context.Context, logger *zap.Logger, handler model.JobHandler, agentTags map[string]string, jobsCh <-chan *api.JobJobTypeCommand) {
for {
select {
case <-ctx.Done():
return
case <-staleCtx.Done():
return
case j := <-jobsCh:
if j == nil {
return
}
jobTags := toMapAndLogErrors(logger, j.AgentQueryRules)

// The api returns jobs that match ANY agent tags (the agent query rules)
// However, we can only acquire jobs that match ALL agent tags
if !agenttags.JobTagsMatchAgentTags(jobTags, agentTags) {
logger.Debug("skipping job because it did not match all tags", zap.Any("job", j))
continue
}

case err != nil:
// Note: this check is for the original context, not staleCtx,
// in order to avoid the log when the context is cancelled
// (particularly during tests).
if ctx.Err() != nil {
job := model.Job{
CommandJob: &j.CommandJob,
StaleCh: staleCtx.Done(),
}

// The next handler should be the deduper (except in some tests).
// Deduper handles deduplicating jobs before passing to the scheduler.
logger.Debug("passing job to next handler",
zap.Stringer("handler", reflect.TypeOf(handler)),
zap.String("uuid", j.Uuid),
)
// The next handler operates under the main ctx, but can optionally
// use staleCtx.Done() (stored in job) to skip work. (Only Limiter
// does this.)
switch err := handler.Handle(ctx, job); {
case errors.Is(err, model.ErrDuplicateJob):
// Job wasn't scheduled because it's already scheduled.

case errors.Is(err, model.ErrStaleJob):
// Job wasn't scheduled because the data has become stale.
// Staleness is set within this function, so we can return early.
return

case err != nil:
// Note: this check is for the original context, not staleCtx,
// in order to avoid the log when the context is cancelled
// (particularly during tests).
if ctx.Err() != nil {
return
}
logger.Error("failed to create job", zap.Error(err))
}
logger.Error("failed to create job", zap.Error(err))
}
}
}
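
Stepping back, the new passJobsToNextHandler is a shuffle plus a bounded pool of workers draining a channel until the parent context is cancelled or the polled data goes stale. Here is a self-contained sketch of that pattern, with illustrative names in place of the monitor's real types:

package main

import (
	"context"
	"fmt"
	"math/rand/v2"
	"sync"
	"time"
)

func dispatch(ctx context.Context, jobs []string, concurrency int, staleAfter time.Duration) {
	// Shuffle so that one repeatedly-failing job cannot live-lock every
	// poll cycle (see the comment in passJobsToNextHandler above).
	rand.Shuffle(len(jobs), func(i, j int) { jobs[i], jobs[j] = jobs[j], jobs[i] })

	// Polled job data is only trusted for a bounded window.
	staleCtx, cancel := context.WithTimeout(ctx, staleAfter)
	defer cancel()

	jobsCh := make(chan string)
	var wg sync.WaitGroup
	for range min(concurrency, len(jobs)) { // range-over-int needs Go 1.22+
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-staleCtx.Done(): // also covers parent cancellation
					return
				case j, ok := <-jobsCh:
					if !ok {
						return
					}
					fmt.Println("handling", j) // stand-in for handler.Handle(ctx, job)
				}
			}
		}()
	}

	for _, j := range jobs {
		select {
		case <-staleCtx.Done():
			// Data went stale mid-dispatch; drop the rest and re-poll later.
			close(jobsCh)
			wg.Wait()
			return
		case jobsCh <- j:
		}
	}
	close(jobsCh)
	wg.Wait()
}

func main() {
	dispatch(context.Background(), []string{"a", "b", "c", "d"}, 2, time.Second)
}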