Skip to content

Commit

Permalink
Fix busyloop blocking goroutine (#249)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Schrodi authored Feb 24, 2020
1 parent ad3de8f commit fc6033f
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 45 deletions.
2 changes: 1 addition & 1 deletion .ci/test
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ cd "${SOURCE_PATH}"
# Install Ginkgo (test framework) to be able to execute the tests.
go install -mod=vendor ./vendor/github.com/onsi/ginkgo/ginkgo

ginkgo -cover -mod=vendor ./pkg/...
ginkgo -cover -mod=vendor -p --nodes=10 ./pkg/...
16 changes: 11 additions & 5 deletions cmd/testrunner/cmd/run_template/run_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package run_template

import (
"fmt"
"github.com/gardener/test-infra/pkg/shootflavors"
"github.com/gardener/test-infra/pkg/util/elasticsearch"
"os"
"time"
Expand Down Expand Up @@ -67,6 +68,8 @@ var runCmd = &cobra.Command{
var (
err error
stopCh = make(chan struct{})

shootFlavors []*shootflavors.ExtendedFlavorInstance
)
defer close(stopCh)
dryRun, _ := cmd.Flags().GetBool("dry-run")
Expand Down Expand Up @@ -94,13 +97,16 @@ var runCmd = &cobra.Command{
collectConfig.ESConfig = &esConfig
}

shootFlavors, err := GetShootFlavors(shootParameters.FlavorConfigPath, gardenK8sClient, shootPrefix, filterPatchVersions)
if err != nil {
logger.Log.Error(err, "unable to parse shoot flavors from test configuration")
os.Exit(1)
if shootParameters.FlavorConfigPath != "" {
flavors, err := GetShootFlavors(shootParameters.FlavorConfigPath, gardenK8sClient, shootPrefix, filterPatchVersions)
if err != nil {
logger.Log.Error(err, "unable to parse shoot flavors from test configuration")
os.Exit(1)
}
shootFlavors = flavors.GetShoots()
}

runs, err := testrunnerTemplate.RenderTestruns(logger.Log.WithName("Render"), &shootParameters, shootFlavors.GetShoots())
runs, err := testrunnerTemplate.RenderTestruns(logger.Log.WithName("Render"), &shootParameters, shootFlavors)
if err != nil {
logger.Log.Error(err, "unable to render testrun")
os.Exit(1)
Expand Down
35 changes: 21 additions & 14 deletions pkg/testmachinery/controller/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package watch
import (
"context"
"github.com/gardener/test-infra/pkg/apis/testmachinery/v1beta1"
"github.com/gardener/test-infra/pkg/util"
"github.com/go-logr/logr"
"github.com/hashicorp/go-multierror"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -64,15 +63,19 @@ func (w *watch) WatchUntil(timeout time.Duration, namespace, name string, f Watc
namespacedName := types.NamespacedName{Namespace: namespace, Name: name}

w.mux.Lock()
if _, ok := w.watches[namespacedName.String()]; !ok {
w.watches[namespacedName.String()] = make(chan *v1beta1.Testrun)
watchCh, ok := w.watches[namespacedName.String()]
if !ok {
watchCh = make(chan *v1beta1.Testrun)
w.watches[namespacedName.String()] = watchCh
}
w.mux.Unlock()

// remove the watch from the list of watches to not leak watching channels
defer w.remove(namespacedName.String())

var (
errs error
done, _ = w.get(namespacedName.String())
after <-chan time.Time
errs error
after <-chan time.Time
)

if timeout != 0 {
Expand All @@ -86,7 +89,7 @@ func (w *watch) WatchUntil(timeout time.Duration, namespace, name string, f Watc

for {
select {
case tr := <-done:
case tr := <-watchCh:
done, err := f(tr)
if err != nil {
if done {
Expand All @@ -113,8 +116,10 @@ func (w *watch) Reconcile(r reconcile.Request) (reconcile.Result, error) {

ch, ok := w.get(r.String())
if !ok {
w.log.V(10).Info("no watch found", "namespacedName", r.String())
return reconcile.Result{}, nil
}
w.log.V(8).Info("reconcile", "namespacedName", r.String())

tr := &v1beta1.Testrun{}
if err := w.client.Get(ctx, r.NamespacedName, tr); err != nil {
Expand All @@ -124,17 +129,19 @@ func (w *watch) Reconcile(r reconcile.Request) (reconcile.Result, error) {

select {
case ch <- tr:
default:
}
return reconcile.Result{}, nil
}

// close the channel if the testrun is completed
if util.Completed(tr.Status.Phase) {
close(ch)
// remove closes the watch channel and removes the watch from the list of watches
func (w *watch) remove(request string) {
if c, ok := w.get(request); ok {
close(c)
w.mux.Lock()
delete(w.watches, r.String())
w.mux.Unlock()
defer w.mux.Unlock()
delete(w.watches, request)
}

return reconcile.Result{}, nil
}

// get returns the watch for a reconcile request
Expand Down
4 changes: 2 additions & 2 deletions pkg/testrunner/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (e *executor) AddItem(f func()) {

// Run executes all added items in the configured order
func (e *executor) Run() {
var wg = util.AdvancedWaitGroup{}
var wg = &util.AdvancedWaitGroup{}

var i = 0
for e.len() != 0 {
Expand Down Expand Up @@ -130,7 +130,7 @@ func (e *executor) Run() {
wg.Wait()
}
if e.len() == 0 {
e.waitForLastElement(&wg)
e.waitForLastElement(wg)
}
i++
}
Expand Down
73 changes: 60 additions & 13 deletions pkg/testrunner/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var _ = Describe("Executor tests", func() {
executions[i] = e
f := func() {
e.start = time.Now()
time.Sleep(1 * time.Second)
time.Sleep(5 * time.Second)
}
executor.AddItem(f)
}
Expand All @@ -50,7 +50,7 @@ var _ = Describe("Executor tests", func() {
Expect(e.start.After(before.start)).To(BeTrue())

b := e.start.Sub(before.start)
Expect(b.Seconds()).To(BeNumerically("~", 1, 0.1))
Expect(b.Seconds()).To(BeNumerically("~", 5, 1))
}
}, 10)

Expand Down Expand Up @@ -85,7 +85,7 @@ var _ = Describe("Executor tests", func() {
executor, err := testrunner.NewExecutor(log.NullLogger{}, testrunner.ExecutorConfig{
Serial: true,
BackoffBucket: 1,
BackoffPeriod: 2 * time.Second,
BackoffPeriod: 5 * time.Second,
})
Expect(err).ToNot(HaveOccurred())

Expand All @@ -105,7 +105,54 @@ var _ = Describe("Executor tests", func() {
Expect(e.start.After(before.start)).To(BeTrue())

b := e.start.Sub(before.start)
Expect(b.Seconds()).To(BeNumerically("~", 2, 0.1))
Expect(b.Seconds()).To(BeNumerically("~", 5, 1))
}
}, 10)

It("should run 1 function in serial", func() {
executor, err := testrunner.NewExecutor(log.NullLogger{}, testrunner.ExecutorConfig{
Serial: true,
})
Expect(err).ToNot(HaveOccurred())

e := newExecution(1)
f := func() {
e.start = time.Now()
time.Sleep(5 * time.Second)
}
executor.AddItem(f)

executor.Run()
endtime := time.Now()
b := endtime.Sub(e.start)
Expect(b.Seconds()).To(BeNumerically("~", 5, 1))
}, 10)

It("should run 3 functions in serial", func() {
executions := [3]*execution{}
executor, err := testrunner.NewExecutor(log.NullLogger{}, testrunner.ExecutorConfig{
Serial: true,
})
Expect(err).ToNot(HaveOccurred())

for i := 0; i < 3; i++ {
e := newExecution(i)
executions[i] = e
f := func() {
e.start = time.Now()
time.Sleep(5 * time.Second)
}
executor.AddItem(f)
}

executor.Run()
for i := 1; i < 3; i++ {
e := executions[i]
before := executions[i-1]
Expect(e.start.After(before.start)).To(BeTrue())

b := e.start.Sub(before.start)
Expect(b.Seconds()).To(BeNumerically("~", 5, 1))
}
}, 10)

Expand Down Expand Up @@ -182,7 +229,7 @@ var _ = Describe("Executor tests", func() {
executions[i] = e
f := func() {
e.start = time.Now()
time.Sleep(1 * time.Second)
time.Sleep(5 * time.Second)
if e.value == 2 {
executor.AddItem(func() {
addExecution.start = time.Now()
Expand All @@ -195,7 +242,7 @@ var _ = Describe("Executor tests", func() {
executor.Run()

Expect(addExecution.start.IsZero()).To(BeFalse())
expectExecutionsToBe(addExecution, executions[2], 1)
expectExecutionsToBe(addExecution, executions[2], 5)

}, 10)

Expand All @@ -211,7 +258,7 @@ var _ = Describe("Executor tests", func() {
executions[i] = e
f := func() {
e.start = time.Now()
time.Sleep(1 * time.Second)
time.Sleep(5 * time.Second)
if e.value == 2 {
executor.AddItem(func() {
addExecution.start = time.Now()
Expand All @@ -224,7 +271,7 @@ var _ = Describe("Executor tests", func() {
executor.Run()

Expect(addExecution.start.IsZero()).To(BeFalse())
expectExecutionsToBe(addExecution, executions[2], 1)
expectExecutionsToBe(addExecution, executions[2], 5)
}, 10)

It("should add another test during execution in parallel steps that start immediately", func() {
Expand All @@ -241,7 +288,7 @@ var _ = Describe("Executor tests", func() {
e.start = time.Now()

if e.value == 1 {
time.Sleep(1 * time.Second)
time.Sleep(5 * time.Second)
executor.AddItem(func() {
addExecution.start = time.Now()
})
Expand All @@ -255,7 +302,7 @@ var _ = Describe("Executor tests", func() {
executor.Run()

Expect(addExecution.start.IsZero()).To(BeFalse())
expectExecutionsToBe(addExecution, executions[0], 1)
expectExecutionsToBe(addExecution, executions[0], 5)
}, 10)

It("should add same test during execution in parallel steps", func() {
Expand All @@ -269,7 +316,7 @@ var _ = Describe("Executor tests", func() {
var f func()
f = func() {
e.start = time.Now()
time.Sleep(1 * time.Second)
time.Sleep(5 * time.Second)
if e.value == 1 {
e.value = 3
executor.AddItem(f)
Expand All @@ -281,14 +328,14 @@ var _ = Describe("Executor tests", func() {
executor.Run()

Expect(executions[1].value).To(Equal(3))
expectExecutionsToBe(executions[1], executions[2], 1)
expectExecutionsToBe(executions[1], executions[2], 5)
}, 10)

})

func expectExecutionsToBe(e1, e2 *execution, expDurationSeconds int) {
d := e1.start.Sub(e2.start)
ExpectWithOffset(1, d.Seconds()).To(BeNumerically("~", expDurationSeconds, 0.1), "duration is %fs but expected %ds", d.Seconds(), expDurationSeconds)
ExpectWithOffset(1, d.Seconds()).To(BeNumerically("~", expDurationSeconds, 1.1), "duration is %fs but expected %ds", d.Seconds(), expDurationSeconds)
}

func newExecution(i int) *execution {
Expand Down
9 changes: 4 additions & 5 deletions pkg/testrunner/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,10 @@ func (rl RunList) RenderTable() string {

func triggerRunEvent(notifyChannels []chan *Run, run *Run) {
for _, c := range notifyChannels {
go func(c chan *Run) {
select {
case c <- run:
}
}(c)
select {
case c <- run:
default:
}
}
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/util/waitgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package util

import (
"sync"
"time"
)

// AdvancedWaitGroup implements the same interface as sync.WaitGroup.
Expand Down Expand Up @@ -45,11 +46,9 @@ func (wg *AdvancedWaitGroup) Done() {

// Wait waits until the counter is 0
func (wg *AdvancedWaitGroup) Wait() {
for {
if wg.count == 0 {
return
}
}
wg.WaitWithCancelFunc(func() bool {
return false
})
}

// WaitWithCancelFunc waits until the wait group count is zero or the cancel function returns true
Expand All @@ -58,6 +57,7 @@ func (wg *AdvancedWaitGroup) WaitWithCancelFunc(cancel func() bool) {
if wg.count == 0 || cancel() {
return
}
time.Sleep(1 * time.Second)
}
}

Expand Down

0 comments on commit fc6033f

Please sign in to comment.