Test: add tests for pkg/executor (#37)
* Test: add tests

Signed-off-by: Charlie Chiang <charlie_c_0129@outlook.com>

* Test: add tests for executor

Signed-off-by: Charlie Chiang <charlie_c_0129@outlook.com>

* Test: add tests for executor

Signed-off-by: Charlie Chiang <charlie_c_0129@outlook.com>

* use for loop

Signed-off-by: Charlie Chiang <charlie_c_0129@outlook.com>

---------

Signed-off-by: Charlie Chiang <charlie_c_0129@outlook.com>
charlie0129 authored Mar 20, 2023
1 parent defe4c0 commit c97d101
Showing 3 changed files with 421 additions and 39 deletions.
3 changes: 1 addition & 2 deletions pkg/cmd/cmd.go
@@ -148,7 +148,6 @@ func runCli(cmd *cobra.Command, args []string) error {
if err != nil {
return errors.Wrap(err, "error when creating executor")
}
defer exe.Shutdown()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -188,7 +187,7 @@ func runCli(cmd *cobra.Command, args []string) error {
}

// Let the workers run Actions.
exe.RunJobs(ctx)
go exe.RunJobs(ctx)

// Listen to termination signals.
sigterm := make(chan os.Signal, 1)
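
To make the wiring concrete, here is a minimal, hypothetical sketch of a caller after this change: RunJobs is started in a goroutine and, per the updated executor.go below, blocks until the context is cancelled and then drains the workers itself, which appears to be why a separate deferred Shutdown is no longer needed in runCli. The import path, the Config values, and the done channel below are illustrative assumptions, not code from this commit.

package main

import (
    "context"
    "os"
    "os/signal"
    "syscall"
    "time"

    "example.com/yourproject/pkg/executor" // placeholder import path
)

func main() {
    // Hypothetical Config values; only the field names come from this commit.
    exe, err := executor.New(executor.Config{
        QueueSize:            100,
        Workers:              4,
        MaxJobRetries:        3,
        BaseRetryDelay:       time.Second,
        RetryJobAfterFailure: true,
        PerWorkerQPS:         10,
        Timeout:              30 * time.Second,
    })
    if err != nil {
        panic(err)
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Let the workers run jobs in the background, mirroring `go exe.RunJobs(ctx)` above.
    done := make(chan struct{})
    go func() {
        exe.RunJobs(ctx)
        close(done)
    }()

    // Listen to termination signals.
    sigterm := make(chan os.Signal, 1)
    signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    <-sigterm

    // Cancelling the context tells RunJobs to drain the queue and stop the workers.
    cancel()
    <-done
}
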
104 changes: 67 additions & 37 deletions pkg/executor/executor.go
@@ -29,13 +29,15 @@ import (
)

const (
// maxRetryDelay is the max re-queueing delay of the exponential failure
// rate limiter.
maxRetryDelay = 1200 * time.Second
)

// Executor is a rate-limited work queue with concurrent workers.
// Executor is a rate-limited job queue with concurrent workers.
type Executor struct {
workers int
queueSize int
maxQueueSize int
maxRetries int
allowRetries bool
wg sync.WaitGroup
@@ -53,28 +55,37 @@ type Job interface {
AllowConcurrency() bool
}

// Config is the config for executor
// Config is the config for the Executor.
type Config struct {
QueueSize int
Workers int
MaxJobRetries int
BaseRetryDelay time.Duration
// QueueSize is the maximum number of jobs in the executor queue.
QueueSize int
// Workers is the number of workers that are concurrently executing jobs.
Workers int
// MaxJobRetries is the maximum number of retries if a job fails. This should
// not be zero. If you want to disable retries, just disable RetryJobAfterFailure.
MaxJobRetries int
// BaseRetryDelay defines how long after a job fails before it is re-queued.
BaseRetryDelay time.Duration
// RetryJobAfterFailure allows the job to be re-queued if it fails.
RetryJobAfterFailure bool
PerWorkerQPS int
Timeout time.Duration
// PerWorkerQPS is the maximum QPS of a single worker before it is rate-limited.
// Workers*PerWorkerQPS is therefore the overall QPS limit of the entire executor.
PerWorkerQPS int
// Timeout defines how long a single job is allowed to run and how long the
// entire executor should wait for all the jobs to stop when shutting down.
Timeout time.Duration
}

// New creates a new Executor with a queue size, number of workers,
// and a job-running or shutdown timeout.
// New creates a new Executor with user-provided Config.
func New(c Config) (*Executor, error) {
if c.QueueSize == 0 || c.Workers == 0 || c.MaxJobRetries == 0 ||
c.BaseRetryDelay == 0 || c.Timeout == 0 || c.PerWorkerQPS == 0 {
if c.QueueSize == 0 || c.Workers == 0 || c.BaseRetryDelay == 0 ||
c.Timeout == 0 || c.PerWorkerQPS == 0 {
return nil, fmt.Errorf("invalid executor config")
}
e := &Executor{}
e.workers = c.Workers
e.timeout = c.Timeout
e.queueSize = c.QueueSize
e.maxQueueSize = c.QueueSize
e.maxRetries = c.MaxJobRetries
e.allowRetries = c.RetryJobAfterFailure
e.wg = sync.WaitGroup{}
@@ -86,15 +97,15 @@ func New(c Config) (*Executor, error) {
workqueue.NewItemExponentialFailureRateLimiter(c.BaseRetryDelay, maxRetryDelay),
&workqueue.BucketRateLimiter{
// Token Bucket limiter, with
// qps = workers * qpsToWorkerRatio, maxBurst = queueSize
// qps = Workers * PerWorkerQPS, maxBurst = QueueSize
Limiter: rate.NewLimiter(rate.Limit(c.Workers*c.PerWorkerQPS), c.QueueSize),
},
),
)
e.logger = logrus.WithField("executor", "action-job-executor")
e.logger.Infof("new executor created, %d queue size, %d concurrnt workers, %v timeout, "+
"allow retries %v, max %d retries",
e.queueSize,
e.maxQueueSize,
e.workers,
e.timeout,
e.allowRetries,
@@ -115,7 +126,7 @@ func (e *Executor) setJobRunning(j Job) {
e.setJobStatus(j, true)
}

func (e *Executor) setJobNotRunning(j Job) {
func (e *Executor) setJobFinished(j Job) {
e.setJobStatus(j, false)
}

@@ -132,20 +143,28 @@ func (e *Executor) requeueJob(j Job) {
e.queue.AddRateLimited(j)
return
}
e.logger.Errorf("requeue %s job %s failed, it failed %d times, too many retries", j.Type(), j.ID(), e.queue.NumRequeues(j))
e.logger.Errorf("job %s (%s) cannot be requeued because it failed too many (%d/%d) times", j.Type(), j.ID(), e.queue.NumRequeues(j), e.maxRetries)
e.queue.Forget(j)
}
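
As an aside (not part of this diff), here is a small standalone snippet showing how the exponential failure limiter behind requeueJob behaves: each AddRateLimited of the same failing item waits roughly twice as long as the previous one, starting at BaseRetryDelay and capped at maxRetryDelay; in the executor this delay is additionally combined with the token-bucket limiter through NewMaxOfRateLimiter.

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    // Same shape as the limiter built in New: base delay 1s, capped at 1200s.
    limiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 1200*time.Second)

    for i := 0; i < 5; i++ {
        // Prints 1s, 2s, 4s, 8s, 16s: the delay doubles on every failure of
        // the same item until it reaches the cap.
        fmt.Println(limiter.When("job-a"))
    }
}
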

// AddJob adds a job to the queue.
func (e *Executor) AddJob(j Job) error {
if e.queue.Len() >= e.queueSize {
return fmt.Errorf("queue full with size %d, cannot add %s job %s", e.queue.Len(), j.Type(), j.ID())
if e.queue.Len() >= e.maxQueueSize {
msg := fmt.Sprintf("job %s (%s) cannot be added, queue size full %d/%d", j.Type(), j.ID(), e.queue.Len(), e.maxQueueSize)
e.logger.Errorf(msg)
return fmt.Errorf(msg)
}
e.queue.Add(j)
e.logger.Debugf("job %s (%s) added to queue, currnet queue size: %d/%d", j.Type(), j.ID(), e.queue.Len(), e.maxQueueSize)
return nil
}

func (e *Executor) runJob(ctx context.Context) bool {
if ctx.Err() != nil {
e.logger.Infof("worker exiting because %s", ctx.Err())
return false
}

item, quit := e.queue.Get()
if quit {
return false
@@ -161,12 +180,12 @@ func (e *Executor) runJob(ctx context.Context) bool {
return true
}

e.logger.Infof("job picked up by a worker, going to run %s job: %s", j.Type(), j.ID())
e.logger.Debugf("job %s (%s) is picked up by a worker", j.Type(), j.ID())

// This job does not allow concurrent runs, and it is already running.
// Requeue it to run it later.
if !j.AllowConcurrency() && e.getJobStatus(j) {
e.logger.Infof("same %s job %s is already running, will run later", j.Type(), j.ID())
e.logger.Infof("job %s (%s) is already running, will be requeued", j.Type(), j.ID())
e.requeueJob(j)
return true
}
@@ -175,21 +194,30 @@
timeoutCtx, cancel := context.WithDeadline(ctx, time.Now().Add(e.timeout))
defer cancel()

e.logger.Infof("job %s (%s) started executing", j.Type(), j.ID())
e.setJobRunning(j)
err := j.Run(timeoutCtx)
e.setJobNotRunning(j)
e.setJobFinished(j)

if err == nil && timeoutCtx.Err() == nil {
e.logger.Infof("%s job %s finished", j.Type(), j.ID())
e.logger.Infof("job %s (%s) finished", j.Type(), j.ID())
e.queue.Forget(j)
} else {
e.logger.Errorf("%s job %s failed: jobErr=%s, ctxErr=%s", j.Type(), j.ID(), err, timeoutCtx.Err())
if e.allowRetries {
e.logger.Infof("will retry %s job %s later", j.Type(), j.ID())
e.requeueJob(j)
}
return true
}

// The context was cancelled; the worker should exit.
if timeoutCtx.Err() == context.Canceled {
e.logger.Infof("job %s (%s) failed because ctx errored: %s, worker will exit soon", j.Type(), j.ID(), timeoutCtx.Err())
return false
}

msg := fmt.Sprintf("job %s (%s) failed because (jobErr=%v, ctxErr=%v)", j.Type(), j.ID(), err, timeoutCtx.Err())
if e.allowRetries {
msg += fmt.Sprintf(", will retry job %s (%s) later", j.Type(), j.ID())
e.requeueJob(j)
}
e.logger.Errorf(msg)

return true
}

@@ -203,26 +231,28 @@ func (e *Executor) RunJobs(ctx context.Context) {
e.wg.Done()
}()
}
}
e.logger.Infof("executor started with %d workers", e.workers)

<-ctx.Done()

// Shutdown stops workers.
func (e *Executor) Shutdown() bool {
e.logger.Infof("shutting down executor")

// Wait for workers to end with a timeout.
// Shutdown queue, wait for workers to end with a timeout.
e.wg.Add(1)
go func() {
e.queue.ShutDownWithDrain()
e.wg.Done()
}()
ch := make(chan struct{})
go func() {
e.queue.ShutDown()
e.wg.Wait()
close(ch)
}()

select {
case <-ch:
e.logger.Infof("shutdown successful")
return true
case <-time.After(e.timeout):
e.logger.Infof("shutdown timed out")
return false
}
}
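
For illustration, here is a hypothetical Job implementation and how it might be enqueued. Only AllowConcurrency() is visible in the Job interface above; ID() string, Type() string, and Run(ctx context.Context) error are inferred from how runJob calls them, and printJob, enqueueHello, and the import path are invented for this sketch.

package example

import (
    "context"
    "fmt"

    "example.com/yourproject/pkg/executor" // placeholder import path
)

// printJob is a toy job used only in this sketch.
type printJob struct {
    id  string
    msg string
}

func (p printJob) ID() string   { return p.id }
func (p printJob) Type() string { return "print" }

// AllowConcurrency returns false, so the executor will requeue this job if an
// identical one is already running.
func (p printJob) AllowConcurrency() bool { return false }

// Run does the actual work, respecting the per-job deadline the executor
// attaches via context.WithDeadline.
func (p printJob) Run(ctx context.Context) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    default:
        fmt.Println(p.msg)
        return nil
    }
}

// enqueueHello shows how such a job would be handed to the executor.
func enqueueHello(exe *executor.Executor) error {
    return exe.AddJob(printJob{id: "hello-1", msg: "hello"})
}

Because AllowConcurrency returns false, the executor requeues an identical job submitted while one is already running, and AddJob rejects new jobs once the queue already holds QueueSize items.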