Parallel job creation
DrJosh9000 committed Nov 19, 2024
1 parent 073cd91 commit 8dea985
Showing 6 changed files with 139 additions and 76 deletions.
22 changes: 17 additions & 5 deletions charts/agent-stack-k8s/values.schema.json
@@ -192,26 +192,38 @@
"max-in-flight": {
"type": "integer",
"default": 0,
"title": "The max-in-flight Schema",
"title": "Sets an upper limit on the number of Kubernetes jobs that the controller will run",
"examples": [100]
},
"poll-interval": {
"type": "string",
"default": "",
"title": "The poll-interval Schema",
"default": "1s",
"title": "Interval between polling Buildkite for jobs. Values below 1 second will be ignored and 1 second will be used instead",
"examples": ["1s", "1m"]
},
"stale-job-data-timeout": {
"type": "string",
"default": "10s",
"title": "After polling Buildkite for jobs, the job data is considered valid up to this timeout",
"examples": ["1s", "1m"]
},
"job-creation-concurrency": {
"type": "integer",
"default": 5,
"title": "Sets a limit on the number of Kubernetes jobs that will be attempted to be created simultaneously in parallel",
"examples": [1, 2, 5, 10]
},
"org": {
"type": "string",
"default": "",
"minLength": 1,
"title": "The org Schema",
"title": "Buildkite organization slug",
"examples": [""]
},
"tags": {
"type": "array",
"default": [],
"title": "The tags Schema",
"title": "Buildkite agent tags used for acquiring jobs - 'queue' is required",
"items": {
"type": "string"
},
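Read together, the updated schema describes three related throttles: max-in-flight caps how many Kubernetes jobs the controller runs at once, job-creation-concurrency caps how many of those creations are attempted in parallel, and poll-interval together with stale-job-data-timeout bound how fresh the Buildkite job data must be. A minimal sketch of the corresponding Helm values, assuming the chart nests controller settings under a config block (an assumption; only the keys and defaults are taken from the schema above):

# Hypothetical values.yaml fragment; the `config:` nesting is an assumption.
config:
  max-in-flight: 100          # upper limit on Kubernetes jobs run by the controller
  poll-interval: 5s           # values below 1s are raised to 1s
  stale-job-data-timeout: 10s # polled job data stays valid for this long
  job-creation-concurrency: 5 # new in this commit; parallel creation attempts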
1 change: 1 addition & 0 deletions cmd/controller/controller_test.go
@@ -28,6 +28,7 @@ func TestReadAndParseConfig(t *testing.T) {
JobCancelCheckerPollInterval: 10 * time.Second,
PollInterval: 5 * time.Second,
StaleJobDataTimeout: 10 * time.Second,
JobCreationConcurrency: 5,
MaxInFlight: 100,
Namespace: "my-buildkite-ns",
Org: "my-buildkite-org",
1 change: 1 addition & 0 deletions examples/config.yaml
@@ -6,6 +6,7 @@ image-pull-backoff-grace-period: 60s
job-cancel-checker-poll-interval: 10s
poll-interval: 5s
stale-job-data-timeout: 10s
job-creation-concurrency: 5
max-in-flight: 100
namespace: my-buildkite-ns
org: my-buildkite-org
28 changes: 15 additions & 13 deletions internal/controller/config/config.go
@@ -25,19 +25,20 @@ var DefaultAgentImage = "ghcr.io/buildkite/agent:" + version.Version()
// mapstructure (the module) supports switching the struct tag to "json", viper does not. So we have
// to have the `mapstructure` tag for viper and the `json` tag is used by the mapstructure!
type Config struct {
Debug bool `json:"debug"`
JobTTL time.Duration `json:"job-ttl"`
PollInterval time.Duration `json:"poll-interval"`
StaleJobDataTimeout time.Duration `json:"stale-job-data-timeout" validate:"omitempty"`
AgentTokenSecret string `json:"agent-token-secret" validate:"required"`
BuildkiteToken string `json:"buildkite-token" validate:"required"`
Image string `json:"image" validate:"required"`
MaxInFlight int `json:"max-in-flight" validate:"min=0"`
Namespace string `json:"namespace" validate:"required"`
Org string `json:"org" validate:"required"`
Tags stringSlice `json:"tags" validate:"min=1"`
ProfilerAddress string `json:"profiler-address" validate:"omitempty,hostname_port"`
GraphQLEndpoint string `json:"graphql-endpoint" validate:"omitempty"`
Debug bool `json:"debug"`
JobTTL time.Duration `json:"job-ttl"`
PollInterval time.Duration `json:"poll-interval"`
StaleJobDataTimeout time.Duration `json:"stale-job-data-timeout" validate:"omitempty"`
JobCreationConcurrency int `json:"job-creation-concurrency" validate:"omitempty"`
AgentTokenSecret string `json:"agent-token-secret" validate:"required"`
BuildkiteToken string `json:"buildkite-token" validate:"required"`
Image string `json:"image" validate:"required"`
MaxInFlight int `json:"max-in-flight" validate:"min=0"`
Namespace string `json:"namespace" validate:"required"`
Org string `json:"org" validate:"required"`
Tags stringSlice `json:"tags" validate:"min=1"`
ProfilerAddress string `json:"profiler-address" validate:"omitempty,hostname_port"`
GraphQLEndpoint string `json:"graphql-endpoint" validate:"omitempty"`
// Agent endpoint is set in agent-config.

// ClusterUUID field is mandatory for most new orgs.
@@ -76,6 +76,7 @@ func (c Config) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddDuration("job-ttl", c.JobTTL)
enc.AddDuration("poll-interval", c.PollInterval)
enc.AddDuration("stale-job-data-timeout", c.StaleJobDataTimeout)
enc.AddInt("job-creation-concurrency", c.JobCreationConcurrency)
enc.AddInt("max-in-flight", c.MaxInFlight)
enc.AddString("namespace", c.Namespace)
enc.AddString("org", c.Org)
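Because Config implements zapcore.ObjectMarshaler via MarshalLogObject, the whole configuration, including the new job-creation-concurrency field, can be emitted as one structured log field. A minimal self-contained sketch of the pattern; the cut-down config type here is a stand-in for the controller's Config, not its actual definition:

package main

import (
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

// config is a two-field stand-in for the controller's Config type.
type config struct {
	MaxInFlight            int
	JobCreationConcurrency int
}

func (c config) MarshalLogObject(enc zapcore.ObjectEncoder) error {
	enc.AddInt("max-in-flight", c.MaxInFlight)
	enc.AddInt("job-creation-concurrency", c.JobCreationConcurrency)
	return nil
}

func main() {
	logger, _ := zap.NewProduction()
	defer logger.Sync()

	cfg := config{MaxInFlight: 100, JobCreationConcurrency: 5}
	// zap.Object invokes cfg.MarshalLogObject, so the settings appear as
	// one nested field on the log line.
	logger.Info("starting controller", zap.Object("config", cfg))
}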
19 changes: 10 additions & 9 deletions internal/controller/controller.go
@@ -43,15 +43,16 @@ func Run(
// Monitor polls Buildkite GraphQL for jobs. It passes them to Deduper.
// Job flow: monitor -> deduper -> limiter -> scheduler.
m, err := monitor.New(logger.Named("monitor"), k8sClient, monitor.Config{
GraphQLEndpoint: cfg.GraphQLEndpoint,
Namespace: cfg.Namespace,
Org: cfg.Org,
ClusterUUID: cfg.ClusterUUID,
MaxInFlight: cfg.MaxInFlight,
PollInterval: cfg.PollInterval,
StaleJobDataTimeout: cfg.StaleJobDataTimeout,
Tags: cfg.Tags,
Token: cfg.BuildkiteToken,
GraphQLEndpoint: cfg.GraphQLEndpoint,
Namespace: cfg.Namespace,
Org: cfg.Org,
ClusterUUID: cfg.ClusterUUID,
MaxInFlight: cfg.MaxInFlight,
PollInterval: cfg.PollInterval,
StaleJobDataTimeout: cfg.StaleJobDataTimeout,
JobCreationConcurrency: cfg.JobCreationConcurrency,
Tags: cfg.Tags,
Token: cfg.BuildkiteToken,
})
if err != nil {
logger.Fatal("failed to create monitor", zap.Error(err))
144 changes: 95 additions & 49 deletions internal/controller/monitor/monitor.go
@@ -5,8 +5,8 @@ import (
"encoding/base64"
"errors"
"fmt"
"math/rand/v2"
"reflect"
"sort"
"time"

"github.com/Khan/genqlient/graphql"
@@ -24,15 +24,16 @@ type Monitor struct {
}

type Config struct {
GraphQLEndpoint string
Namespace string
Token string
ClusterUUID string
MaxInFlight int
PollInterval time.Duration
StaleJobDataTimeout time.Duration
Org string
Tags []string
GraphQLEndpoint string
Namespace string
Token string
ClusterUUID string
MaxInFlight int
JobCreationConcurrency int
PollInterval time.Duration
StaleJobDataTimeout time.Duration
Org string
Tags []string
}

func New(logger *zap.Logger, k8s kubernetes.Interface, cfg Config) (*Monitor, error) {
@@ -46,6 +47,11 @@ func New(logger *zap.Logger, k8s kubernetes.Interface, cfg Config) (*Monitor, error) {
cfg.StaleJobDataTimeout = 10 * time.Second
}

// Default JobCreationConcurrency to 5.
if cfg.JobCreationConcurrency <= 0 {
cfg.JobCreationConcurrency = 5
}

return &Monitor{
gql: graphqlClient,
logger: logger,
@@ -164,9 +170,14 @@ func (m *Monitor) Start(ctx context.Context, handler model.JobHandler) <-chan error {
return
}

jobs := resp.CommandJobs()
if len(jobs) == 0 {
continue
}

// The next handler should be the Limiter (except in some tests).
// Limiter handles deduplicating jobs before passing to the scheduler.
m.passJobsToNextHandler(ctx, logger, handler, agentTags, resp.CommandJobs())
m.passJobsToNextHandler(ctx, logger, handler, agentTags, jobs)
}
}()

@@ -181,54 +192,89 @@
staleCtx, staleCancel := context.WithTimeout(ctx, m.cfg.StaleJobDataTimeout)
defer staleCancel()

// TODO: sort by ScheduledAt in the API
sort.Slice(jobs, func(i, j int) bool {
return jobs[i].ScheduledAt.Before(jobs[j].ScheduledAt)
// Why shuffle the jobs? Suppose we sort the jobs to prefer, say, oldest.
// The first job we'll always try to schedule will then be the oldest, which
// sounds reasonable. But if that job is not able to be accepted by the
// cluster for some reason (e.g. there are multiple stack controllers on the
// same BK queue, and the job is already created by another controller),
// and the k8s API is slow, then we'll live-lock between grabbing jobs,
// trying to run the same oldest one, failing, then timing out (staleness).
// Shuffling increases the odds of making progress.
rand.Shuffle(len(jobs), func(i, j int) {
jobs[i], jobs[j] = jobs[j], jobs[i]
})

for _, j := range jobs {
if staleCtx.Err() != nil {
// Results already stale; try again later.
return
}
// We also try to get more jobs to the API by processing them in parallel.
jobsCh := make(chan *api.JobJobTypeCommand)
defer close(jobsCh)

jobTags := toMapAndLogErrors(logger, j.AgentQueryRules)

// The api returns jobs that match ANY agent tags (the agent query rules)
// However, we can only acquire jobs that match ALL agent tags
if !agenttags.JobTagsMatchAgentTags(jobTags, agentTags) {
logger.Debug("skipping job because it did not match all tags", zap.Any("job", j))
continue
}
for range min(m.cfg.JobCreationConcurrency, len(jobs)) {
go jobHandlerWorker(ctx, staleCtx, logger, handler, agentTags, jobsCh)
}

job := model.Job{
CommandJob: &j.CommandJob,
StaleCh: staleCtx.Done(),
for _, job := range jobs {
select {
case <-ctx.Done():
return
case <-staleCtx.Done():
return
case jobsCh <- job:
}
}
}

// The next handler should be the Limiter (except in some tests).
// Limiter handles deduplicating jobs before passing to the scheduler.
logger.Debug("passing job to next handler",
zap.Stringer("handler", reflect.TypeOf(handler)),
zap.String("uuid", j.Uuid),
)
switch err := handler.Handle(ctx, job); {
case errors.Is(err, model.ErrDuplicateJob):
// Job wasn't scheduled because it's already scheduled.

case errors.Is(err, model.ErrStaleJob):
// Job wasn't scheduled because the data has become stale.
// Staleness is set within this function, so we can return early.
func jobHandlerWorker(ctx, staleCtx context.Context, logger *zap.Logger, handler model.JobHandler, agentTags map[string]string, jobsCh <-chan *api.JobJobTypeCommand) {
for {
select {
case <-ctx.Done():
return
case <-staleCtx.Done():
return
case j := <-jobsCh:
if j == nil {
return
}
jobTags := toMapAndLogErrors(logger, j.AgentQueryRules)

// The api returns jobs that match ANY agent tags (the agent query rules)
// However, we can only acquire jobs that match ALL agent tags
if !agenttags.JobTagsMatchAgentTags(jobTags, agentTags) {
logger.Debug("skipping job because it did not match all tags", zap.Any("job", j))
continue
}

case err != nil:
// Note: this check is for the original context, not staleCtx,
// in order to avoid the log when the context is cancelled
// (particularly during tests).
if ctx.Err() != nil {
job := model.Job{
CommandJob: &j.CommandJob,
StaleCh: staleCtx.Done(),
}

// The next handler should be the deduper (except in some tests).
// Deduper handles deduplicating jobs before passing to the scheduler.
logger.Debug("passing job to next handler",
zap.Stringer("handler", reflect.TypeOf(handler)),
zap.String("uuid", j.Uuid),
)
// The next handler operates under the main ctx, but can optionally
// use staleCtx.Done() (stored in job) to skip work. (Only Limiter
// does this.)
switch err := handler.Handle(ctx, job); {
case errors.Is(err, model.ErrDuplicateJob):
// Job wasn't scheduled because it's already scheduled.

case errors.Is(err, model.ErrStaleJob):
// Job wasn't scheduled because the data has become stale.
// Staleness is set within this function, so we can return early.
return

case err != nil:
// Note: this check is for the original context, not staleCtx,
// in order to avoid the log when the context is cancelled
// (particularly during tests).
if ctx.Err() != nil {
return
}
logger.Error("failed to create job", zap.Error(err))
}
logger.Error("failed to create job", zap.Error(err))
}
}
}
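The heart of the change is the bounded fan-out in passJobsToNextHandler: shuffle the batch, start at most JobCreationConcurrency workers, feed them over a channel, and abandon whatever remains once the stale-data deadline passes. A self-contained sketch of that pattern under illustrative names (job and handle are not the controller's actual API):

package main

import (
	"context"
	"fmt"
	"math/rand/v2"
	"sync"
	"time"
)

type job struct{ id int }

// handle stands in for the next handler in the pipeline; the real one
// creates a Kubernetes Job, which can be slow.
func handle(ctx context.Context, j job) error {
	select {
	case <-time.After(50 * time.Millisecond):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	const concurrency = 5 // cf. cfg.JobCreationConcurrency

	jobs := make([]job, 20)
	for i := range jobs {
		jobs[i] = job{id: i}
	}

	// Shuffle so multiple controllers on the same queue are unlikely to
	// all try the same job first (the live-lock described in the comment
	// in monitor.go above).
	rand.Shuffle(len(jobs), func(i, j int) { jobs[i], jobs[j] = jobs[j], jobs[i] })

	// Job data is only trusted for a bounded time, like StaleJobDataTimeout.
	staleCtx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	jobsCh := make(chan job)
	var wg sync.WaitGroup
	for range min(concurrency, len(jobs)) { // never more workers than jobs
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobsCh {
				if err := handle(staleCtx, j); err != nil {
					fmt.Println("job", j.id, "abandoned:", err)
					return // deadline hit; this worker stops
				}
				fmt.Println("job", j.id, "created")
			}
		}()
	}

send:
	for _, j := range jobs {
		select {
		case jobsCh <- j:
		case <-staleCtx.Done():
			break send // data went stale mid-batch; drop the rest
		}
	}
	close(jobsCh)
	wg.Wait()
}

The real workers receive in a select so they can also watch both contexts, and treat a nil job as channel closure; the range loop above is the compact equivalent. Closing the channel (deferred in the real code) is what lets idle workers exit once the batch is drained.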
