Skip to content

Commit

Permalink
fix: fail fast may cause Serial spec or cleanup Node interrupted (#1178)
Browse files Browse the repository at this point in the history
* fix: fail fast may cause Serial spec or cleanup Node interrupted

* tighten up edges around abort behavior

1. inter-process aborts should not interrupt cleanup nodes
2. whenever we fetch interrupt status, check and see if an abort has happened.  if it has ensure we return the latest, correct, abort state.  this allows us to avoid accidentally starting the next spec  because the ABORT_POLLING_INTERVAL hasn't fired yet

* fix race condition in internal integration suite

* fix internal test race condition

---------

Co-authored-by: Onsi Fakhouri <onsijoe@gmail.com>
  • Loading branch information
cvvz and onsi committed May 3, 2023
1 parent 903be81 commit 8dea88b
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 19 deletions.
78 changes: 77 additions & 1 deletion internal/internal_integration/abort_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package internal_integration_test

import (
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/onsi/ginkgo/v2/internal/interrupt_handler"
. "github.com/onsi/ginkgo/v2/internal/test_helpers"
"github.com/onsi/ginkgo/v2/types"
)
Expand Down Expand Up @@ -158,19 +161,92 @@ var _ = Describe("handling test aborts", func() {
})

Describe("when running in parallel and a test aborts", func() {
var c chan interface{}
BeforeEach(func() {
SetUpForParallel(2)
c = make(chan interface{})
})

It("notifies the server of the abort", func() {
Ω(client.ShouldAbort()).Should(BeFalse())
success, _ := RunFixture("aborting in parallel", func() {
success := RunFixtureInParallel("aborting in parallel", func(_ int) {
It("A", func() {
<-c
Abort("abort")
})

It("B", func(ctx SpecContext) {
close(c)
select {
case <-ctx.Done():
rt.Run("dc-done")
case <-time.After(interrupt_handler.ABORT_POLLING_INTERVAL * 2):
rt.Run("dc-after")
}
})
})
Ω(success).Should(BeFalse())
Ω(client.ShouldAbort()).Should(BeTrue())

Ω(rt).Should(HaveTracked("dc-done")) //not dc-after
Ω(reporter.Did.Find("A")).Should(HaveAborted("abort"))
Ω(reporter.Did.Find("B")).Should(HaveBeenInterrupted(interrupt_handler.InterruptCauseAbortByOtherProcess))
})

It("does not interrupt cleanup nodes", func() {
success := RunFixtureInParallel("aborting in parallel", func(_ int) {
It("A", func() {
<-c
Abort("abort")
})

Context("B", func() {
It("B", func() {
})

AfterEach(func(ctx SpecContext) {
close(c)
select {
case <-ctx.Done():
rt.Run("dc-done")
case <-time.After(interrupt_handler.ABORT_POLLING_INTERVAL * 2):
rt.Run("dc-after")
}
})
})
})
Ω(success).Should(BeFalse())

Ω(rt).Should(HaveTracked("dc-after")) //not dc-done
Ω(reporter.Did.Find("A")).Should(HaveAborted("abort"))
Ω(reporter.Did.Find("B")).Should(HavePassed())
})

It("does not start serial nodes if an abort occurs", func() {
success := RunFixtureInParallel("aborting in parallel", func(proc int) {
It("A", func() {
time.Sleep(time.Millisecond * 50)
if proc == 2 {
rt.Run("aborting")
Abort("abort")
}
})

It("B", func() {
time.Sleep(time.Millisecond * 50)
if proc == 2 {
rt.Run("aborting")
Abort("abort")
}
})

It("C", Serial, func() {
rt.Run("C")
})
})
Ω(success).Should(BeFalse())
Ω(rt).Should(HaveTracked("aborting")) //just one aborting and we don't see C
}, MustPassRepeatedly(10))

})
})
46 changes: 46 additions & 0 deletions internal/internal_integration/internal_integration_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal_integration_test

import (
"context"
"fmt"
"io"
"reflect"
"testing"
Expand All @@ -10,6 +11,7 @@ import (
. "github.com/onsi/ginkgo/v2"
"github.com/onsi/ginkgo/v2/internal"
"github.com/onsi/ginkgo/v2/internal/global"
"github.com/onsi/ginkgo/v2/internal/interrupt_handler"
"github.com/onsi/ginkgo/v2/internal/parallel_support"
. "github.com/onsi/ginkgo/v2/internal/test_helpers"
"github.com/onsi/ginkgo/v2/types"
Expand All @@ -18,6 +20,7 @@ import (
)

func TestSuiteTests(t *testing.T) {
interrupt_handler.ABORT_POLLING_INTERVAL = 200 * time.Millisecond
format.TruncatedDiff = false
RegisterFailHandler(Fail)
suiteConfig, _ := GinkgoConfiguration()
Expand Down Expand Up @@ -81,6 +84,7 @@ func WithSuite(suite *internal.Suite, callback func()) {
func SetUpForParallel(parallelTotal int) {
conf.ParallelTotal = parallelTotal
server, client, exitChannels = SetUpServerAndClient(conf.ParallelTotal)

conf.ParallelHost = server.Address()
}

Expand All @@ -95,6 +99,48 @@ func RunFixture(description string, callback func()) (bool, bool) {
return success, hasProgrammaticFocus
}

/*
You should call SetUpForParallel() first, then call RunFixtureInParallel()
this is, at best, an approximation. There are some dsl objects that can be called within a running node (e.g. DeferCleanup) that will not work with RunFixtureInParallel as they will attach to the actual internal_integration suite as opposed to the simulated fixture
moreover the FakeInterruptHandler is not used - instead a real interrupt handler is created and configured with the client generated by SetUpForParallel. this is to facilitate the testing of cross-process aborts, which was the primary motivator for this method.
also a noopProgressSignalRegistrar is used to avoid an annoying data race
*/
func RunFixtureInParallel(description string, callback func(proc int)) bool {
suites := make([]*internal.Suite, conf.ParallelTotal)
finished := make(chan bool, conf.ParallelTotal)
for proc := 1; proc <= conf.ParallelTotal; proc++ {
suites[proc-1] = internal.NewSuite()
WithSuite(suites[proc-1], func() {
callback(proc)
Ω(suites[proc-1].BuildTree()).Should(Succeed())
})
}
for proc := 1; proc <= conf.ParallelTotal; proc++ {
proc := proc
c := conf //make a copy
c.ParallelProcess = proc
exit := exitChannels[proc]
suite := suites[proc-1]
go func() {
interruptHandler := interrupt_handler.NewInterruptHandler(client)
defer interruptHandler.Stop()

success, _ := suite.Run(fmt.Sprintf("%s - %d", description, proc), Label("TopLevelLabel"), "/path/to/suite", failer, reporter, writer, outputInterceptor, interruptHandler, client, noopProgressSignalRegistrar, c)
close(exit)
finished <- success
}()
}
success := true
for proc := 1; proc <= conf.ParallelTotal; proc++ {
success = (<-finished) && success
}

return success
}

func F(options ...interface{}) {
location := cl
message := "fail"
Expand Down
47 changes: 31 additions & 16 deletions internal/interrupt_handler/interrupt_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/onsi/ginkgo/v2/internal/parallel_support"
)

const ABORT_POLLING_INTERVAL = 500 * time.Millisecond
var ABORT_POLLING_INTERVAL = 500 * time.Millisecond

type InterruptCause uint

Expand Down Expand Up @@ -62,25 +62,27 @@ type InterruptHandlerInterface interface {
}

type InterruptHandler struct {
c chan interface{}
lock *sync.Mutex
level InterruptLevel
cause InterruptCause
client parallel_support.Client
stop chan interface{}
signals []os.Signal
c chan interface{}
lock *sync.Mutex
level InterruptLevel
cause InterruptCause
client parallel_support.Client
stop chan interface{}
signals []os.Signal
requestAbortCheck chan interface{}
}

func NewInterruptHandler(client parallel_support.Client, signals ...os.Signal) *InterruptHandler {
if len(signals) == 0 {
signals = []os.Signal{os.Interrupt, syscall.SIGTERM}
}
handler := &InterruptHandler{
c: make(chan interface{}),
lock: &sync.Mutex{},
stop: make(chan interface{}),
client: client,
signals: signals,
c: make(chan interface{}),
lock: &sync.Mutex{},
stop: make(chan interface{}),
requestAbortCheck: make(chan interface{}),
client: client,
signals: signals,
}
handler.registerForInterrupts()
return handler
Expand Down Expand Up @@ -109,6 +111,12 @@ func (handler *InterruptHandler) registerForInterrupts() {
pollTicker.Stop()
return
}
case <-handler.requestAbortCheck:
if handler.client.ShouldAbort() {
close(abortChannel)
pollTicker.Stop()
return
}
case <-handler.stop:
pollTicker.Stop()
return
Expand Down Expand Up @@ -152,11 +160,18 @@ func (handler *InterruptHandler) registerForInterrupts() {

func (handler *InterruptHandler) Status() InterruptStatus {
handler.lock.Lock()
defer handler.lock.Unlock()

return InterruptStatus{
status := InterruptStatus{
Level: handler.level,
Channel: handler.c,
Cause: handler.cause,
}
handler.lock.Unlock()

if handler.client != nil && handler.client.ShouldAbort() && !status.Interrupted() {
close(handler.requestAbortCheck)
<-status.Channel
return handler.Status()
}

return status
}
7 changes: 5 additions & 2 deletions internal/interrupt_handler/interrupt_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ var _ = Describe("InterruptHandler", func() {
})
})

// here - test that abort only triggers once
// here - test interplay with signal
Describe("Interrupting when another Ginkgo process has aborted", func() {
var client parallel_support.Client
BeforeEach(func() {
Expand Down Expand Up @@ -165,5 +163,10 @@ var _ = Describe("InterruptHandler", func() {
Ω(status.Cause).Should(Equal(interrupt_handler.InterruptCauseSignal))
Ω(status.Level).Should(Equal(interrupt_handler.InterruptLevelCleanupAndReport))
})

It("doesn't just rely on the ABORT_POLLING_INTERVAL timer to report that the interrupt has happened", func() {
client.PostAbort()
Ω(interruptHandler.Status().Cause).Should(Equal(interrupt_handler.InterruptCauseAbortByOtherProcess))
}, MustPassRepeatedly(10))
})
})
3 changes: 3 additions & 0 deletions internal/interrupt_handler/interrupthandler_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package interrupt_handler_test

import (
"testing"
"time"

. "github.com/onsi/ginkgo/v2"
"github.com/onsi/ginkgo/v2/internal/interrupt_handler"
. "github.com/onsi/gomega"
)

func TestInterrupthandler(t *testing.T) {
interrupt_handler.ABORT_POLLING_INTERVAL = 50 * time.Millisecond
RegisterFailHandler(Fail)
RunSpecs(t, "Interrupthandler Suite")
}
6 changes: 6 additions & 0 deletions internal/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,12 @@ func (suite *Suite) runNode(node Node, specDeadline time.Time, text string) (typ
gracePeriodChannel = time.After(gracePeriod)
case <-interruptStatus.Channel:
interruptStatus = suite.interruptHandler.Status()
// ignore interruption from other process if we are cleaning up or reporting
if interruptStatus.Cause == interrupt_handler.InterruptCauseAbortByOtherProcess &&
node.NodeType.Is(types.NodeTypesAllowedDuringReportInterrupt|types.NodeTypesAllowedDuringCleanupInterrupt) {
continue
}

deadlineChannel = nil // don't worry about deadlines, time's up now

failureTimelineLocation := suite.generateTimelineLocation()
Expand Down
8 changes: 8 additions & 0 deletions internal/test_helpers/fake_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,26 @@ func NewFakeReporter() *FakeReporter {
}

func (r *FakeReporter) SuiteWillBegin(report types.Report) {
r.lock.Lock()
defer r.lock.Unlock()
r.Begin = report
}

func (r *FakeReporter) WillRun(report types.SpecReport) {
r.lock.Lock()
defer r.lock.Unlock()
r.Will = append(r.Will, report)
}

func (r *FakeReporter) DidRun(report types.SpecReport) {
r.lock.Lock()
defer r.lock.Unlock()
r.Did = append(r.Did, report)
}

func (r *FakeReporter) SuiteDidEnd(report types.Report) {
r.lock.Lock()
defer r.lock.Unlock()
r.End = report
}
func (r *FakeReporter) EmitProgressReport(progressReport types.ProgressReport) {
Expand Down

0 comments on commit 8dea88b

Please sign in to comment.