From 67012d74caac40ed2fbebe485fbdefef8bda0d46 Mon Sep 17 00:00:00 2001 From: Dmitry Spikhalskiy Date: Mon, 27 Jun 2022 15:53:31 -0400 Subject: [PATCH] Fix test server publishing the Completion of child workflows before the Start sometimes Issue #1288 --- .../common/reporter/TestStatsReporter.java | 21 +++++++------- .../activityTests/ActivityTimeoutTest.java | 6 ++-- .../internal/testservice/StateMachine.java | 2 +- .../TestWorkflowMutableStateImpl.java | 28 +++++++++---------- 4 files changed, 29 insertions(+), 28 deletions(-) diff --git a/temporal-sdk/src/test/java/io/temporal/common/reporter/TestStatsReporter.java b/temporal-sdk/src/test/java/io/temporal/common/reporter/TestStatsReporter.java index 6eef0b574b..ba4a44a147 100644 --- a/temporal-sdk/src/test/java/io/temporal/common/reporter/TestStatsReporter.java +++ b/temporal-sdk/src/test/java/io/temporal/common/reporter/TestStatsReporter.java @@ -29,6 +29,7 @@ import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; @@ -38,7 +39,7 @@ public final class TestStatsReporter implements StatsReporter { private final Map gauges = new HashMap<>(); private final Map timers = new HashMap<>(); - public void assertCounter(String name, Map tags) { + public synchronized void assertCounter(String name, Map tags) { String metricName = getMetricName(name, tags); if (!counters.containsKey(metricName)) { fail( @@ -49,7 +50,7 @@ public void assertCounter(String name, Map tags) { } } - public void assertNoMetric(String name, Map tags) { + public synchronized void assertNoMetric(String name, Map tags) { String metricName = getMetricName(name, tags); if (counters.containsKey(metricName)) { fail( @@ -60,7 +61,7 @@ public void assertNoMetric(String name, Map tags) { } } - public void assertCounter(String name, Map tags, long expected) { + public synchronized void assertCounter(String name, Map tags, long expected) { String metricName = getMetricName(name, tags); AtomicLong accumulator = counters.get(metricName); if (accumulator == null) { @@ -73,11 +74,11 @@ public void assertCounter(String name, Map tags, long expected) assertEquals(String.valueOf(accumulator.get()), expected, accumulator.get()); } - public void assertGauge(String name, Map tags, double expected) { + public synchronized void assertGauge(String name, Map tags, double expected) { assertGauge(name, tags, val -> Math.abs(expected - val) < 1e-3); } - public void assertGauge(String name, Map tags, Predicate isExpected) { + public synchronized void assertGauge(String name, Map tags, Predicate isExpected) { String metricName = getMetricName(name, tags); Double value = gauges.get(metricName); if (value == null) { @@ -90,7 +91,7 @@ public void assertGauge(String name, Map tags, Predicate assertTrue(String.valueOf(value), isExpected.test(value)); } - public void assertTimer(String name, Map tags) { + public synchronized void assertTimer(String name, Map tags) { String metricName = getMetricName(name, tags); if (!timers.containsKey(metricName)) { fail( @@ -101,7 +102,7 @@ public void assertTimer(String name, Map tags) { } } - public void assertTimerMinDuration(String name, Map tags, Duration minDuration) { + public synchronized void assertTimerMinDuration(String name, Map tags, Duration minDuration) { String metricName = getMetricName(name, tags); StatsAccumulator value = timers.get(metricName); if (value == null) { @@ -134,7 +135,7 @@ public synchronized void reportGauge(String name, Map tags, doub } @Override - public void reportTimer( + public synchronized void reportTimer( String name, Map tags, com.uber.m3.util.Duration interval) { String metricName = getMetricName(name, tags); StatsAccumulator value = timers.get(metricName); @@ -147,7 +148,7 @@ public void reportTimer( @SuppressWarnings("deprecation") @Override - public void reportHistogramValueSamples( + public synchronized void reportHistogramValueSamples( String name, Map tags, com.uber.m3.tally.Buckets buckets, @@ -159,7 +160,7 @@ public void reportHistogramValueSamples( @SuppressWarnings("deprecation") @Override - public void reportHistogramDurationSamples( + public synchronized void reportHistogramDurationSamples( String name, Map tags, com.uber.m3.tally.Buckets buckets, diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ActivityTimeoutTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ActivityTimeoutTest.java index 45c678dd6f..27954d9336 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ActivityTimeoutTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ActivityTimeoutTest.java @@ -43,7 +43,9 @@ public class ActivityTimeoutTest { private TestWorkflowEnvironment testEnvironment; private static final String TASK_QUEUE = "test-activities"; - public @Rule Timeout timeout = Timeout.seconds(10); + // TODO This test takes longer than it should to complete because + // of the cached heartbeat that prevents a quit shutdown + public @Rule Timeout timeout = Timeout.seconds(12); @Before public void setUp() { @@ -56,8 +58,6 @@ public void tearDown() { testEnvironment.close(); } - // TODO This test takes longer than it should to complete because - // of the cached heartbeat that prevents a quit shutdown @Test(timeout = 11_000) public void testActivityStartToCloseTimeout() { Worker worker = testEnvironment.newWorker(TASK_QUEUE); diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachine.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachine.java index e0136026dc..325e3771c5 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachine.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachine.java @@ -226,7 +226,7 @@ void action(Action action, RequestContext context, R request, long reference (TransitionDestination) transitions.get(transition); if (destination == null) { throw Status.INTERNAL - .withDescription(this.data + "Invalid " + transition + ", history: " + transitionHistory) + .withDescription(this.data + " Invalid " + transition + ", history: " + transitionHistory) .asRuntimeException(); } state = destination.apply(context, data, request, referenceId); diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index d5a624756f..97b38347c9 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -1544,20 +1544,20 @@ public void startWorkflow( .setNamespace(getExecutionId().getNamespace()) .setWorkflowType(startRequest.getWorkflowType()) .build(); - ForkJoinPool.commonPool() - .execute( - () -> { - try { - parent.get().childWorkflowStarted(a); - } catch (StatusRuntimeException e) { - // NOT_FOUND is expected as the parent might just close by now. - if (e.getStatus().getCode() != Status.Code.NOT_FOUND) { - log.error("Failure reporting child completion", e); - } - } catch (Throwable e) { - log.error("Failure trying to add task for an delayed workflow retry", e); - } - }); + + // notifying the parent state machine in the same transaction and thread, otherwise the parent + // may see + // completion before start if it's done asynchronously. + try { + parent.get().childWorkflowStarted(a); + } catch (StatusRuntimeException e) { + // NOT_FOUND is expected as the parent might just close by now. + if (e.getStatus().getCode() != Status.Code.NOT_FOUND) { + log.error("Failure reporting child completion", e); + } + } catch (Exception e) { + log.error("Failure trying to add task for an delayed workflow retry", e); + } } }