diff --git a/internal/internal_integration/abort_test.go b/internal/internal_integration/abort_test.go index 656b1e995..223b212e0 100644 --- a/internal/internal_integration/abort_test.go +++ b/internal/internal_integration/abort_test.go @@ -1,9 +1,12 @@ package internal_integration_test import ( + "time" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/onsi/ginkgo/v2/internal/interrupt_handler" . "github.com/onsi/ginkgo/v2/internal/test_helpers" "github.com/onsi/ginkgo/v2/types" ) @@ -158,19 +161,92 @@ var _ = Describe("handling test aborts", func() { }) Describe("when running in parallel and a test aborts", func() { + var c chan interface{} BeforeEach(func() { SetUpForParallel(2) + c = make(chan interface{}) }) It("notifies the server of the abort", func() { Ω(client.ShouldAbort()).Should(BeFalse()) - success, _ := RunFixture("aborting in parallel", func() { + success := RunFixtureInParallel("aborting in parallel", func(_ int) { It("A", func() { + <-c Abort("abort") }) + + It("B", func(ctx SpecContext) { + close(c) + select { + case <-ctx.Done(): + rt.Run("dc-done") + case <-time.After(interrupt_handler.ABORT_POLLING_INTERVAL * 2): + rt.Run("dc-after") + } + }) }) Ω(success).Should(BeFalse()) Ω(client.ShouldAbort()).Should(BeTrue()) + + Ω(rt).Should(HaveTracked("dc-done")) //not dc-after + Ω(reporter.Did.Find("A")).Should(HaveAborted("abort")) + Ω(reporter.Did.Find("B")).Should(HaveBeenInterrupted(interrupt_handler.InterruptCauseAbortByOtherProcess)) }) + + It("does not interrupt cleanup nodes", func() { + success := RunFixtureInParallel("aborting in parallel", func(_ int) { + It("A", func() { + <-c + Abort("abort") + }) + + Context("B", func() { + It("B", func() { + }) + + AfterEach(func(ctx SpecContext) { + close(c) + select { + case <-ctx.Done(): + rt.Run("dc-done") + case <-time.After(interrupt_handler.ABORT_POLLING_INTERVAL * 2): + rt.Run("dc-after") + } + }) + }) + }) + Ω(success).Should(BeFalse()) + + Ω(rt).Should(HaveTracked("dc-after")) //not dc-done + Ω(reporter.Did.Find("A")).Should(HaveAborted("abort")) + Ω(reporter.Did.Find("B")).Should(HavePassed()) + }) + + It("does not start serial nodes if an abort occurs", func() { + success := RunFixtureInParallel("aborting in parallel", func(proc int) { + It("A", func() { + time.Sleep(time.Millisecond * 50) + if proc == 2 { + rt.Run("aborting") + Abort("abort") + } + }) + + It("B", func() { + time.Sleep(time.Millisecond * 50) + if proc == 2 { + rt.Run("aborting") + Abort("abort") + } + }) + + It("C", Serial, func() { + rt.Run("C") + }) + }) + Ω(success).Should(BeFalse()) + Ω(rt).Should(HaveTracked("aborting")) //just one aborting and we don't see C + }, MustPassRepeatedly(10)) + }) }) diff --git a/internal/internal_integration/internal_integration_suite_test.go b/internal/internal_integration/internal_integration_suite_test.go index 0966cf448..7264e2f3a 100644 --- a/internal/internal_integration/internal_integration_suite_test.go +++ b/internal/internal_integration/internal_integration_suite_test.go @@ -2,6 +2,7 @@ package internal_integration_test import ( "context" + "fmt" "io" "reflect" "testing" @@ -10,6 +11,7 @@ import ( . "github.com/onsi/ginkgo/v2" "github.com/onsi/ginkgo/v2/internal" "github.com/onsi/ginkgo/v2/internal/global" + "github.com/onsi/ginkgo/v2/internal/interrupt_handler" "github.com/onsi/ginkgo/v2/internal/parallel_support" . "github.com/onsi/ginkgo/v2/internal/test_helpers" "github.com/onsi/ginkgo/v2/types" @@ -18,6 +20,7 @@ import ( ) func TestSuiteTests(t *testing.T) { + interrupt_handler.ABORT_POLLING_INTERVAL = 200 * time.Millisecond format.TruncatedDiff = false RegisterFailHandler(Fail) suiteConfig, _ := GinkgoConfiguration() @@ -81,6 +84,7 @@ func WithSuite(suite *internal.Suite, callback func()) { func SetUpForParallel(parallelTotal int) { conf.ParallelTotal = parallelTotal server, client, exitChannels = SetUpServerAndClient(conf.ParallelTotal) + conf.ParallelHost = server.Address() } @@ -95,6 +99,48 @@ func RunFixture(description string, callback func()) (bool, bool) { return success, hasProgrammaticFocus } +/* +You should call SetUpForParallel() first, then call RunFixtureInParallel() + +this is, at best, an approximation. There are some dsl objects that can be called within a running node (e.g. DeferCleanup) that will not work with RunFixtureInParallel as they will attach to the actual internal_integration suite as opposed to the simulated fixture + +moreover the FakeInterruptHandler is not used - instead a real interrupt handler is created and configured with the client generated by SetUpForParallel. this is to facilitate the testing of cross-process aborts, which was the primary motivator for this method. + +also a noopProgressSignalRegistrar is used to avoid an annoying data race +*/ +func RunFixtureInParallel(description string, callback func(proc int)) bool { + suites := make([]*internal.Suite, conf.ParallelTotal) + finished := make(chan bool, conf.ParallelTotal) + for proc := 1; proc <= conf.ParallelTotal; proc++ { + suites[proc-1] = internal.NewSuite() + WithSuite(suites[proc-1], func() { + callback(proc) + Ω(suites[proc-1].BuildTree()).Should(Succeed()) + }) + } + for proc := 1; proc <= conf.ParallelTotal; proc++ { + proc := proc + c := conf //make a copy + c.ParallelProcess = proc + exit := exitChannels[proc] + suite := suites[proc-1] + go func() { + interruptHandler := interrupt_handler.NewInterruptHandler(client) + defer interruptHandler.Stop() + + success, _ := suite.Run(fmt.Sprintf("%s - %d", description, proc), Label("TopLevelLabel"), "/path/to/suite", failer, reporter, writer, outputInterceptor, interruptHandler, client, noopProgressSignalRegistrar, c) + close(exit) + finished <- success + }() + } + success := true + for proc := 1; proc <= conf.ParallelTotal; proc++ { + success = (<-finished) && success + } + + return success +} + func F(options ...interface{}) { location := cl message := "fail" diff --git a/internal/interrupt_handler/interrupt_handler.go b/internal/interrupt_handler/interrupt_handler.go index ac6f51040..8ed86111f 100644 --- a/internal/interrupt_handler/interrupt_handler.go +++ b/internal/interrupt_handler/interrupt_handler.go @@ -10,7 +10,7 @@ import ( "github.com/onsi/ginkgo/v2/internal/parallel_support" ) -const ABORT_POLLING_INTERVAL = 500 * time.Millisecond +var ABORT_POLLING_INTERVAL = 500 * time.Millisecond type InterruptCause uint @@ -62,13 +62,14 @@ type InterruptHandlerInterface interface { } type InterruptHandler struct { - c chan interface{} - lock *sync.Mutex - level InterruptLevel - cause InterruptCause - client parallel_support.Client - stop chan interface{} - signals []os.Signal + c chan interface{} + lock *sync.Mutex + level InterruptLevel + cause InterruptCause + client parallel_support.Client + stop chan interface{} + signals []os.Signal + requestAbortCheck chan interface{} } func NewInterruptHandler(client parallel_support.Client, signals ...os.Signal) *InterruptHandler { @@ -76,11 +77,12 @@ func NewInterruptHandler(client parallel_support.Client, signals ...os.Signal) * signals = []os.Signal{os.Interrupt, syscall.SIGTERM} } handler := &InterruptHandler{ - c: make(chan interface{}), - lock: &sync.Mutex{}, - stop: make(chan interface{}), - client: client, - signals: signals, + c: make(chan interface{}), + lock: &sync.Mutex{}, + stop: make(chan interface{}), + requestAbortCheck: make(chan interface{}), + client: client, + signals: signals, } handler.registerForInterrupts() return handler @@ -109,6 +111,12 @@ func (handler *InterruptHandler) registerForInterrupts() { pollTicker.Stop() return } + case <-handler.requestAbortCheck: + if handler.client.ShouldAbort() { + close(abortChannel) + pollTicker.Stop() + return + } case <-handler.stop: pollTicker.Stop() return @@ -152,11 +160,18 @@ func (handler *InterruptHandler) registerForInterrupts() { func (handler *InterruptHandler) Status() InterruptStatus { handler.lock.Lock() - defer handler.lock.Unlock() - - return InterruptStatus{ + status := InterruptStatus{ Level: handler.level, Channel: handler.c, Cause: handler.cause, } + handler.lock.Unlock() + + if handler.client != nil && handler.client.ShouldAbort() && !status.Interrupted() { + close(handler.requestAbortCheck) + <-status.Channel + return handler.Status() + } + + return status } diff --git a/internal/interrupt_handler/interrupt_handler_test.go b/internal/interrupt_handler/interrupt_handler_test.go index d93f14b4f..6196440f3 100644 --- a/internal/interrupt_handler/interrupt_handler_test.go +++ b/internal/interrupt_handler/interrupt_handler_test.go @@ -96,8 +96,6 @@ var _ = Describe("InterruptHandler", func() { }) }) - // here - test that abort only triggers once - // here - test interplay with signal Describe("Interrupting when another Ginkgo process has aborted", func() { var client parallel_support.Client BeforeEach(func() { @@ -165,5 +163,10 @@ var _ = Describe("InterruptHandler", func() { Ω(status.Cause).Should(Equal(interrupt_handler.InterruptCauseSignal)) Ω(status.Level).Should(Equal(interrupt_handler.InterruptLevelCleanupAndReport)) }) + + It("doesn't just rely on the ABORT_POLLING_INTERVAL timer to report that the interrupt has happened", func() { + client.PostAbort() + Ω(interruptHandler.Status().Cause).Should(Equal(interrupt_handler.InterruptCauseAbortByOtherProcess)) + }, MustPassRepeatedly(10)) }) }) diff --git a/internal/interrupt_handler/interrupthandler_suite_test.go b/internal/interrupt_handler/interrupthandler_suite_test.go index 555c8bca7..3b539e70b 100644 --- a/internal/interrupt_handler/interrupthandler_suite_test.go +++ b/internal/interrupt_handler/interrupthandler_suite_test.go @@ -2,12 +2,15 @@ package interrupt_handler_test import ( "testing" + "time" . "github.com/onsi/ginkgo/v2" + "github.com/onsi/ginkgo/v2/internal/interrupt_handler" . "github.com/onsi/gomega" ) func TestInterrupthandler(t *testing.T) { + interrupt_handler.ABORT_POLLING_INTERVAL = 50 * time.Millisecond RegisterFailHandler(Fail) RunSpecs(t, "Interrupthandler Suite") } diff --git a/internal/suite.go b/internal/suite.go index a1dbd4c62..60c913d89 100644 --- a/internal/suite.go +++ b/internal/suite.go @@ -937,6 +937,12 @@ func (suite *Suite) runNode(node Node, specDeadline time.Time, text string) (typ gracePeriodChannel = time.After(gracePeriod) case <-interruptStatus.Channel: interruptStatus = suite.interruptHandler.Status() + // ignore interruption from other process if we are cleaning up or reporting + if interruptStatus.Cause == interrupt_handler.InterruptCauseAbortByOtherProcess && + node.NodeType.Is(types.NodeTypesAllowedDuringReportInterrupt|types.NodeTypesAllowedDuringCleanupInterrupt) { + continue + } + deadlineChannel = nil // don't worry about deadlines, time's up now failureTimelineLocation := suite.generateTimelineLocation() diff --git a/internal/test_helpers/fake_reporter.go b/internal/test_helpers/fake_reporter.go index f6d4b71bd..59a3145e6 100644 --- a/internal/test_helpers/fake_reporter.go +++ b/internal/test_helpers/fake_reporter.go @@ -111,18 +111,26 @@ func NewFakeReporter() *FakeReporter { } func (r *FakeReporter) SuiteWillBegin(report types.Report) { + r.lock.Lock() + defer r.lock.Unlock() r.Begin = report } func (r *FakeReporter) WillRun(report types.SpecReport) { + r.lock.Lock() + defer r.lock.Unlock() r.Will = append(r.Will, report) } func (r *FakeReporter) DidRun(report types.SpecReport) { + r.lock.Lock() + defer r.lock.Unlock() r.Did = append(r.Did, report) } func (r *FakeReporter) SuiteDidEnd(report types.Report) { + r.lock.Lock() + defer r.lock.Unlock() r.End = report } func (r *FakeReporter) EmitProgressReport(progressReport types.ProgressReport) {