Skip to content

Commit

Permalink
Fix test server publishing the Completion of child workflows before t…
Browse files Browse the repository at this point in the history
…he Start sometimes

Issue #1288
  • Loading branch information
Spikhalskiy committed Jun 27, 2022
1 parent 462a3cf commit 60e0322
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public final class TestStatsReporter implements StatsReporter {
private final Map<String, Double> gauges = new HashMap<>();
private final Map<String, StatsAccumulator> timers = new HashMap<>();

public void assertCounter(String name, Map<String, String> tags) {
public synchronized void assertCounter(String name, Map<String, String> tags) {
String metricName = getMetricName(name, tags);
if (!counters.containsKey(metricName)) {
fail(
Expand All @@ -49,7 +49,7 @@ public void assertCounter(String name, Map<String, String> tags) {
}
}

public void assertNoMetric(String name, Map<String, String> tags) {
public synchronized void assertNoMetric(String name, Map<String, String> tags) {
String metricName = getMetricName(name, tags);
if (counters.containsKey(metricName)) {
fail(
Expand All @@ -60,7 +60,7 @@ public void assertNoMetric(String name, Map<String, String> tags) {
}
}

public void assertCounter(String name, Map<String, String> tags, long expected) {
public synchronized void assertCounter(String name, Map<String, String> tags, long expected) {
String metricName = getMetricName(name, tags);
AtomicLong accumulator = counters.get(metricName);
if (accumulator == null) {
Expand All @@ -73,11 +73,12 @@ public void assertCounter(String name, Map<String, String> tags, long expected)
assertEquals(String.valueOf(accumulator.get()), expected, accumulator.get());
}

public void assertGauge(String name, Map<String, String> tags, double expected) {
public synchronized void assertGauge(String name, Map<String, String> tags, double expected) {
assertGauge(name, tags, val -> Math.abs(expected - val) < 1e-3);
}

public void assertGauge(String name, Map<String, String> tags, Predicate<Double> isExpected) {
public synchronized void assertGauge(
String name, Map<String, String> tags, Predicate<Double> isExpected) {
String metricName = getMetricName(name, tags);
Double value = gauges.get(metricName);
if (value == null) {
Expand All @@ -90,7 +91,7 @@ public void assertGauge(String name, Map<String, String> tags, Predicate<Double>
assertTrue(String.valueOf(value), isExpected.test(value));
}

public void assertTimer(String name, Map<String, String> tags) {
public synchronized void assertTimer(String name, Map<String, String> tags) {
String metricName = getMetricName(name, tags);
if (!timers.containsKey(metricName)) {
fail(
Expand All @@ -101,7 +102,8 @@ public void assertTimer(String name, Map<String, String> tags) {
}
}

public void assertTimerMinDuration(String name, Map<String, String> tags, Duration minDuration) {
public synchronized void assertTimerMinDuration(
String name, Map<String, String> tags, Duration minDuration) {
String metricName = getMetricName(name, tags);
StatsAccumulator value = timers.get(metricName);
if (value == null) {
Expand Down Expand Up @@ -134,7 +136,7 @@ public synchronized void reportGauge(String name, Map<String, String> tags, doub
}

@Override
public void reportTimer(
public synchronized void reportTimer(
String name, Map<String, String> tags, com.uber.m3.util.Duration interval) {
String metricName = getMetricName(name, tags);
StatsAccumulator value = timers.get(metricName);
Expand All @@ -147,7 +149,7 @@ public void reportTimer(

@SuppressWarnings("deprecation")
@Override
public void reportHistogramValueSamples(
public synchronized void reportHistogramValueSamples(
String name,
Map<String, String> tags,
com.uber.m3.tally.Buckets buckets,
Expand All @@ -159,7 +161,7 @@ public void reportHistogramValueSamples(

@SuppressWarnings("deprecation")
@Override
public void reportHistogramDurationSamples(
public synchronized void reportHistogramDurationSamples(
String name,
Map<String, String> tags,
com.uber.m3.tally.Buckets buckets,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public class ActivityTimeoutTest {
private TestWorkflowEnvironment testEnvironment;
private static final String TASK_QUEUE = "test-activities";

public @Rule Timeout timeout = Timeout.seconds(10);
// TODO This test takes longer than it should to complete because
// of the cached heartbeat that prevents a quit shutdown
public @Rule Timeout timeout = Timeout.seconds(12);

@Before
public void setUp() {
Expand All @@ -56,8 +58,6 @@ public void tearDown() {
testEnvironment.close();
}

// TODO This test takes longer than it should to complete because
// of the cached heartbeat that prevents a quit shutdown
@Test(timeout = 11_000)
public void testActivityStartToCloseTimeout() {
Worker worker = testEnvironment.newWorker(TASK_QUEUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ <R> void action(Action action, RequestContext context, R request, long reference
(TransitionDestination<Data, R>) transitions.get(transition);
if (destination == null) {
throw Status.INTERNAL
.withDescription(this.data + "Invalid " + transition + ", history: " + transitionHistory)
.withDescription(this.data + " Invalid " + transition + ", history: " + transitionHistory)
.asRuntimeException();
}
state = destination.apply(context, data, request, referenceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1544,20 +1544,20 @@ public void startWorkflow(
.setNamespace(getExecutionId().getNamespace())
.setWorkflowType(startRequest.getWorkflowType())
.build();
ForkJoinPool.commonPool()
.execute(
() -> {
try {
parent.get().childWorkflowStarted(a);
} catch (StatusRuntimeException e) {
// NOT_FOUND is expected as the parent might just close by now.
if (e.getStatus().getCode() != Status.Code.NOT_FOUND) {
log.error("Failure reporting child completion", e);
}
} catch (Throwable e) {
log.error("Failure trying to add task for an delayed workflow retry", e);
}
});

// notifying the parent state machine in the same transaction and thread, otherwise the parent
// may see
// completion before start if it's done asynchronously.
try {
parent.get().childWorkflowStarted(a);
} catch (StatusRuntimeException e) {
// NOT_FOUND is expected as the parent might just close by now.
if (e.getStatus().getCode() != Status.Code.NOT_FOUND) {
log.error("Failure reporting child completion", e);
}
} catch (Exception e) {
log.error("Failure trying to add task for an delayed workflow retry", e);
}
}
}

Expand Down

0 comments on commit 60e0322

Please sign in to comment.