Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fail fast may cause Serial spec or cleanup Node interrupted #1178

Merged
merged 4 commits into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 77 additions & 1 deletion internal/internal_integration/abort_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package internal_integration_test

import (
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/onsi/ginkgo/v2/internal/interrupt_handler"
. "github.com/onsi/ginkgo/v2/internal/test_helpers"
"github.com/onsi/ginkgo/v2/types"
)
Expand Down Expand Up @@ -158,19 +161,92 @@ var _ = Describe("handling test aborts", func() {
})

Describe("when running in parallel and a test aborts", func() {
var c chan interface{}
BeforeEach(func() {
SetUpForParallel(2)
c = make(chan interface{})
})

It("notifies the server of the abort", func() {
Ω(client.ShouldAbort()).Should(BeFalse())
success, _ := RunFixture("aborting in parallel", func() {
success := RunFixtureInParallel("aborting in parallel", func(_ int) {
It("A", func() {
<-c
Abort("abort")
})

It("B", func(ctx SpecContext) {
close(c)
select {
case <-ctx.Done():
rt.Run("dc-done")
case <-time.After(interrupt_handler.ABORT_POLLING_INTERVAL * 2):
rt.Run("dc-after")
}
})
})
Ω(success).Should(BeFalse())
Ω(client.ShouldAbort()).Should(BeTrue())

Ω(rt).Should(HaveTracked("dc-done")) //not dc-after
Ω(reporter.Did.Find("A")).Should(HaveAborted("abort"))
Ω(reporter.Did.Find("B")).Should(HaveBeenInterrupted(interrupt_handler.InterruptCauseAbortByOtherProcess))
})

It("does not interrupt cleanup nodes", func() {
success := RunFixtureInParallel("aborting in parallel", func(_ int) {
It("A", func() {
<-c
Abort("abort")
})

Context("B", func() {
It("B", func() {
})

AfterEach(func(ctx SpecContext) {
close(c)
select {
case <-ctx.Done():
rt.Run("dc-done")
case <-time.After(interrupt_handler.ABORT_POLLING_INTERVAL * 2):
rt.Run("dc-after")
}
})
})
})
Ω(success).Should(BeFalse())

Ω(rt).Should(HaveTracked("dc-after")) //not dc-done
Ω(reporter.Did.Find("A")).Should(HaveAborted("abort"))
Ω(reporter.Did.Find("B")).Should(HavePassed())
})

It("does not start serial nodes if an abort occurs", func() {
success := RunFixtureInParallel("aborting in parallel", func(proc int) {
It("A", func() {
time.Sleep(time.Millisecond * 50)
if proc == 2 {
rt.Run("aborting")
Abort("abort")
}
})

It("B", func() {
time.Sleep(time.Millisecond * 50)
if proc == 2 {
rt.Run("aborting")
Abort("abort")
}
})

It("C", Serial, func() {
rt.Run("C")
})
})
Ω(success).Should(BeFalse())
Ω(rt).Should(HaveTracked("aborting")) //just one aborting and we don't see C
}, MustPassRepeatedly(10))

})
})
46 changes: 46 additions & 0 deletions internal/internal_integration/internal_integration_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal_integration_test

import (
"context"
"fmt"
"io"
"reflect"
"testing"
Expand All @@ -10,6 +11,7 @@ import (
. "github.com/onsi/ginkgo/v2"
"github.com/onsi/ginkgo/v2/internal"
"github.com/onsi/ginkgo/v2/internal/global"
"github.com/onsi/ginkgo/v2/internal/interrupt_handler"
"github.com/onsi/ginkgo/v2/internal/parallel_support"
. "github.com/onsi/ginkgo/v2/internal/test_helpers"
"github.com/onsi/ginkgo/v2/types"
Expand All @@ -18,6 +20,7 @@ import (
)

func TestSuiteTests(t *testing.T) {
interrupt_handler.ABORT_POLLING_INTERVAL = 200 * time.Millisecond
format.TruncatedDiff = false
RegisterFailHandler(Fail)
suiteConfig, _ := GinkgoConfiguration()
Expand Down Expand Up @@ -81,6 +84,7 @@ func WithSuite(suite *internal.Suite, callback func()) {
func SetUpForParallel(parallelTotal int) {
conf.ParallelTotal = parallelTotal
server, client, exitChannels = SetUpServerAndClient(conf.ParallelTotal)

conf.ParallelHost = server.Address()
}

Expand All @@ -95,6 +99,48 @@ func RunFixture(description string, callback func()) (bool, bool) {
return success, hasProgrammaticFocus
}

/*
You should call SetUpForParallel() first, then call RunFixtureInParallel()

this is, at best, an approximation. There are some dsl objects that can be called within a running node (e.g. DeferCleanup) that will not work with RunFixtureInParallel as they will attach to the actual internal_integration suite as opposed to the simulated fixture

moreover the FakeInterruptHandler is not used - instead a real interrupt handler is created and configured with the client generated by SetUpForParallel. this is to facilitate the testing of cross-process aborts, which was the primary motivator for this method.

also a noopProgressSignalRegistrar is used to avoid an annoying data race
*/
func RunFixtureInParallel(description string, callback func(proc int)) bool {
suites := make([]*internal.Suite, conf.ParallelTotal)
finished := make(chan bool, conf.ParallelTotal)
for proc := 1; proc <= conf.ParallelTotal; proc++ {
suites[proc-1] = internal.NewSuite()
WithSuite(suites[proc-1], func() {
callback(proc)
Ω(suites[proc-1].BuildTree()).Should(Succeed())
})
}
for proc := 1; proc <= conf.ParallelTotal; proc++ {
proc := proc
c := conf //make a copy
c.ParallelProcess = proc
exit := exitChannels[proc]
suite := suites[proc-1]
go func() {
interruptHandler := interrupt_handler.NewInterruptHandler(client)
defer interruptHandler.Stop()

success, _ := suite.Run(fmt.Sprintf("%s - %d", description, proc), Label("TopLevelLabel"), "/path/to/suite", failer, reporter, writer, outputInterceptor, interruptHandler, client, noopProgressSignalRegistrar, c)
close(exit)
finished <- success
}()
}
success := true
for proc := 1; proc <= conf.ParallelTotal; proc++ {
success = (<-finished) && success
}

return success
}

func F(options ...interface{}) {
location := cl
message := "fail"
Expand Down
47 changes: 31 additions & 16 deletions internal/interrupt_handler/interrupt_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/onsi/ginkgo/v2/internal/parallel_support"
)

const ABORT_POLLING_INTERVAL = 500 * time.Millisecond
var ABORT_POLLING_INTERVAL = 500 * time.Millisecond

type InterruptCause uint

Expand Down Expand Up @@ -62,25 +62,27 @@ type InterruptHandlerInterface interface {
}

type InterruptHandler struct {
c chan interface{}
lock *sync.Mutex
level InterruptLevel
cause InterruptCause
client parallel_support.Client
stop chan interface{}
signals []os.Signal
c chan interface{}
lock *sync.Mutex
level InterruptLevel
cause InterruptCause
client parallel_support.Client
stop chan interface{}
signals []os.Signal
requestAbortCheck chan interface{}
}

func NewInterruptHandler(client parallel_support.Client, signals ...os.Signal) *InterruptHandler {
if len(signals) == 0 {
signals = []os.Signal{os.Interrupt, syscall.SIGTERM}
}
handler := &InterruptHandler{
c: make(chan interface{}),
lock: &sync.Mutex{},
stop: make(chan interface{}),
client: client,
signals: signals,
c: make(chan interface{}),
lock: &sync.Mutex{},
stop: make(chan interface{}),
requestAbortCheck: make(chan interface{}),
client: client,
signals: signals,
}
handler.registerForInterrupts()
return handler
Expand Down Expand Up @@ -109,6 +111,12 @@ func (handler *InterruptHandler) registerForInterrupts() {
pollTicker.Stop()
return
}
case <-handler.requestAbortCheck:
if handler.client.ShouldAbort() {
close(abortChannel)
pollTicker.Stop()
return
}
case <-handler.stop:
pollTicker.Stop()
return
Expand Down Expand Up @@ -152,11 +160,18 @@ func (handler *InterruptHandler) registerForInterrupts() {

func (handler *InterruptHandler) Status() InterruptStatus {
handler.lock.Lock()
defer handler.lock.Unlock()

return InterruptStatus{
status := InterruptStatus{
Level: handler.level,
Channel: handler.c,
Cause: handler.cause,
}
handler.lock.Unlock()

if handler.client != nil && handler.client.ShouldAbort() && !status.Interrupted() {
close(handler.requestAbortCheck)
<-status.Channel
return handler.Status()
}

return status
}
7 changes: 5 additions & 2 deletions internal/interrupt_handler/interrupt_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ var _ = Describe("InterruptHandler", func() {
})
})

// here - test that abort only triggers once
// here - test interplay with signal
Describe("Interrupting when another Ginkgo process has aborted", func() {
var client parallel_support.Client
BeforeEach(func() {
Expand Down Expand Up @@ -165,5 +163,10 @@ var _ = Describe("InterruptHandler", func() {
Ω(status.Cause).Should(Equal(interrupt_handler.InterruptCauseSignal))
Ω(status.Level).Should(Equal(interrupt_handler.InterruptLevelCleanupAndReport))
})

It("doesn't just rely on the ABORT_POLLING_INTERVAL timer to report that the interrupt has happened", func() {
client.PostAbort()
Ω(interruptHandler.Status().Cause).Should(Equal(interrupt_handler.InterruptCauseAbortByOtherProcess))
}, MustPassRepeatedly(10))
})
})
3 changes: 3 additions & 0 deletions internal/interrupt_handler/interrupthandler_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package interrupt_handler_test

import (
"testing"
"time"

. "github.com/onsi/ginkgo/v2"
"github.com/onsi/ginkgo/v2/internal/interrupt_handler"
. "github.com/onsi/gomega"
)

func TestInterrupthandler(t *testing.T) {
interrupt_handler.ABORT_POLLING_INTERVAL = 50 * time.Millisecond
RegisterFailHandler(Fail)
RunSpecs(t, "Interrupthandler Suite")
}
6 changes: 6 additions & 0 deletions internal/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,12 @@ func (suite *Suite) runNode(node Node, specDeadline time.Time, text string) (typ
gracePeriodChannel = time.After(gracePeriod)
case <-interruptStatus.Channel:
interruptStatus = suite.interruptHandler.Status()
// ignore interruption from other process if we are cleaning up or reporting
if interruptStatus.Cause == interrupt_handler.InterruptCauseAbortByOtherProcess &&
node.NodeType.Is(types.NodeTypesAllowedDuringReportInterrupt|types.NodeTypesAllowedDuringCleanupInterrupt) {
continue
}

deadlineChannel = nil // don't worry about deadlines, time's up now

failureTimelineLocation := suite.generateTimelineLocation()
Expand Down
8 changes: 8 additions & 0 deletions internal/test_helpers/fake_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,26 @@ func NewFakeReporter() *FakeReporter {
}

func (r *FakeReporter) SuiteWillBegin(report types.Report) {
r.lock.Lock()
defer r.lock.Unlock()
r.Begin = report
}

func (r *FakeReporter) WillRun(report types.SpecReport) {
r.lock.Lock()
defer r.lock.Unlock()
r.Will = append(r.Will, report)
}

func (r *FakeReporter) DidRun(report types.SpecReport) {
r.lock.Lock()
defer r.lock.Unlock()
r.Did = append(r.Did, report)
}

func (r *FakeReporter) SuiteDidEnd(report types.Report) {
r.lock.Lock()
defer r.lock.Unlock()
r.End = report
}
func (r *FakeReporter) EmitProgressReport(progressReport types.ProgressReport) {
Expand Down