Skip to content

Commit

Permalink
Fix test server publishing the Completion of child workflows before t…
Browse files Browse the repository at this point in the history
…he Start sometimes (#1289)

Issue #1288
  • Loading branch information
Spikhalskiy authored Jun 27, 2022
1 parent 462a3cf commit aee11a8
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public final class TestStatsReporter implements StatsReporter {
private final Map<String, Double> gauges = new HashMap<>();
private final Map<String, StatsAccumulator> timers = new HashMap<>();

public void assertCounter(String name, Map<String, String> tags) {
public synchronized void assertCounter(String name, Map<String, String> tags) {
String metricName = getMetricName(name, tags);
if (!counters.containsKey(metricName)) {
fail(
Expand All @@ -49,7 +49,7 @@ public void assertCounter(String name, Map<String, String> tags) {
}
}

public void assertNoMetric(String name, Map<String, String> tags) {
public synchronized void assertNoMetric(String name, Map<String, String> tags) {
String metricName = getMetricName(name, tags);
if (counters.containsKey(metricName)) {
fail(
Expand All @@ -60,7 +60,7 @@ public void assertNoMetric(String name, Map<String, String> tags) {
}
}

public void assertCounter(String name, Map<String, String> tags, long expected) {
public synchronized void assertCounter(String name, Map<String, String> tags, long expected) {
String metricName = getMetricName(name, tags);
AtomicLong accumulator = counters.get(metricName);
if (accumulator == null) {
Expand All @@ -73,11 +73,12 @@ public void assertCounter(String name, Map<String, String> tags, long expected)
assertEquals(String.valueOf(accumulator.get()), expected, accumulator.get());
}

public void assertGauge(String name, Map<String, String> tags, double expected) {
public synchronized void assertGauge(String name, Map<String, String> tags, double expected) {
assertGauge(name, tags, val -> Math.abs(expected - val) < 1e-3);
}

public void assertGauge(String name, Map<String, String> tags, Predicate<Double> isExpected) {
public synchronized void assertGauge(
String name, Map<String, String> tags, Predicate<Double> isExpected) {
String metricName = getMetricName(name, tags);
Double value = gauges.get(metricName);
if (value == null) {
Expand All @@ -90,7 +91,7 @@ public void assertGauge(String name, Map<String, String> tags, Predicate<Double>
assertTrue(String.valueOf(value), isExpected.test(value));
}

public void assertTimer(String name, Map<String, String> tags) {
public synchronized void assertTimer(String name, Map<String, String> tags) {
String metricName = getMetricName(name, tags);
if (!timers.containsKey(metricName)) {
fail(
Expand All @@ -101,7 +102,8 @@ public void assertTimer(String name, Map<String, String> tags) {
}
}

public void assertTimerMinDuration(String name, Map<String, String> tags, Duration minDuration) {
public synchronized void assertTimerMinDuration(
String name, Map<String, String> tags, Duration minDuration) {
String metricName = getMetricName(name, tags);
StatsAccumulator value = timers.get(metricName);
if (value == null) {
Expand Down Expand Up @@ -134,7 +136,7 @@ public synchronized void reportGauge(String name, Map<String, String> tags, doub
}

@Override
public void reportTimer(
public synchronized void reportTimer(
String name, Map<String, String> tags, com.uber.m3.util.Duration interval) {
String metricName = getMetricName(name, tags);
StatsAccumulator value = timers.get(metricName);
Expand All @@ -147,7 +149,7 @@ public void reportTimer(

@SuppressWarnings("deprecation")
@Override
public void reportHistogramValueSamples(
public synchronized void reportHistogramValueSamples(
String name,
Map<String, String> tags,
com.uber.m3.tally.Buckets buckets,
Expand All @@ -159,7 +161,7 @@ public void reportHistogramValueSamples(

@SuppressWarnings("deprecation")
@Override
public void reportHistogramDurationSamples(
public synchronized void reportHistogramDurationSamples(
String name,
Map<String, String> tags,
com.uber.m3.tally.Buckets buckets,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public class ActivityTimeoutTest {
private TestWorkflowEnvironment testEnvironment;
private static final String TASK_QUEUE = "test-activities";

public @Rule Timeout timeout = Timeout.seconds(10);
// TODO This test takes longer than it should to complete because
// of the cached heartbeat that prevents a quit shutdown
public @Rule Timeout timeout = Timeout.seconds(12);

@Before
public void setUp() {
Expand All @@ -56,8 +58,6 @@ public void tearDown() {
testEnvironment.close();
}

// TODO This test takes longer than it should to complete because
// of the cached heartbeat that prevents a quit shutdown
@Test(timeout = 11_000)
public void testActivityStartToCloseTimeout() {
Worker worker = testEnvironment.newWorker(TASK_QUEUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ <R> void action(Action action, RequestContext context, R request, long reference
(TransitionDestination<Data, R>) transitions.get(transition);
if (destination == null) {
throw Status.INTERNAL
.withDescription(this.data + "Invalid " + transition + ", history: " + transitionHistory)
.withDescription(this.data + " Invalid " + transition + ", history: " + transitionHistory)
.asRuntimeException();
}
state = destination.apply(context, data, request, referenceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1544,20 +1544,20 @@ public void startWorkflow(
.setNamespace(getExecutionId().getNamespace())
.setWorkflowType(startRequest.getWorkflowType())
.build();
ForkJoinPool.commonPool()
.execute(
() -> {
try {
parent.get().childWorkflowStarted(a);
} catch (StatusRuntimeException e) {
// NOT_FOUND is expected as the parent might just close by now.
if (e.getStatus().getCode() != Status.Code.NOT_FOUND) {
log.error("Failure reporting child completion", e);
}
} catch (Throwable e) {
log.error("Failure trying to add task for an delayed workflow retry", e);
}
});

// notifying the parent state machine in the same transaction and thread, otherwise the parent
// may see
// completion before start if it's done asynchronously.
try {
parent.get().childWorkflowStarted(a);
} catch (StatusRuntimeException e) {
// NOT_FOUND is expected as the parent might just close by now.
if (e.getStatus().getCode() != Status.Code.NOT_FOUND) {
log.error("Failure reporting child completion", e);
}
} catch (Exception e) {
log.error("Failure trying to add task for an delayed workflow retry", e);
}
}
}

Expand Down

0 comments on commit aee11a8

Please sign in to comment.