From c97d10134df5b8b2b9af0fce4b5e8ba8e2eccfc1 Mon Sep 17 00:00:00 2001
From: Charlie Chiang
Date: Mon, 20 Mar 2023 10:42:56 +0800
Subject: [PATCH] Test: add tests for `pkg/executor` (#37)

* Test: add tests

Signed-off-by: Charlie Chiang

* Test: add tests for executor

Signed-off-by: Charlie Chiang

* Test: add tests for executor

Signed-off-by: Charlie Chiang

* use for loop

Signed-off-by: Charlie Chiang

---------

Signed-off-by: Charlie Chiang
---
 pkg/cmd/cmd.go                |   3 +-
 pkg/executor/executor.go      | 104 ++++++----
 pkg/executor/executor_test.go | 353 ++++++++++++++++++++++++++++++++++
 3 files changed, 421 insertions(+), 39 deletions(-)
 create mode 100644 pkg/executor/executor_test.go

diff --git a/pkg/cmd/cmd.go b/pkg/cmd/cmd.go
index 36b7be8..7b4510c 100644
--- a/pkg/cmd/cmd.go
+++ b/pkg/cmd/cmd.go
@@ -148,7 +148,6 @@ func runCli(cmd *cobra.Command, args []string) error {
 	if err != nil {
 		return errors.Wrap(err, "error when creating executor")
 	}
-	defer exe.Shutdown()
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -188,7 +187,7 @@ func runCli(cmd *cobra.Command, args []string) error {
 	}
 
 	// Let the workers run Actions.
-	exe.RunJobs(ctx)
+	go exe.RunJobs(ctx)
 
 	// Listen to termination signals.
 	sigterm := make(chan os.Signal, 1)
diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go
index f6d940b..84ccb00 100644
--- a/pkg/executor/executor.go
+++ b/pkg/executor/executor.go
@@ -29,13 +29,15 @@ import (
 )
 
 const (
+	// maxRetryDelay is the max re-queueing delay of the exponential failure
+	// rate limiter.
 	maxRetryDelay = 1200 * time.Second
 )
 
-// Executor is a rate-limited work queue with concurrent workers.
+// Executor is a rate-limited job queue with concurrent workers.
 type Executor struct {
 	workers      int
-	queueSize    int
+	maxQueueSize int
 	maxRetries   int
 	allowRetries bool
 	wg           sync.WaitGroup
@@ -53,28 +55,37 @@ type Job interface {
 	AllowConcurrency() bool
 }
 
-// Config is the config for executor
+// Config is the config for the Executor.
 type Config struct {
-	QueueSize            int
-	Workers              int
-	MaxJobRetries        int
-	BaseRetryDelay       time.Duration
+	// QueueSize is the maximum number of jobs in the executor queue.
+	QueueSize int
+	// Workers is the number of workers that are concurrently executing jobs.
+	Workers int
+	// MaxJobRetries is the maximum number of retries if a job fails. This should
+	// not be zero. If you want to disable retries, just disable RetryJobAfterFailure.
+	MaxJobRetries int
+	// BaseRetryDelay defines how long after a job fails before it is re-queued.
+	BaseRetryDelay time.Duration
+	// RetryJobAfterFailure allows the job to be re-queued if it fails.
 	RetryJobAfterFailure bool
-	PerWorkerQPS         int
-	Timeout              time.Duration
+	// PerWorkerQPS is the max QPS of a worker before it is rate-limited. With Workers,
+	// Workers*PerWorkerQPS is the overall QPS limit of the entire executor.
+	PerWorkerQPS int
+	// Timeout defines how long a single job is allowed to run and how long the
+	// entire executor should wait for all the jobs to stop when shutting down.
+	Timeout time.Duration
 }
 
-// New creates a new Executor with a queue size, number of workers,
-// and a job-running or shutdown timeout.
+// New creates a new Executor with user-provided Config.
 func New(c Config) (*Executor, error) {
-	if c.QueueSize == 0 || c.Workers == 0 || c.MaxJobRetries == 0 ||
-		c.BaseRetryDelay == 0 || c.Timeout == 0 || c.PerWorkerQPS == 0 {
+	if c.QueueSize == 0 || c.Workers == 0 || c.BaseRetryDelay == 0 ||
+		c.Timeout == 0 || c.PerWorkerQPS == 0 {
 		return nil, fmt.Errorf("invalid executor config")
 	}
 	e := &Executor{}
 	e.workers = c.Workers
 	e.timeout = c.Timeout
-	e.queueSize = c.QueueSize
+	e.maxQueueSize = c.QueueSize
 	e.maxRetries = c.MaxJobRetries
 	e.allowRetries = c.RetryJobAfterFailure
 	e.wg = sync.WaitGroup{}
@@ -86,7 +97,7 @@ func New(c Config) (*Executor, error) {
 		workqueue.NewItemExponentialFailureRateLimiter(c.BaseRetryDelay, maxRetryDelay),
 		&workqueue.BucketRateLimiter{
 			// Token Bucket limiter, with
-			// qps = workers * qpsToWorkerRatio, maxBurst = queueSize
+			// qps = Workers * PerWorkerQPS, maxBurst = QueueSize
 			Limiter: rate.NewLimiter(rate.Limit(c.Workers*c.PerWorkerQPS), c.QueueSize),
 		},
 	),
@@ -94,7 +105,7 @@ func New(c Config) (*Executor, error) {
 	e.logger = logrus.WithField("executor", "action-job-executor")
 	e.logger.Infof("new executor created, %d queue size, %d concurrnt workers, %v timeout, "+
 		"allow retries %v, max %d retries",
-		e.queueSize,
+		e.maxQueueSize,
 		e.workers,
 		e.timeout,
 		e.allowRetries,
@@ -115,7 +126,7 @@ func (e *Executor) setJobRunning(j Job) {
 	e.setJobStatus(j, true)
 }
 
-func (e *Executor) setJobNotRunning(j Job) {
+func (e *Executor) setJobFinished(j Job) {
 	e.setJobStatus(j, false)
 }
 
@@ -132,20 +143,28 @@ func (e *Executor) requeueJob(j Job) {
 		e.queue.AddRateLimited(j)
 		return
 	}
-	e.logger.Errorf("requeue %s job %s failed, it failed %d times, too many retries", j.Type(), j.ID(), e.queue.NumRequeues(j))
+	e.logger.Errorf("job %s (%s) cannot be requeued because it failed too many (%d/%d) times", j.Type(), j.ID(), e.queue.NumRequeues(j), e.maxRetries)
 	e.queue.Forget(j)
 }
 
 // AddJob adds a job to the queue.
 func (e *Executor) AddJob(j Job) error {
-	if e.queue.Len() >= e.queueSize {
-		return fmt.Errorf("queue full with size %d, cannot add %s job %s", e.queue.Len(), j.Type(), j.ID())
+	if e.queue.Len() >= e.maxQueueSize {
+		msg := fmt.Sprintf("job %s (%s) cannot be added, queue is full (%d/%d)", j.Type(), j.ID(), e.queue.Len(), e.maxQueueSize)
+		e.logger.Errorf(msg)
+		return fmt.Errorf(msg)
 	}
 	e.queue.Add(j)
+	e.logger.Debugf("job %s (%s) added to queue, current queue size: %d/%d", j.Type(), j.ID(), e.queue.Len(), e.maxQueueSize)
 	return nil
 }
 
 func (e *Executor) runJob(ctx context.Context) bool {
+	if ctx.Err() != nil {
+		e.logger.Infof("worker exiting because %s", ctx.Err())
+		return false
+	}
+
 	item, quit := e.queue.Get()
 	if quit {
 		return false
@@ -161,12 +180,12 @@ func (e *Executor) runJob(ctx context.Context) bool {
 		return true
 	}
 
-	e.logger.Infof("job picked up by a worker, going to run %s job: %s", j.Type(), j.ID())
+	e.logger.Debugf("job %s (%s) is picked up by a worker", j.Type(), j.ID())
 
 	// This job does not allow concurrent runs, and it is already running.
 	// Requeue it to run it later.
 	if !j.AllowConcurrency() && e.getJobStatus(j) {
-		e.logger.Infof("same %s job %s is already running, will run later", j.Type(), j.ID())
+		e.logger.Infof("job %s (%s) is already running, will be requeued", j.Type(), j.ID())
 		e.requeueJob(j)
 		return true
 	}
@@ -175,21 +194,30 @@ func (e *Executor) runJob(ctx context.Context) bool {
 	timeoutCtx, cancel := context.WithDeadline(ctx, time.Now().Add(e.timeout))
 	defer cancel()
 
+	e.logger.Infof("job %s (%s) started executing", j.Type(), j.ID())
 	e.setJobRunning(j)
 	err := j.Run(timeoutCtx)
-	e.setJobNotRunning(j)
+	e.setJobFinished(j)
 
 	if err == nil && timeoutCtx.Err() == nil {
-		e.logger.Infof("%s job %s finished", j.Type(), j.ID())
+		e.logger.Infof("job %s (%s) finished", j.Type(), j.ID())
 		e.queue.Forget(j)
-	} else {
-		e.logger.Errorf("%s job %s failed: jobErr=%s, ctxErr=%s", j.Type(), j.ID(), err, timeoutCtx.Err())
-		if e.allowRetries {
-			e.logger.Infof("will retry %s job %s later", j.Type(), j.ID())
-			e.requeueJob(j)
-		}
+		return true
 	}
 
+	// context cancelled, it is time to die
+	if timeoutCtx.Err() == context.Canceled {
+		e.logger.Infof("job %s (%s) failed because ctx errored: %s, worker will exit soon", j.Type(), j.ID(), timeoutCtx.Err())
+		return false
+	}
+
+	msg := fmt.Sprintf("job %s (%s) failed because (jobErr=%v, ctxErr=%v)", j.Type(), j.ID(), err, timeoutCtx.Err())
+	if e.allowRetries {
+		msg += fmt.Sprintf(", will retry job %s (%s) later", j.Type(), j.ID())
+		e.requeueJob(j)
+	}
+	e.logger.Errorf(msg)
+
 	return true
 }
 
@@ -203,16 +231,20 @@ func (e *Executor) RunJobs(ctx context.Context) {
 			e.wg.Done()
 		}()
 	}
-}
+	e.logger.Infof("executor started with %d workers", e.workers)
+
+	<-ctx.Done()
 
-// Shutdown stops workers.
-func (e *Executor) Shutdown() bool {
 	e.logger.Infof("shutting down executor")
 
-	// Wait for workers to end with a timeout.
+	// Shutdown queue, wait for workers to end with a timeout.
+	e.wg.Add(1)
+	go func() {
+		e.queue.ShutDownWithDrain()
+		e.wg.Done()
+	}()
 	ch := make(chan struct{})
 	go func() {
-		e.queue.ShutDown()
 		e.wg.Wait()
 		close(ch)
 	}()
@@ -220,9 +252,7 @@ func (e *Executor) Shutdown() bool {
 	select {
 	case <-ch:
 		e.logger.Infof("shutdown successful")
-		return true
 	case <-time.After(e.timeout):
 		e.logger.Infof("shutdown timed out")
-		return false
 	}
 }
diff --git a/pkg/executor/executor_test.go b/pkg/executor/executor_test.go
new file mode 100644
index 0000000..00f17fa
--- /dev/null
+++ b/pkg/executor/executor_test.go
@@ -0,0 +1,353 @@
+/*
+Copyright 2022 The KubeVela Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package executor
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/sirupsen/logrus"
+	"github.com/stretchr/testify/assert"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/util/workqueue"
+)
+
+func TestNormalJobs(t *testing.T) {
+	logrus.SetLevel(logrus.TraceLevel)
+	a := assert.New(t)
+	c := Config{
+		QueueSize:            5,
+		Workers:              3,
+		MaxJobRetries:        0,
+		BaseRetryDelay:       10 * time.Millisecond,
+		RetryJobAfterFailure: false,
+		PerWorkerQPS:         5,
+		Timeout:              200 * time.Millisecond,
+	}
+
+	e, err := New(c)
+	a.NoError(err)
+	a.NoError(waitForAdded(e.queue, 0))
+
+	for i := 1; i <= 3; i++ {
+		err = e.AddJob(&sleepingJob{100 * time.Millisecond, fmt.Sprint(i)})
+		a.NoError(err)
+		a.NoError(waitForAdded(e.queue, i))
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	ch := make(chan struct{})
+	go func() {
+		e.RunJobs(ctx)
+		close(ch)
+	}()
+
+	// Wait for all jobs to run
+	err = wait.Poll(1*time.Millisecond, 200*time.Millisecond, func() (done bool, err error) {
+		l := 0
+		e.runningJobs.Range(func(_ any, _ any) bool {
+			l += 1
+			return true
+		})
+		return l == 3, nil
+	})
+	a.NoError(err)
+
+	// Wait for all jobs to end
+	err = wait.Poll(1*time.Millisecond, 200*time.Millisecond, func() (done bool, err error) {
+		l := 0
+		e.runningJobs.Range(func(_ any, _ any) bool {
+			l += 1
+			return true
+		})
+		return l == 0, nil
+	})
+	a.NoError(err)
+
+	cancel()
+	<-ch
+	a.NoError(waitForAdded(e.queue, 0))
+}
+
+func TestQueueSizeLimits(t *testing.T) {
+	logrus.SetLevel(logrus.TraceLevel)
+	a := assert.New(t)
+	c := Config{
+		QueueSize:            3,
+		Workers:              3,
+		MaxJobRetries:        0,
+		BaseRetryDelay:       1 * time.Second,
+		RetryJobAfterFailure: false,
+		PerWorkerQPS:         5,
+		Timeout:              5 * time.Second,
+	}
+
+	e, err := New(c)
+	a.NoError(err)
+	a.NoError(waitForAdded(e.queue, 0))
+
+	for i := 1; i <= 3; i++ {
+		err = e.AddJob(&sleepingJob{10 * time.Second, fmt.Sprint(i)})
+		a.NoError(err)
+		a.NoError(waitForAdded(e.queue, i))
+	}
+
+	// Queue full
+	err = e.AddJob(&sleepingJob{10 * time.Second, "4"})
+	a.Error(err)
+	a.NoError(waitForAdded(e.queue, 3))
+
+	ctx, cancel := context.WithCancel(context.Background())
+	ch := make(chan struct{})
+	go func() {
+		e.RunJobs(ctx)
+		close(ch)
+	}()
+
+	err = wait.Poll(10*time.Millisecond, 200*time.Millisecond, func() (done bool, err error) {
+		l := 0
+		e.runningJobs.Range(func(_ any, _ any) bool {
+			l += 1
+			return true
+		})
+		return l == 3, nil
+	})
+	a.NoError(err)
+
+	cancel()
+	<-ch
+	a.NoError(waitForAdded(e.queue, 0))
+}
+
+func TestSameJobRequeuing(t *testing.T) {
+	logrus.SetLevel(logrus.TraceLevel)
+	a := assert.New(t)
+	c := Config{
+		QueueSize:            5,
+		Workers:              3,
+		MaxJobRetries:        5,
+		BaseRetryDelay:       10 * time.Millisecond,
+		RetryJobAfterFailure: true,
+		PerWorkerQPS:         10,
+		Timeout:              300 * time.Millisecond,
+	}
+
+	e, err := New(c)
+	a.NoError(err)
+	a.NoError(waitForAdded(e.queue, 0))
+
+	// Jobs with same id
+	j1 := &sleepingJob{200 * time.Millisecond, "1"}
+	j2 := &sleepingJob{200 * time.Millisecond, "1"}
+	j3 := &sleepingJob{100 * time.Millisecond, "1"}
+
+	err = e.AddJob(j1)
+	a.NoError(err)
+	a.NoError(waitForAdded(e.queue, 1))
+
+	err = e.AddJob(j2)
+	a.NoError(err)
+	a.NoError(waitForAdded(e.queue, 2))
+
+	ctx, cancel := context.WithCancel(context.Background())
+	ch := make(chan struct{})
+	go func() {
+		e.RunJobs(ctx)
+		close(ch)
+	}()
+
+	err = wait.Poll(1*time.Millisecond, 200*time.Millisecond, func() (done bool, err error) {
+		l := 0
+		e.runningJobs.Range(func(_ any, _ any) bool {
+			l += 1
+			return true
+		})
+		return l >= 1, nil
nil + }) + a.NoError(err) + + err = e.AddJob(j3) + a.NoError(err) + + err = wait.Poll(1*time.Millisecond, 500*time.Millisecond, func() (done bool, err error) { + return e.queue.NumRequeues(j3) >= 2, nil + }) + + cancel() + <-ch + a.NoError(waitForAdded(e.queue, 0)) +} + +func TestFailedRequeuing(t *testing.T) { + logrus.SetLevel(logrus.TraceLevel) + a := assert.New(t) + c := Config{ + QueueSize: 5, + Workers: 1, + MaxJobRetries: 3, + BaseRetryDelay: 50 * time.Millisecond, + RetryJobAfterFailure: true, + PerWorkerQPS: 500, + Timeout: 25 * time.Millisecond, + } + + e, err := New(c) + a.NoError(err) + a.NoError(waitForAdded(e.queue, 0)) + + j1 := &failingJob{"1"} + err = e.AddJob(j1) + a.NoError(err) + a.NoError(waitForAdded(e.queue, 1)) + + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan struct{}) + go func() { + e.RunJobs(ctx) + close(ch) + }() + + // Test the job itself failed + // requeued 3 times (max retries) + err = wait.Poll(1*time.Millisecond, 500*time.Millisecond, func() (done bool, err error) { + return e.queue.NumRequeues(j1) == 3, nil + }) + a.NoError(err, fmt.Sprint(e.queue.NumRequeues(j1)), "3") + // too many requeues, cleared + err = wait.Poll(1*time.Millisecond, 500*time.Millisecond, func() (done bool, err error) { + return e.queue.NumRequeues(j1) == 0, nil + }) + a.NoError(err, fmt.Sprint(e.queue.NumRequeues(j1)), "0") + + cancel() + <-ch + a.NoError(waitForAdded(e.queue, 0)) +} + +func TestTimedOutRequeuing(t *testing.T) { + logrus.SetLevel(logrus.TraceLevel) + a := assert.New(t) + c := Config{ + QueueSize: 5, + Workers: 1, + MaxJobRetries: 3, + BaseRetryDelay: 50 * time.Millisecond, + RetryJobAfterFailure: true, + PerWorkerQPS: 500, + Timeout: 25 * time.Millisecond, + } + + e, err := New(c) + a.NoError(err) + a.NoError(waitForAdded(e.queue, 0)) + + j2 := &sleepingJob{1 * time.Second, "2"} + err = e.AddJob(j2) + a.NoError(err) + a.NoError(waitForAdded(e.queue, 1)) + + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan struct{}) + go func() { + e.RunJobs(ctx) + close(ch) + }() + + // Test the job timed out + err = wait.Poll(10*time.Millisecond, 1000*time.Millisecond, func() (done bool, err error) { + return e.queue.NumRequeues(j2) == 3, nil + }) + a.NoError(err, fmt.Sprint(e.queue.NumRequeues(j2)), "3") + // too many requeues, cleared + err = wait.Poll(10*time.Millisecond, 1000*time.Millisecond, func() (done bool, err error) { + return e.queue.NumRequeues(j2) == 0, nil + }) + a.NoError(err, fmt.Sprint(e.queue.NumRequeues(j2)), "0") + + cancel() + <-ch + a.NoError(waitForAdded(e.queue, 0)) +} + +type sleepingJob struct { + duration time.Duration + id string +} + +func (s *sleepingJob) ID() string { + return s.duration.String() + s.id +} + +func (s *sleepingJob) Run(ctx context.Context) error { + ch := make(chan struct{}) + go func() { + time.Sleep(s.duration) + close(ch) + }() + + select { + case <-ch: + return nil + case <-ctx.Done(): + return nil + } + + return nil +} + +func (s *sleepingJob) AllowConcurrency() bool { + return false +} + +func (s *sleepingJob) Type() string { + return "sleeping-job" +} + +type failingJob struct { + id string +} + +func (f *failingJob) Type() string { + return "failing-job" +} + +func (f *failingJob) ID() string { + return f.id +} + +func (f *failingJob) Run(ctx context.Context) error { + return fmt.Errorf("failing job %s is intended to fail ", f.id) +} + +func (f *failingJob) AllowConcurrency() bool { + return false +} + +func waitForAdded(q workqueue.DelayingInterface, depth int) error { + 
+	err := wait.Poll(1*time.Millisecond, 1*time.Second, func() (done bool, err error) {
+		if q.Len() == depth {
+			return true, nil
+		}
+		return false, nil
+	})
+	if err != nil {
+		fmt.Printf("%d != %d\n", q.Len(), depth)
+	}
+	return err
+}
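
Usage sketch (illustrative, not part of the patch): after this change, RunJobs blocks until its context is cancelled and then drains the queue and waits for workers, so callers start it in a goroutine and shut the executor down by cancelling that context, as the cmd.go hunk above now does. The printJob type and the kube-trigger import path below are assumptions made only for a self-contained example.

package main

import (
	"context"
	"fmt"
	"time"

	// Assumed module path; use whatever module actually hosts pkg/executor.
	"github.com/kubevela/kube-trigger/pkg/executor"
)

// printJob is a hypothetical Job implementation, only for illustration.
type printJob struct{ id string }

func (p *printJob) ID() string             { return p.id }
func (p *printJob) Type() string           { return "print-job" }
func (p *printJob) AllowConcurrency() bool { return false }
func (p *printJob) Run(ctx context.Context) error {
	fmt.Println("running job", p.id)
	return nil
}

func main() {
	exe, err := executor.New(executor.Config{
		QueueSize:            10,
		Workers:              2,
		MaxJobRetries:        3,
		BaseRetryDelay:       time.Second,
		RetryJobAfterFailure: true,
		PerWorkerQPS:         5,
		Timeout:              10 * time.Second,
	})
	if err != nil {
		panic(err)
	}

	ctx, cancel := context.WithCancel(context.Background())

	// RunJobs blocks until ctx is cancelled, so run it in a goroutine.
	done := make(chan struct{})
	go func() {
		exe.RunJobs(ctx)
		close(done)
	}()

	_ = exe.AddJob(&printJob{id: "hello"})

	time.Sleep(time.Second)
	cancel() // triggers the drain-and-shutdown path in RunJobs
	<-done
}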