From 8dea88bb03ed0fa705b8a6b6f5d4999245097e73 Mon Sep 17 00:00:00 2001 From: weizhi Date: Thu, 4 May 2023 04:20:25 +0800 Subject: [PATCH] fix: fail fast may cause Serial spec or cleanup Node interrupted (#1178) * fix: fail fast may cause Serial spec or cleanup Node interrupted * tighten up edges around abort behavior 1. inter-process aborts should not interrupt cleanup nodes 2. whenever we fetch interrupt status, check and see if an abort has happened. if it has ensure we return the latest, correct, abort state. this allows us to avoid accidentally starting the next spec because the ABORT_POLLING_INTERVAL hasn't fired yet * fix race condition in internal integration suite * fix internal test race condition --------- Co-authored-by: Onsi Fakhouri --- internal/internal_integration/abort_test.go | 78 ++++++++++++++++++- .../internal_integration_suite_test.go | 46 +++++++++++ .../interrupt_handler/interrupt_handler.go | 47 +++++++---- .../interrupt_handler_test.go | 7 +- .../interrupthandler_suite_test.go | 3 + internal/suite.go | 6 ++ internal/test_helpers/fake_reporter.go | 8 ++ 7 files changed, 176 insertions(+), 19 deletions(-) diff --git a/internal/internal_integration/abort_test.go b/internal/internal_integration/abort_test.go index 656b1e995..223b212e0 100644 --- a/internal/internal_integration/abort_test.go +++ b/internal/internal_integration/abort_test.go @@ -1,9 +1,12 @@ package internal_integration_test import ( + "time" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/onsi/ginkgo/v2/internal/interrupt_handler" . "github.com/onsi/ginkgo/v2/internal/test_helpers" "github.com/onsi/ginkgo/v2/types" ) @@ -158,19 +161,92 @@ var _ = Describe("handling test aborts", func() { }) Describe("when running in parallel and a test aborts", func() { + var c chan interface{} BeforeEach(func() { SetUpForParallel(2) + c = make(chan interface{}) }) It("notifies the server of the abort", func() { Ω(client.ShouldAbort()).Should(BeFalse()) - success, _ := RunFixture("aborting in parallel", func() { + success := RunFixtureInParallel("aborting in parallel", func(_ int) { It("A", func() { + <-c Abort("abort") }) + + It("B", func(ctx SpecContext) { + close(c) + select { + case <-ctx.Done(): + rt.Run("dc-done") + case <-time.After(interrupt_handler.ABORT_POLLING_INTERVAL * 2): + rt.Run("dc-after") + } + }) }) Ω(success).Should(BeFalse()) Ω(client.ShouldAbort()).Should(BeTrue()) + + Ω(rt).Should(HaveTracked("dc-done")) //not dc-after + Ω(reporter.Did.Find("A")).Should(HaveAborted("abort")) + Ω(reporter.Did.Find("B")).Should(HaveBeenInterrupted(interrupt_handler.InterruptCauseAbortByOtherProcess)) }) + + It("does not interrupt cleanup nodes", func() { + success := RunFixtureInParallel("aborting in parallel", func(_ int) { + It("A", func() { + <-c + Abort("abort") + }) + + Context("B", func() { + It("B", func() { + }) + + AfterEach(func(ctx SpecContext) { + close(c) + select { + case <-ctx.Done(): + rt.Run("dc-done") + case <-time.After(interrupt_handler.ABORT_POLLING_INTERVAL * 2): + rt.Run("dc-after") + } + }) + }) + }) + Ω(success).Should(BeFalse()) + + Ω(rt).Should(HaveTracked("dc-after")) //not dc-done + Ω(reporter.Did.Find("A")).Should(HaveAborted("abort")) + Ω(reporter.Did.Find("B")).Should(HavePassed()) + }) + + It("does not start serial nodes if an abort occurs", func() { + success := RunFixtureInParallel("aborting in parallel", func(proc int) { + It("A", func() { + time.Sleep(time.Millisecond * 50) + if proc == 2 { + rt.Run("aborting") + Abort("abort") + } + }) + + It("B", func() { + time.Sleep(time.Millisecond * 50) + if proc == 2 { + rt.Run("aborting") + Abort("abort") + } + }) + + It("C", Serial, func() { + rt.Run("C") + }) + }) + Ω(success).Should(BeFalse()) + Ω(rt).Should(HaveTracked("aborting")) //just one aborting and we don't see C + }, MustPassRepeatedly(10)) + }) }) diff --git a/internal/internal_integration/internal_integration_suite_test.go b/internal/internal_integration/internal_integration_suite_test.go index 0966cf448..7264e2f3a 100644 --- a/internal/internal_integration/internal_integration_suite_test.go +++ b/internal/internal_integration/internal_integration_suite_test.go @@ -2,6 +2,7 @@ package internal_integration_test import ( "context" + "fmt" "io" "reflect" "testing" @@ -10,6 +11,7 @@ import ( . "github.com/onsi/ginkgo/v2" "github.com/onsi/ginkgo/v2/internal" "github.com/onsi/ginkgo/v2/internal/global" + "github.com/onsi/ginkgo/v2/internal/interrupt_handler" "github.com/onsi/ginkgo/v2/internal/parallel_support" . "github.com/onsi/ginkgo/v2/internal/test_helpers" "github.com/onsi/ginkgo/v2/types" @@ -18,6 +20,7 @@ import ( ) func TestSuiteTests(t *testing.T) { + interrupt_handler.ABORT_POLLING_INTERVAL = 200 * time.Millisecond format.TruncatedDiff = false RegisterFailHandler(Fail) suiteConfig, _ := GinkgoConfiguration() @@ -81,6 +84,7 @@ func WithSuite(suite *internal.Suite, callback func()) { func SetUpForParallel(parallelTotal int) { conf.ParallelTotal = parallelTotal server, client, exitChannels = SetUpServerAndClient(conf.ParallelTotal) + conf.ParallelHost = server.Address() } @@ -95,6 +99,48 @@ func RunFixture(description string, callback func()) (bool, bool) { return success, hasProgrammaticFocus } +/* +You should call SetUpForParallel() first, then call RunFixtureInParallel() + +this is, at best, an approximation. There are some dsl objects that can be called within a running node (e.g. DeferCleanup) that will not work with RunFixtureInParallel as they will attach to the actual internal_integration suite as opposed to the simulated fixture + +moreover the FakeInterruptHandler is not used - instead a real interrupt handler is created and configured with the client generated by SetUpForParallel. this is to facilitate the testing of cross-process aborts, which was the primary motivator for this method. + +also a noopProgressSignalRegistrar is used to avoid an annoying data race +*/ +func RunFixtureInParallel(description string, callback func(proc int)) bool { + suites := make([]*internal.Suite, conf.ParallelTotal) + finished := make(chan bool, conf.ParallelTotal) + for proc := 1; proc <= conf.ParallelTotal; proc++ { + suites[proc-1] = internal.NewSuite() + WithSuite(suites[proc-1], func() { + callback(proc) + Ω(suites[proc-1].BuildTree()).Should(Succeed()) + }) + } + for proc := 1; proc <= conf.ParallelTotal; proc++ { + proc := proc + c := conf //make a copy + c.ParallelProcess = proc + exit := exitChannels[proc] + suite := suites[proc-1] + go func() { + interruptHandler := interrupt_handler.NewInterruptHandler(client) + defer interruptHandler.Stop() + + success, _ := suite.Run(fmt.Sprintf("%s - %d", description, proc), Label("TopLevelLabel"), "/path/to/suite", failer, reporter, writer, outputInterceptor, interruptHandler, client, noopProgressSignalRegistrar, c) + close(exit) + finished <- success + }() + } + success := true + for proc := 1; proc <= conf.ParallelTotal; proc++ { + success = (<-finished) && success + } + + return success +} + func F(options ...interface{}) { location := cl message := "fail" diff --git a/internal/interrupt_handler/interrupt_handler.go b/internal/interrupt_handler/interrupt_handler.go index ac6f51040..8ed86111f 100644 --- a/internal/interrupt_handler/interrupt_handler.go +++ b/internal/interrupt_handler/interrupt_handler.go @@ -10,7 +10,7 @@ import ( "github.com/onsi/ginkgo/v2/internal/parallel_support" ) -const ABORT_POLLING_INTERVAL = 500 * time.Millisecond +var ABORT_POLLING_INTERVAL = 500 * time.Millisecond type InterruptCause uint @@ -62,13 +62,14 @@ type InterruptHandlerInterface interface { } type InterruptHandler struct { - c chan interface{} - lock *sync.Mutex - level InterruptLevel - cause InterruptCause - client parallel_support.Client - stop chan interface{} - signals []os.Signal + c chan interface{} + lock *sync.Mutex + level InterruptLevel + cause InterruptCause + client parallel_support.Client + stop chan interface{} + signals []os.Signal + requestAbortCheck chan interface{} } func NewInterruptHandler(client parallel_support.Client, signals ...os.Signal) *InterruptHandler { @@ -76,11 +77,12 @@ func NewInterruptHandler(client parallel_support.Client, signals ...os.Signal) * signals = []os.Signal{os.Interrupt, syscall.SIGTERM} } handler := &InterruptHandler{ - c: make(chan interface{}), - lock: &sync.Mutex{}, - stop: make(chan interface{}), - client: client, - signals: signals, + c: make(chan interface{}), + lock: &sync.Mutex{}, + stop: make(chan interface{}), + requestAbortCheck: make(chan interface{}), + client: client, + signals: signals, } handler.registerForInterrupts() return handler @@ -109,6 +111,12 @@ func (handler *InterruptHandler) registerForInterrupts() { pollTicker.Stop() return } + case <-handler.requestAbortCheck: + if handler.client.ShouldAbort() { + close(abortChannel) + pollTicker.Stop() + return + } case <-handler.stop: pollTicker.Stop() return @@ -152,11 +160,18 @@ func (handler *InterruptHandler) registerForInterrupts() { func (handler *InterruptHandler) Status() InterruptStatus { handler.lock.Lock() - defer handler.lock.Unlock() - - return InterruptStatus{ + status := InterruptStatus{ Level: handler.level, Channel: handler.c, Cause: handler.cause, } + handler.lock.Unlock() + + if handler.client != nil && handler.client.ShouldAbort() && !status.Interrupted() { + close(handler.requestAbortCheck) + <-status.Channel + return handler.Status() + } + + return status } diff --git a/internal/interrupt_handler/interrupt_handler_test.go b/internal/interrupt_handler/interrupt_handler_test.go index d93f14b4f..6196440f3 100644 --- a/internal/interrupt_handler/interrupt_handler_test.go +++ b/internal/interrupt_handler/interrupt_handler_test.go @@ -96,8 +96,6 @@ var _ = Describe("InterruptHandler", func() { }) }) - // here - test that abort only triggers once - // here - test interplay with signal Describe("Interrupting when another Ginkgo process has aborted", func() { var client parallel_support.Client BeforeEach(func() { @@ -165,5 +163,10 @@ var _ = Describe("InterruptHandler", func() { Ω(status.Cause).Should(Equal(interrupt_handler.InterruptCauseSignal)) Ω(status.Level).Should(Equal(interrupt_handler.InterruptLevelCleanupAndReport)) }) + + It("doesn't just rely on the ABORT_POLLING_INTERVAL timer to report that the interrupt has happened", func() { + client.PostAbort() + Ω(interruptHandler.Status().Cause).Should(Equal(interrupt_handler.InterruptCauseAbortByOtherProcess)) + }, MustPassRepeatedly(10)) }) }) diff --git a/internal/interrupt_handler/interrupthandler_suite_test.go b/internal/interrupt_handler/interrupthandler_suite_test.go index 555c8bca7..3b539e70b 100644 --- a/internal/interrupt_handler/interrupthandler_suite_test.go +++ b/internal/interrupt_handler/interrupthandler_suite_test.go @@ -2,12 +2,15 @@ package interrupt_handler_test import ( "testing" + "time" . "github.com/onsi/ginkgo/v2" + "github.com/onsi/ginkgo/v2/internal/interrupt_handler" . "github.com/onsi/gomega" ) func TestInterrupthandler(t *testing.T) { + interrupt_handler.ABORT_POLLING_INTERVAL = 50 * time.Millisecond RegisterFailHandler(Fail) RunSpecs(t, "Interrupthandler Suite") } diff --git a/internal/suite.go b/internal/suite.go index a1dbd4c62..60c913d89 100644 --- a/internal/suite.go +++ b/internal/suite.go @@ -937,6 +937,12 @@ func (suite *Suite) runNode(node Node, specDeadline time.Time, text string) (typ gracePeriodChannel = time.After(gracePeriod) case <-interruptStatus.Channel: interruptStatus = suite.interruptHandler.Status() + // ignore interruption from other process if we are cleaning up or reporting + if interruptStatus.Cause == interrupt_handler.InterruptCauseAbortByOtherProcess && + node.NodeType.Is(types.NodeTypesAllowedDuringReportInterrupt|types.NodeTypesAllowedDuringCleanupInterrupt) { + continue + } + deadlineChannel = nil // don't worry about deadlines, time's up now failureTimelineLocation := suite.generateTimelineLocation() diff --git a/internal/test_helpers/fake_reporter.go b/internal/test_helpers/fake_reporter.go index f6d4b71bd..59a3145e6 100644 --- a/internal/test_helpers/fake_reporter.go +++ b/internal/test_helpers/fake_reporter.go @@ -111,18 +111,26 @@ func NewFakeReporter() *FakeReporter { } func (r *FakeReporter) SuiteWillBegin(report types.Report) { + r.lock.Lock() + defer r.lock.Unlock() r.Begin = report } func (r *FakeReporter) WillRun(report types.SpecReport) { + r.lock.Lock() + defer r.lock.Unlock() r.Will = append(r.Will, report) } func (r *FakeReporter) DidRun(report types.SpecReport) { + r.lock.Lock() + defer r.lock.Unlock() r.Did = append(r.Did, report) } func (r *FakeReporter) SuiteDidEnd(report types.Report) { + r.lock.Lock() + defer r.lock.Unlock() r.End = report } func (r *FakeReporter) EmitProgressReport(progressReport types.ProgressReport) {