From defe4c09c8bfee2e8198d19316bed0197704626a Mon Sep 17 00:00:00 2001 From: Charlie Chiang Date: Mon, 13 Mar 2023 11:48:25 +0800 Subject: [PATCH] Test: add tests for workqueue and config parser (#36) * Test: add tests for workqueue and config parser Signed-off-by: Charlie Chiang * reviewable Signed-off-by: Charlie Chiang * fix tests Signed-off-by: Charlie Chiang * add boilerplate Signed-off-by: Charlie Chiang * add vendors to boilerplate exclusion Signed-off-by: Charlie Chiang --------- Signed-off-by: Charlie Chiang --- .github/workflows/unit-test.yaml | 7 + Makefile | 1 + go.mod | 3 +- go.sum | 2 - hack/boilerplate/boilerplate.py | 13 +- pkg/config/parser_test.go | 53 +++ pkg/config/testdata/golden/conf.cue | 20 + pkg/config/testdata/golden/conf.json | 28 ++ pkg/config/testdata/golden/conf.yaml | 16 + pkg/config/testdata/golden/conf.yml | 16 + pkg/config/testdata/invalidext/conf.invalid | 0 pkg/config/testdata/invalidschema/conf.yml | 27 ++ pkg/util/test_utils.go | 46 --- pkg/workqueue/default_rate_limiters_test.go | 231 +++++++++++ pkg/workqueue/delaying_queue_test.go | 250 ++++++++++++ .../main_test.go} | 26 +- pkg/workqueue/metrics_test.go | 278 ++++++++++++++ pkg/workqueue/parallelizer_test.go | 111 ++++++ pkg/workqueue/queue_test.go | 363 ++++++++++++++++++ pkg/workqueue/rate_limiting_queue_test.go | 75 ++++ 20 files changed, 1495 insertions(+), 71 deletions(-) create mode 100644 pkg/config/parser_test.go create mode 100644 pkg/config/testdata/golden/conf.cue create mode 100644 pkg/config/testdata/golden/conf.json create mode 100644 pkg/config/testdata/golden/conf.yaml create mode 100644 pkg/config/testdata/golden/conf.yml create mode 100644 pkg/config/testdata/invalidext/conf.invalid create mode 100644 pkg/config/testdata/invalidschema/conf.yml delete mode 100644 pkg/util/test_utils.go create mode 100644 pkg/workqueue/default_rate_limiters_test.go create mode 100644 pkg/workqueue/delaying_queue_test.go rename pkg/{util/mapstructure/mapstructure.go => workqueue/main_test.go} (52%) create mode 100644 pkg/workqueue/metrics_test.go create mode 100644 pkg/workqueue/parallelizer_test.go create mode 100644 pkg/workqueue/queue_test.go create mode 100644 pkg/workqueue/rate_limiting_queue_test.go diff --git a/.github/workflows/unit-test.yaml b/.github/workflows/unit-test.yaml index 9e8b180..4c1ed0f 100644 --- a/.github/workflows/unit-test.yaml +++ b/.github/workflows/unit-test.yaml @@ -61,5 +61,12 @@ jobs: run: | go install -mod=mod github.com/onsi/ginkgo/v2/ginkgo + - name: Prepare minikube + # For whatever reason, certain unit tests relies on k8s. + # Although such reliance should be removed, in the meantime, + # we set up a local k8s cluster to run such tests. + # Use minikube simply because it comes with GitHub runner. + run: minikube start + - name: Run tests run: make test \ No newline at end of file diff --git a/Makefile b/Makefile index e51030d..1677888 100644 --- a/Makefile +++ b/Makefile @@ -82,6 +82,7 @@ svgformat: # Check possible issues before committing code reviewable: generate checklicense lint + go mod tidy # Run tests test: envtest diff --git a/go.mod b/go.mod index abd6380..53c4d13 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,9 @@ require ( cloud.google.com/go v0.102.0 cuelang.org/go v0.5.0-beta.2.0.20230130095913-d573e0c2f041 github.com/crossplane/crossplane-runtime v0.14.1-0.20210722005935-0b469fcc77cd + github.com/google/go-cmp v0.5.9 github.com/kubevela/pkg v0.0.0-20230307020100-8537f7bfc02b github.com/mitchellh/hashstructure/v2 v2.0.2 - github.com/mitchellh/mapstructure v1.5.0 github.com/oam-dev/kubevela-core-api v1.6.0-alpha.1 github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.9.0 @@ -58,7 +58,6 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/gnostic v0.5.7-v3refs // indirect - github.com/google/go-cmp v0.5.9 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.3.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect diff --git a/go.sum b/go.sum index 3d78279..8f190fc 100644 --- a/go.sum +++ b/go.sum @@ -985,8 +985,6 @@ github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0Qu github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= -github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= -github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/osext v0.0.0-20151018003038-5e2d6d41470f/go.mod h1:OkQIRizQZAeMln+1tSwduZz7+Af5oFlKirV/MSYes2A= github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/mitchellh/reflectwalk v1.0.1/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= diff --git a/hack/boilerplate/boilerplate.py b/hack/boilerplate/boilerplate.py index b46891d..1a4df27 100755 --- a/hack/boilerplate/boilerplate.py +++ b/hack/boilerplate/boilerplate.py @@ -146,17 +146,22 @@ def file_extension(filename): return os.path.splitext(filename)[1].split(".")[-1].lower() skipped_dirs = [ - '.git', - "pkg/util/workqueue", + ".git", + ".go/gocache", + ".go/gomodcache", + ".idea", + ".vscode", + "default.etcd", "docs", "build", - "bin" + "bin", + "pkg/workqueue", + "vendors", ] # list all the files contain 'DO NOT EDIT', but are not generated skipped_ungenerated_files = [ 'hack/boilerplate/boilerplate.py', - "hack/generate-go-const-from-file.sh" ] def normalize_files(files): diff --git a/pkg/config/parser_test.go b/pkg/config/parser_test.go new file mode 100644 index 0000000..6d79a6a --- /dev/null +++ b/pkg/config/parser_test.go @@ -0,0 +1,53 @@ +/* +Copyright 2023 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "github.com/stretchr/testify/assert" + "reflect" + "testing" +) + +func TestNewFromFileOrDir(t *testing.T) { + a := assert.New(t) + // From file + cf, err := NewFromFileOrDir("testdata/golden/conf.cue") + a.NoError(err) + a.Equal(1, len(cf.Triggers)) + // From dir + cd, err := NewFromFileOrDir("testdata/golden") + a.NoError(err) + a.Equal(4, len(cd.Triggers)) + // Every trigger should be the same + prev := cf.Triggers[0] + for _, t := range cd.Triggers { + reflect.DeepEqual(prev, t) + } +} + +func TestNewFromFileOrDirInvalid(t *testing.T) { + a := assert.New(t) + + // Report error if file is invalid + _, err := NewFromFileOrDir("testdata/invalidext/conf.invalid") + a.Error(err) + + // No error if files in dir is invalid, just skips + c, err := NewFromFileOrDir("testdata/invalidext") + a.NoError(err) + a.Equal(0, len(c.Triggers)) +} diff --git a/pkg/config/testdata/golden/conf.cue b/pkg/config/testdata/golden/conf.cue new file mode 100644 index 0000000..a6ba6bc --- /dev/null +++ b/pkg/config/testdata/golden/conf.cue @@ -0,0 +1,20 @@ +triggers: [{ + source: { + type: "resource-watcher" + properties: { + clusters: [ + "cn-shanghai", + ] + apiVersion: "apps/v1" + kind: "Deployment" + events: [ + "update", + ] + } + } + filter: "context.data.status.readyReplicas == context.data.status.replicas" + action: { + type: "sae-record-event" + properties: nameSelector: fromLabel: "workflowrun.oam.dev/name" + } +}] diff --git a/pkg/config/testdata/golden/conf.json b/pkg/config/testdata/golden/conf.json new file mode 100644 index 0000000..df30ec1 --- /dev/null +++ b/pkg/config/testdata/golden/conf.json @@ -0,0 +1,28 @@ +{ + "triggers": [ + { + "source": { + "type": "resource-watcher", + "properties": { + "clusters": [ + "cn-shanghai" + ], + "apiVersion": "apps/v1", + "kind": "Deployment", + "events": [ + "update" + ] + } + }, + "filter": "context.data.status.readyReplicas == context.data.status.replicas", + "action": { + "type": "sae-record-event", + "properties": { + "nameSelector": { + "fromLabel": "workflowrun.oam.dev/name" + } + } + } + } + ] +} diff --git a/pkg/config/testdata/golden/conf.yaml b/pkg/config/testdata/golden/conf.yaml new file mode 100644 index 0000000..d2e2f80 --- /dev/null +++ b/pkg/config/testdata/golden/conf.yaml @@ -0,0 +1,16 @@ +triggers: + - source: + type: resource-watcher + properties: + clusters: + - "cn-shanghai" + apiVersion: apps/v1 + kind: Deployment + events: + - update + filter: context.data.status.readyReplicas == context.data.status.replicas + action: + type: sae-record-event + properties: + nameSelector: + fromLabel: "workflowrun.oam.dev/name" diff --git a/pkg/config/testdata/golden/conf.yml b/pkg/config/testdata/golden/conf.yml new file mode 100644 index 0000000..d2e2f80 --- /dev/null +++ b/pkg/config/testdata/golden/conf.yml @@ -0,0 +1,16 @@ +triggers: + - source: + type: resource-watcher + properties: + clusters: + - "cn-shanghai" + apiVersion: apps/v1 + kind: Deployment + events: + - update + filter: context.data.status.readyReplicas == context.data.status.replicas + action: + type: sae-record-event + properties: + nameSelector: + fromLabel: "workflowrun.oam.dev/name" diff --git a/pkg/config/testdata/invalidext/conf.invalid b/pkg/config/testdata/invalidext/conf.invalid new file mode 100644 index 0000000..e69de29 diff --git a/pkg/config/testdata/invalidschema/conf.yml b/pkg/config/testdata/invalidschema/conf.yml new file mode 100644 index 0000000..02c9ab7 --- /dev/null +++ b/pkg/config/testdata/invalidschema/conf.yml @@ -0,0 +1,27 @@ +triggers: + - sources: + type: resource-watcher + properties: + apiVersion: core.oam.dev/v1alpha1 + kind: WorkflowRun + events: + - create + matchingLabels: + trigger.oam.dev/watch: "true" + action: + type: create-event-listener + - source: + type: resource-watcher + properties: + clusters: + - "cn-shanghai" + apiVersion: apps/v1 + kind: Deployment + events: + - update + filter: context.data.status.readyReplicas == context.data.status.replicas + action: + type: sae-record-event + properties: + nameSelector: + fromLabel: "workflowrun.oam.dev/name" diff --git a/pkg/util/test_utils.go b/pkg/util/test_utils.go deleted file mode 100644 index 70e0b04..0000000 --- a/pkg/util/test_utils.go +++ /dev/null @@ -1,46 +0,0 @@ -/* -Copyright 2022 The KubeVela Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "context" - - apierrors "k8s.io/apimachinery/pkg/api/errors" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/reconcile" -) - -// IgnoreAlreadyExists ignores error of already exists -func IgnoreAlreadyExists(err error) error { - if apierrors.IsAlreadyExists(err) { - return nil - } - return err -} - -// IgnoreNotFound ignores error of not found -func IgnoreNotFound(err error) error { - return client.IgnoreNotFound(err) -} - -// ReconcileOnce will just reconcile once. -func ReconcileOnce(r reconcile.Reconciler, req reconcile.Request) error { - if _, err := r.Reconcile(context.TODO(), req); err != nil { - return err - } - return nil -} diff --git a/pkg/workqueue/default_rate_limiters_test.go b/pkg/workqueue/default_rate_limiters_test.go new file mode 100644 index 0000000..3776ed4 --- /dev/null +++ b/pkg/workqueue/default_rate_limiters_test.go @@ -0,0 +1,231 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "testing" + "time" +) + +func TestItemExponentialFailureRateLimiter(t *testing.T) { + limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second) + + if e, a := 1*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 4*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 8*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 16*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5, limiter.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + if e, a := 1*time.Millisecond, limiter.When("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2*time.Millisecond, limiter.When("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2, limiter.NumRequeues("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + limiter.Forget("one") + if e, a := 0, limiter.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 1*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + +} + +func TestItemExponentialFailureRateLimiterOverFlow(t *testing.T) { + limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1000*time.Second) + for i := 0; i < 5; i++ { + limiter.When("one") + } + if e, a := 32*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + for i := 0; i < 1000; i++ { + limiter.When("overflow1") + } + if e, a := 1000*time.Second, limiter.When("overflow1"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + limiter = NewItemExponentialFailureRateLimiter(1*time.Minute, 1000*time.Hour) + for i := 0; i < 2; i++ { + limiter.When("two") + } + if e, a := 4*time.Minute, limiter.When("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + for i := 0; i < 1000; i++ { + limiter.When("overflow2") + } + if e, a := 1000*time.Hour, limiter.When("overflow2"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + +} + +func TestItemFastSlowRateLimiter(t *testing.T) { + limiter := NewItemFastSlowRateLimiter(5*time.Millisecond, 10*time.Second, 3) + + if e, a := 5*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 10*time.Second, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 10*time.Second, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5, limiter.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + if e, a := 5*time.Millisecond, limiter.When("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5*time.Millisecond, limiter.When("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2, limiter.NumRequeues("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + limiter.Forget("one") + if e, a := 0, limiter.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + +} + +func TestMaxOfRateLimiter(t *testing.T) { + limiter := NewMaxOfRateLimiter( + NewItemFastSlowRateLimiter(5*time.Millisecond, 3*time.Second, 3), + NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second), + ) + + if e, a := 5*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 3*time.Second, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 3*time.Second, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5, limiter.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + if e, a := 5*time.Millisecond, limiter.When("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5*time.Millisecond, limiter.When("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2, limiter.NumRequeues("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + limiter.Forget("one") + if e, a := 0, limiter.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + +} + +func TestWithMaxWaitRateLimiter(t *testing.T) { + limiter := NewWithMaxWaitRateLimiter(NewStepRateLimiter(5*time.Millisecond, 1000*time.Second, 100), 500*time.Second) + for i := 0; i < 100; i++ { + if e, a := 5*time.Millisecond, limiter.When(i); e != a { + t.Errorf("expected %v, got %v ", e, a) + } + } + + for i := 100; i < 200; i++ { + if e, a := 500*time.Second, limiter.When(i); e != a { + t.Errorf("expected %v, got %v", e, a) + } + } +} + +var _ RateLimiter = &StepRateLimiter{} + +func NewStepRateLimiter(baseDelay time.Duration, maxDelay time.Duration, threshold int) RateLimiter { + return &StepRateLimiter{ + baseDelay: baseDelay, + maxDelay: maxDelay, + threshold: threshold, + } +} + +type StepRateLimiter struct { + count int + threshold int + baseDelay time.Duration + maxDelay time.Duration +} + +func (r *StepRateLimiter) When(item interface{}) time.Duration { + r.count += 1 + if r.count <= r.threshold { + return r.baseDelay + } + return r.maxDelay +} + +func (r *StepRateLimiter) NumRequeues(item interface{}) int { + return 0 +} + +func (r *StepRateLimiter) Forget(item interface{}) { +} diff --git a/pkg/workqueue/delaying_queue_test.go b/pkg/workqueue/delaying_queue_test.go new file mode 100644 index 0000000..e08b494 --- /dev/null +++ b/pkg/workqueue/delaying_queue_test.go @@ -0,0 +1,250 @@ +/* +Copyright 2023 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "fmt" + "math/rand" + "reflect" + "testing" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + testingclock "k8s.io/utils/clock/testing" +) + +func TestSimpleQueue(t *testing.T) { + fakeClock := testingclock.NewFakeClock(time.Now()) + q := NewDelayingQueueWithCustomClock(fakeClock, "") + + first := "foo" + + q.AddAfter(first, 50*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + if q.Len() != 1 { + t.Errorf("should not have added") + } + + fakeClock.Step(60 * time.Millisecond) + + if err := waitForAdded(q, 1); err != nil { + t.Errorf("should have added") + } + item, _ := q.Get() + q.Done(item) + + // step past the next heartbeat + fakeClock.Step(10 * time.Second) + + err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { + if q.Len() > 0 { + return false, fmt.Errorf("added to queue") + } + + return false, nil + }) + if err != wait.ErrWaitTimeout { + t.Errorf("expected timeout, got: %v", err) + } + + if q.Len() != 0 { + t.Errorf("should not have added") + } +} + +func TestDeduping(t *testing.T) { + fakeClock := testingclock.NewFakeClock(time.Now()) + q := NewDelayingQueueWithCustomClock(fakeClock, "") + + first := "foo" + + q.AddAfter(first, 50*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } + q.AddAfter(first, 70*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if q.Len() != 1 { + t.Errorf("should not have added") + } + + // step past the first block, we should receive now + fakeClock.Step(60 * time.Millisecond) + if err := waitForAdded(q, 1); err != nil { + t.Errorf("should have added") + } + item, _ := q.Get() + q.Done(item) + + // step past the second add + fakeClock.Step(20 * time.Millisecond) + if q.Len() != 0 { + t.Errorf("should not have added") + } + + // test again, but this time the earlier should override + q.AddAfter(first, 50*time.Millisecond) + q.AddAfter(first, 30*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if q.Len() != 1 { + t.Errorf("should not have added") + } + + fakeClock.Step(40 * time.Millisecond) + if err := waitForAdded(q, 1); err != nil { + t.Errorf("should have added") + } + item, _ = q.Get() + q.Done(item) + + // step past the second add + fakeClock.Step(20 * time.Millisecond) + if q.Len() != 0 { + t.Errorf("should not have added") + } +} + +func TestAddTwoFireEarly(t *testing.T) { + fakeClock := testingclock.NewFakeClock(time.Now()) + q := NewDelayingQueueWithCustomClock(fakeClock, "") + + first := "foo" + second := "bar" + third := "baz" + + q.AddAfter(first, 1*time.Second) + q.AddAfter(second, 50*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + if q.Len() != 2 { + t.Errorf("should not have added") + } + + fakeClock.Step(60 * time.Millisecond) + + if err := waitForAdded(q, 2); err != nil { + t.Fatalf("unexpected err: %v", err) + } + item, _ := q.Get() + if !reflect.DeepEqual(item, second) { + t.Errorf("expected %v, got %v", second, item) + } + + q.AddAfter(third, 2*time.Second) + + fakeClock.Step(1 * time.Second) + if err := waitForAdded(q, 2); err != nil { + t.Fatalf("unexpected err: %v", err) + } + item, _ = q.Get() + if !reflect.DeepEqual(item, first) { + t.Errorf("expected %v, got %v", first, item) + } + + fakeClock.Step(2 * time.Second) + if err := waitForAdded(q, 1); err != nil { + t.Fatalf("unexpected err: %v", err) + } + item, _ = q.Get() + if !reflect.DeepEqual(item, third) { + t.Errorf("expected %v, got %v", third, item) + } +} + +func TestCopyShifting(t *testing.T) { + fakeClock := testingclock.NewFakeClock(time.Now()) + q := NewDelayingQueueWithCustomClock(fakeClock, "") + + first := "foo" + second := "bar" + third := "baz" + + q.AddAfter(first, 1*time.Second) + q.AddAfter(second, 500*time.Millisecond) + q.AddAfter(third, 250*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + if q.Len() != 3 { + t.Errorf("should not have added") + } + + fakeClock.Step(2 * time.Second) + + if err := waitForAdded(q, 3); err != nil { + t.Fatalf("unexpected err: %v", err) + } + actualFirst, _ := q.Get() + if !reflect.DeepEqual(actualFirst, third) { + t.Errorf("expected %v, got %v", third, actualFirst) + } + actualSecond, _ := q.Get() + if !reflect.DeepEqual(actualSecond, second) { + t.Errorf("expected %v, got %v", second, actualSecond) + } + actualThird, _ := q.Get() + if !reflect.DeepEqual(actualThird, first) { + t.Errorf("expected %v, got %v", first, actualThird) + } +} + +func BenchmarkDelayingQueue_AddAfter(b *testing.B) { + fakeClock := testingclock.NewFakeClock(time.Now()) + q := NewDelayingQueueWithCustomClock(fakeClock, "") + + // Add items + for n := 0; n < b.N; n++ { + data := fmt.Sprintf("%d", n) + q.AddAfter(data, time.Duration(rand.Int63n(int64(10*time.Minute)))) + } + + // Exercise item removal as well + fakeClock.Step(11 * time.Minute) + for n := 0; n < b.N; n++ { + _, _ = q.Get() + } +} + +func waitForAdded(q DelayingInterface, depth int) error { + return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) { + if q.Len() == depth { + return true, nil + } + + return false, nil + }) +} + +func waitForWaitingQueueToFill(q DelayingInterface) error { + return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) { + if len(q.(*delayingType).waitingForAddCh) == 0 { + return true, nil + } + + return false, nil + }) +} diff --git a/pkg/util/mapstructure/mapstructure.go b/pkg/workqueue/main_test.go similarity index 52% rename from pkg/util/mapstructure/mapstructure.go rename to pkg/workqueue/main_test.go index e083550..41274b2 100644 --- a/pkg/util/mapstructure/mapstructure.go +++ b/pkg/workqueue/main_test.go @@ -1,5 +1,5 @@ /* -Copyright 2022 The KubeVela Authors. +Copyright 2018 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,24 +14,16 @@ See the License for the specific language governing permissions and limitations under the License. */ -package mapstructure +package workqueue import ( - "encoding/json" - - "github.com/mitchellh/mapstructure" + "math/rand" + "os" + "testing" + "time" ) -// Decode unmarshalls a map[string]interface{} into a structure. -func Decode(in map[string]interface{}, out interface{}) error { - return mapstructure.Decode(in, out) -} - -// Encode marshals a structure into a map[string]interface{}. -func Encode(in interface{}, out *map[string]interface{}) error { - b, err := json.Marshal(in) - if err != nil { - return err - } - return json.Unmarshal(b, out) +func TestMain(m *testing.M) { + rand.Seed(time.Now().UnixNano()) + os.Exit(m.Run()) } diff --git a/pkg/workqueue/metrics_test.go b/pkg/workqueue/metrics_test.go new file mode 100644 index 0000000..bfc9c1a --- /dev/null +++ b/pkg/workqueue/metrics_test.go @@ -0,0 +1,278 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "sync" + "testing" + "time" + + testingclock "k8s.io/utils/clock/testing" +) + +type testMetrics struct { + added, gotten, finished int64 + + updateCalled chan<- struct{} +} + +func (m *testMetrics) add(item t) { m.added++ } +func (m *testMetrics) get(item t) { m.gotten++ } +func (m *testMetrics) done(item t) { m.finished++ } +func (m *testMetrics) updateUnfinishedWork() { m.updateCalled <- struct{}{} } + +func TestMetricShutdown(t *testing.T) { + ch := make(chan struct{}) + m := &testMetrics{ + updateCalled: ch, + } + c := testingclock.NewFakeClock(time.Now()) + q := newQueue(c, m, time.Millisecond) + for !c.HasWaiters() { + // Wait for the go routine to call NewTicker() + time.Sleep(time.Millisecond) + } + + c.Step(time.Millisecond) + <-ch + q.ShutDown() + + c.Step(time.Hour) + select { + default: + return + case <-ch: + t.Errorf("Unexpected update after shutdown was called.") + } +} + +type testMetric struct { + inc int64 + dec int64 + set float64 + + observedValue float64 + observedCount int + + notifyCh chan<- struct{} + + lock sync.Mutex +} + +func (m *testMetric) Inc() { + m.lock.Lock() + defer m.lock.Unlock() + m.inc++ + m.notify() +} + +func (m *testMetric) Dec() { + m.lock.Lock() + defer m.lock.Unlock() + m.dec++ + m.notify() +} + +func (m *testMetric) Set(f float64) { + m.lock.Lock() + defer m.lock.Unlock() + m.set = f + m.notify() +} + +func (m *testMetric) Observe(f float64) { + m.lock.Lock() + defer m.lock.Unlock() + m.observedValue = f + m.observedCount++ + m.notify() +} + +func (m *testMetric) gaugeValue() float64 { + m.lock.Lock() + defer m.lock.Unlock() + if m.set != 0 { + return m.set + } + return float64(m.inc - m.dec) +} + +func (m *testMetric) observationValue() float64 { + m.lock.Lock() + defer m.lock.Unlock() + return m.observedValue +} + +func (m *testMetric) observationCount() int { + m.lock.Lock() + defer m.lock.Unlock() + return m.observedCount +} + +func (m *testMetric) notify() { + if m.notifyCh != nil { + m.notifyCh <- struct{}{} + } +} + +type testMetricsProvider struct { + depth testMetric + adds testMetric + latency testMetric + duration testMetric + unfinished testMetric + longest testMetric + retries testMetric +} + +func (m *testMetricsProvider) NewDepthMetric(name string) GaugeMetric { + return &m.depth +} + +func (m *testMetricsProvider) NewAddsMetric(name string) CounterMetric { + return &m.adds +} + +func (m *testMetricsProvider) NewLatencyMetric(name string) HistogramMetric { + return &m.latency +} + +func (m *testMetricsProvider) NewWorkDurationMetric(name string) HistogramMetric { + return &m.duration +} + +func (m *testMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric { + return &m.unfinished +} + +func (m *testMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric { + return &m.longest +} + +func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric { + return &m.retries +} + +func TestMetrics(t *testing.T) { + mp := testMetricsProvider{} + t0 := time.Unix(0, 0) + c := testingclock.NewFakeClock(t0) + mf := queueMetricsFactory{metricsProvider: &mp} + m := mf.newQueueMetrics("test", c) + q := newQueue(c, m, time.Millisecond) + defer q.ShutDown() + for !c.HasWaiters() { + // Wait for the go routine to call NewTicker() + time.Sleep(time.Millisecond) + } + + q.Add("foo") + if e, a := 1.0, mp.adds.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + if e, a := 1.0, mp.depth.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + c.Step(50 * time.Microsecond) + + // Start processing + i, _ := q.Get() + if i != "foo" { + t.Errorf("Expected %v, got %v", "foo", i) + } + + if e, a := 5e-05, mp.latency.observationValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 1, mp.latency.observationCount(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // Add it back while processing; multiple adds of the same item are + // de-duped. + q.Add(i) + q.Add(i) + q.Add(i) + q.Add(i) + q.Add(i) + if e, a := 2.0, mp.adds.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + // One thing remains in the queue + if e, a := 1.0, mp.depth.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + c.Step(25 * time.Microsecond) + + // Finish it up + q.Done(i) + + if e, a := 2.5e-05, mp.duration.observationValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 1, mp.duration.observationCount(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // One thing remains in the queue + if e, a := 1.0, mp.depth.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // It should be back on the queue + i, _ = q.Get() + if i != "foo" { + t.Errorf("Expected %v, got %v", "foo", i) + } + + if e, a := 2.5e-05, mp.latency.observationValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2, mp.latency.observationCount(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // use a channel to ensure we don't look at the metric before it's + // been set. + ch := make(chan struct{}, 1) + longestCh := make(chan struct{}, 1) + mp.unfinished.notifyCh = ch + mp.longest.notifyCh = longestCh + c.Step(time.Millisecond) + <-ch + mp.unfinished.notifyCh = nil + if e, a := .001, mp.unfinished.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + <-longestCh + mp.longest.notifyCh = nil + if e, a := .001, mp.longest.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // Finish that one up + q.Done(i) + if e, a := .001, mp.duration.observationValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2, mp.duration.observationCount(); e != a { + t.Errorf("expected %v, got %v", e, a) + } +} diff --git a/pkg/workqueue/parallelizer_test.go b/pkg/workqueue/parallelizer_test.go new file mode 100644 index 0000000..2443834 --- /dev/null +++ b/pkg/workqueue/parallelizer_test.go @@ -0,0 +1,111 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "context" + "fmt" + "sync/atomic" + "testing" + + "github.com/google/go-cmp/cmp" +) + +type testCase struct { + pieces int + workers int + chunkSize int +} + +func (c testCase) String() string { + return fmt.Sprintf("pieces:%d,workers:%d,chunkSize:%d", c.pieces, c.workers, c.chunkSize) +} + +var cases = []testCase{ + { + pieces: 1000, + workers: 10, + chunkSize: 1, + }, + { + pieces: 1000, + workers: 10, + chunkSize: 10, + }, + { + pieces: 1000, + workers: 10, + chunkSize: 100, + }, + { + pieces: 999, + workers: 10, + chunkSize: 13, + }, +} + +func TestParallelizeUntil(t *testing.T) { + for _, tc := range cases { + t.Run(tc.String(), func(t *testing.T) { + seen := make([]int32, tc.pieces) + ctx := context.Background() + ParallelizeUntil(ctx, tc.workers, tc.pieces, func(p int) { + atomic.AddInt32(&seen[p], 1) + }, WithChunkSize(tc.chunkSize)) + + wantSeen := make([]int32, tc.pieces) + for i := 0; i < tc.pieces; i++ { + wantSeen[i] = 1 + } + if diff := cmp.Diff(wantSeen, seen); diff != "" { + t.Errorf("bad number of visits (-want,+got):\n%s", diff) + } + }) + } +} + +func BenchmarkParallelizeUntil(b *testing.B) { + for _, tc := range cases { + b.Run(tc.String(), func(b *testing.B) { + ctx := context.Background() + isPrime := make([]bool, tc.pieces) + b.ResetTimer() + for c := 0; c < b.N; c++ { + ParallelizeUntil(ctx, tc.workers, tc.pieces, func(p int) { + isPrime[p] = calPrime(p) + }, WithChunkSize(tc.chunkSize)) + } + b.StopTimer() + want := []bool{false, false, true, true, false, true, false, true, false, false, false, true} + if diff := cmp.Diff(want, isPrime[:len(want)]); diff != "" { + b.Errorf("miscalculated isPrime (-want,+got):\n%s", diff) + } + }) + } +} + +func calPrime(p int) bool { + if p <= 1 { + return false + } + for i := 2; i*i <= p; i++ { + if p%i == 0 { + return false + } + } + return true +} diff --git a/pkg/workqueue/queue_test.go b/pkg/workqueue/queue_test.go new file mode 100644 index 0000000..de78203 --- /dev/null +++ b/pkg/workqueue/queue_test.go @@ -0,0 +1,363 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue_test + +import ( + "runtime" + "sync" + "sync/atomic" + "testing" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/workqueue" +) + +func TestBasic(t *testing.T) { + tests := []struct { + queue *workqueue.Type + queueShutDown func(workqueue.Interface) + }{ + { + queue: workqueue.New(), + queueShutDown: workqueue.Interface.ShutDown, + }, + { + queue: workqueue.New(), + queueShutDown: workqueue.Interface.ShutDownWithDrain, + }, + } + for _, test := range tests { + // If something is seriously wrong this test will never complete. + + // Start producers + const producers = 50 + producerWG := sync.WaitGroup{} + producerWG.Add(producers) + for i := 0; i < producers; i++ { + go func(i int) { + defer producerWG.Done() + for j := 0; j < 50; j++ { + test.queue.Add(i) + time.Sleep(time.Millisecond) + } + }(i) + } + + // Start consumers + const consumers = 10 + consumerWG := sync.WaitGroup{} + consumerWG.Add(consumers) + for i := 0; i < consumers; i++ { + go func(i int) { + defer consumerWG.Done() + for { + item, quit := test.queue.Get() + if item == "added after shutdown!" { + t.Errorf("Got an item added after shutdown.") + } + if quit { + return + } + t.Logf("Worker %v: begin processing %v", i, item) + time.Sleep(3 * time.Millisecond) + t.Logf("Worker %v: done processing %v", i, item) + test.queue.Done(item) + } + }(i) + } + + producerWG.Wait() + test.queueShutDown(test.queue) + test.queue.Add("added after shutdown!") + consumerWG.Wait() + if test.queue.Len() != 0 { + t.Errorf("Expected the queue to be empty, had: %v items", test.queue.Len()) + } + } +} + +func TestAddWhileProcessing(t *testing.T) { + tests := []struct { + queue *workqueue.Type + queueShutDown func(workqueue.Interface) + }{ + { + queue: workqueue.New(), + queueShutDown: workqueue.Interface.ShutDown, + }, + { + queue: workqueue.New(), + queueShutDown: workqueue.Interface.ShutDownWithDrain, + }, + } + for _, test := range tests { + + // Start producers + const producers = 50 + producerWG := sync.WaitGroup{} + producerWG.Add(producers) + for i := 0; i < producers; i++ { + go func(i int) { + defer producerWG.Done() + test.queue.Add(i) + }(i) + } + + // Start consumers + const consumers = 10 + consumerWG := sync.WaitGroup{} + consumerWG.Add(consumers) + for i := 0; i < consumers; i++ { + go func(i int) { + defer consumerWG.Done() + // Every worker will re-add every item up to two times. + // This tests the dirty-while-processing case. + counters := map[interface{}]int{} + for { + item, quit := test.queue.Get() + if quit { + return + } + counters[item]++ + if counters[item] < 2 { + test.queue.Add(item) + } + test.queue.Done(item) + } + }(i) + } + + producerWG.Wait() + test.queueShutDown(test.queue) + consumerWG.Wait() + if test.queue.Len() != 0 { + t.Errorf("Expected the queue to be empty, had: %v items", test.queue.Len()) + } + } +} + +func TestLen(t *testing.T) { + q := workqueue.New() + q.Add("foo") + if e, a := 1, q.Len(); e != a { + t.Errorf("Expected %v, got %v", e, a) + } + q.Add("bar") + if e, a := 2, q.Len(); e != a { + t.Errorf("Expected %v, got %v", e, a) + } + q.Add("foo") // should not increase the queue length. + if e, a := 2, q.Len(); e != a { + t.Errorf("Expected %v, got %v", e, a) + } +} + +func TestReinsert(t *testing.T) { + q := workqueue.New() + q.Add("foo") + + // Start processing + i, _ := q.Get() + if i != "foo" { + t.Errorf("Expected %v, got %v", "foo", i) + } + + // Add it back while processing + q.Add(i) + + // Finish it up + q.Done(i) + + // It should be back on the queue + i, _ = q.Get() + if i != "foo" { + t.Errorf("Expected %v, got %v", "foo", i) + } + + // Finish that one up + q.Done(i) + + if a := q.Len(); a != 0 { + t.Errorf("Expected queue to be empty. Has %v items", a) + } +} + +func TestQueueDrainageUsingShutDownWithDrain(t *testing.T) { + + q := workqueue.New() + + q.Add("foo") + q.Add("bar") + + firstItem, _ := q.Get() + secondItem, _ := q.Get() + + finishedWG := sync.WaitGroup{} + finishedWG.Add(1) + go func() { + defer finishedWG.Done() + q.ShutDownWithDrain() + }() + + // This is done as to simulate a sequence of events where ShutDownWithDrain + // is called before we start marking all items as done - thus simulating a + // drain where we wait for all items to finish processing. + shuttingDown := false + for !shuttingDown { + _, shuttingDown = q.Get() + } + + // Mark the first two items as done, as to finish up + q.Done(firstItem) + q.Done(secondItem) + + finishedWG.Wait() +} + +func TestNoQueueDrainageUsingShutDown(t *testing.T) { + + q := workqueue.New() + + q.Add("foo") + q.Add("bar") + + q.Get() + q.Get() + + finishedWG := sync.WaitGroup{} + finishedWG.Add(1) + go func() { + defer finishedWG.Done() + // Invoke ShutDown: suspending the execution immediately. + q.ShutDown() + }() + + // We can now do this and not have the test timeout because we didn't call + // Done on the first two items before arriving here. + finishedWG.Wait() +} + +func TestForceQueueShutdownUsingShutDown(t *testing.T) { + + q := workqueue.New() + + q.Add("foo") + q.Add("bar") + + q.Get() + q.Get() + + finishedWG := sync.WaitGroup{} + finishedWG.Add(1) + go func() { + defer finishedWG.Done() + q.ShutDownWithDrain() + }() + + // This is done as to simulate a sequence of events where ShutDownWithDrain + // is called before ShutDown + shuttingDown := false + for !shuttingDown { + _, shuttingDown = q.Get() + } + + // Use ShutDown to force the queue to shut down (simulating a caller + // which can invoke this function on a second SIGTERM/SIGINT) + q.ShutDown() + + // We can now do this and not have the test timeout because we didn't call + // done on any of the items before arriving here. + finishedWG.Wait() +} + +func TestQueueDrainageUsingShutDownWithDrainWithDirtyItem(t *testing.T) { + q := workqueue.New() + + q.Add("foo") + gotten, _ := q.Get() + q.Add("foo") + + finishedWG := sync.WaitGroup{} + finishedWG.Add(1) + go func() { + defer finishedWG.Done() + q.ShutDownWithDrain() + }() + + // Ensure that ShutDownWithDrain has started and is blocked. + shuttingDown := false + for !shuttingDown { + _, shuttingDown = q.Get() + } + + // Finish "working". + q.Done(gotten) + + // `shuttingDown` becomes false because Done caused an item to go back into + // the queue. + again, shuttingDown := q.Get() + if shuttingDown { + t.Fatalf("should not have been done") + } + q.Done(again) + + // Now we are really done. + _, shuttingDown = q.Get() + if !shuttingDown { + t.Fatalf("should have been done") + } + + finishedWG.Wait() +} + +// TestGarbageCollection ensures that objects that are added then removed from the queue are +// able to be garbage collected. +func TestGarbageCollection(t *testing.T) { + type bigObject struct { + data []byte + } + leakQueue := workqueue.New() + t.Cleanup(func() { + // Make sure leakQueue doesn't go out of scope too early + runtime.KeepAlive(leakQueue) + }) + c := &bigObject{data: []byte("hello")} + mustGarbageCollect(t, c) + leakQueue.Add(c) + o, _ := leakQueue.Get() + leakQueue.Done(o) +} + +// mustGarbageCollect asserts than an object was garbage collected by the end of the test. +// The input must be a pointer to an object. +func mustGarbageCollect(t *testing.T, i interface{}) { + t.Helper() + var collected int32 = 0 + runtime.SetFinalizer(i, func(x interface{}) { + atomic.StoreInt32(&collected, 1) + }) + t.Cleanup(func() { + if err := wait.PollImmediate(time.Millisecond*100, wait.ForeverTestTimeout, func() (done bool, err error) { + // Trigger GC explicitly, otherwise we may need to wait a long time for it to run + runtime.GC() + return atomic.LoadInt32(&collected) == 1, nil + }); err != nil { + t.Errorf("object was not garbage collected") + } + }) +} diff --git a/pkg/workqueue/rate_limiting_queue_test.go b/pkg/workqueue/rate_limiting_queue_test.go new file mode 100644 index 0000000..84d8495 --- /dev/null +++ b/pkg/workqueue/rate_limiting_queue_test.go @@ -0,0 +1,75 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "testing" + "time" + + testingclock "k8s.io/utils/clock/testing" +) + +func TestRateLimitingQueue(t *testing.T) { + limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second) + queue := NewRateLimitingQueue(limiter).(*rateLimitingType) + fakeClock := testingclock.NewFakeClock(time.Now()) + delayingQueue := &delayingType{ + Interface: New(), + clock: fakeClock, + heartbeat: fakeClock.NewTicker(maxWait), + stopCh: make(chan struct{}), + waitingForAddCh: make(chan *waitFor, 1000), + metrics: newRetryMetrics(""), + } + queue.DelayingInterface = delayingQueue + + queue.AddRateLimited("one") + waitEntry := <-delayingQueue.waitingForAddCh + if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a { + t.Errorf("expected %v, got %v", e, a) + } + queue.AddRateLimited("one") + waitEntry = <-delayingQueue.waitingForAddCh + if e, a := 2*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2, queue.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + queue.AddRateLimited("two") + waitEntry = <-delayingQueue.waitingForAddCh + if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a { + t.Errorf("expected %v, got %v", e, a) + } + queue.AddRateLimited("two") + waitEntry = <-delayingQueue.waitingForAddCh + if e, a := 2*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + queue.Forget("one") + if e, a := 0, queue.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + queue.AddRateLimited("one") + waitEntry = <-delayingQueue.waitingForAddCh + if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a { + t.Errorf("expected %v, got %v", e, a) + } + +}