diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f312bc..a48a31f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## v1.3.0 * Refactor `RetriableTask` and add new `CompoundTask`, fixing Fan-out/Fan-in stuck when using `RetriableTask` ([#157](https://github.com/microsoft/durabletask-java/pull/157)) +* Refactor `createTimer` to be non-blocking ([#161](https://github.com/microsoft/durabletask-java/pull/161)) ## v1.2.0 @@ -20,7 +21,6 @@ * Fix the potential NPE issue of `DurableTaskClient#terminate` method ([#104](https://github.com/microsoft/durabletask-java/issues/104)) * Add waitForCompletionOrCreateCheckStatusResponse client API ([#115](https://github.com/microsoft/durabletask-java/pull/115)) * Support long timers by breaking up into smaller timers ([#114](https://github.com/microsoft/durabletask-java/issues/114)) -* Support restartInstance and pass restartPostUri in HttpManagementPayload ([#108](https://github.com/microsoft/durabletask-java/issues/108)) ## v1.0.0 diff --git a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java index daf74d4..3cbb0ce 100644 --- a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java +++ b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java @@ -586,16 +586,11 @@ public Task createTimer(ZonedDateTime zonedDateTime) { } private Task createTimer(Instant finalFireAt) { - Duration remainingTime = Duration.between(this.currentInstant, finalFireAt); - while (remainingTime.compareTo(this.maximumTimerInterval) > 0) { - Instant nextFireAt = this.currentInstant.plus(this.maximumTimerInterval); - createInstantTimer(this.sequenceNumber++, nextFireAt).await(); - remainingTime = Duration.between(this.currentInstant, finalFireAt); - } - return createInstantTimer(this.sequenceNumber++, finalFireAt); + TimerTask timer = new TimerTask(finalFireAt); + return timer; } - private Task createInstantTimer(int id, Instant fireAt) { + private CompletableTask createInstantTimer(int id, Instant fireAt) { Timestamp ts = DataConverter.getTimestampFromInstant(fireAt); this.pendingActions.put(id, OrchestratorAction.newBuilder() .setId(id) @@ -941,6 +936,61 @@ List getNewEvents() { } } + private class TimerTask extends CompletableTask { + private Instant finalFireAt; + CompletableTask task; + + public TimerTask(Instant finalFireAt) { + super(); + CompletableTask firstTimer = createTimerTask(finalFireAt); + CompletableFuture timerChain = createTimerChain(finalFireAt, firstTimer.future); + this.task = new CompletableTask<>(timerChain); + this.finalFireAt = finalFireAt; + } + + // For a short timer (less than maximumTimerInterval), once the currentFuture completes, we must have reached finalFireAt, + // so we return and no more sub-timers are created. For a long timer (more than maximumTimerInterval), once a given + // currentFuture completes, we check if we have not yet reached finalFireAt. If that is the case, we create a new sub-timer + // task and make a recursive call on that new sub-timer task so that once it completes, another sub-timer task is created + // if necessary. Otherwise, we return and no more sub-timers are created. + private CompletableFuture createTimerChain(Instant finalFireAt, CompletableFuture currentFuture) { + return currentFuture.thenRun(() -> { + if (currentInstant.compareTo(finalFireAt) > 0) { + return; + } + Task nextTimer = createTimerTask(finalFireAt); + + createTimerChain(finalFireAt, nextTimer.future); + }); + } + + private CompletableTask createTimerTask(Instant finalFireAt) { + CompletableTask nextTimer; + Duration remainingTime = Duration.between(currentInstant, finalFireAt); + if (remainingTime.compareTo(maximumTimerInterval) > 0) { + Instant nextFireAt = currentInstant.plus(maximumTimerInterval); + nextTimer = createInstantTimer(sequenceNumber++, nextFireAt); + } else { + nextTimer = createInstantTimer(sequenceNumber++, finalFireAt); + } + nextTimer.setParentTask(this); + return nextTimer; + } + + private void handleSubTimerSuccess() { + // check if it is the last timer + if (currentInstant.compareTo(finalFireAt) >= 0) { + this.complete(null); + } + } + + @Override + public Void await() { + return this.task.await(); + } + + } + private class ExternalEventTask extends CompletableTask { private final String eventName; private final Duration timeout; @@ -1257,6 +1307,10 @@ public boolean complete(V value) { // notify parent task ((RetriableTask) parentTask).handleChildSuccess(value); } + if (parentTask instanceof TimerTask) { + // notify parent task + ((TimerTask) parentTask).handleSubTimerSuccess(); + } return result; } diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index aa4e5a0..94f7333 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -9,6 +9,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -93,8 +94,10 @@ void longTimer() throws TimeoutException { final String orchestratorName = "LongTimer"; final Duration delay = Duration.ofSeconds(7); AtomicInteger counter = new AtomicInteger(); + AtomicReferenceArray timestamps = new AtomicReferenceArray<>(4); DurableTaskGrpcWorker worker = this.createWorkerBuilder() .addOrchestrator(orchestratorName, ctx -> { + timestamps.set(counter.get(), LocalDateTime.now()); counter.incrementAndGet(); ctx.createTimer(delay).await(); }) @@ -117,9 +120,93 @@ void longTimer() throws TimeoutException { // Verify that the correct number of timers were created // This should yield 4 (first invocation + replay invocations for internal timers 3s + 3s + 1s) assertEquals(4, counter.get()); + + // Verify that each timer is the expected length + int[] secondsElapsed = new int[3]; + for (int i = 0; i < timestamps.length() - 1; i++) { + secondsElapsed[i] = timestamps.get(i + 1).getSecond() - timestamps.get(i).getSecond(); + } + assertEquals(secondsElapsed[0], 3); + assertEquals(secondsElapsed[1], 3); + assertEquals(secondsElapsed[2], 1); + } + } + + @Test + void longTimerNonblocking() throws TimeoutException { + final String orchestratorName = "ActivityAnyOf"; + final String externalEventActivityName = "externalEvent"; + final String externalEventWinner = "The external event completed first"; + final String timerEventWinner = "The timer event completed first"; + final Duration timerDuration = Duration.ofSeconds(20); + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> { + Task externalEvent = ctx.waitForExternalEvent(externalEventActivityName, String.class); + Task longTimer = ctx.createTimer(timerDuration); + Task winnerEvent = ctx.anyOf(externalEvent, longTimer).await(); + if (winnerEvent == externalEvent) { + ctx.complete(externalEventWinner); + } else { + ctx.complete(timerEventWinner); + } + }).setMaximumTimerInterval(Duration.ofSeconds(3)).buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + client.raiseEvent(instanceId, externalEventActivityName, "Hello world"); + OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + + String output = instance.readOutputAs(String.class); + assertNotNull(output); + assertTrue(output.equals(externalEventWinner)); + + long createdTime = instance.getCreatedAt().getEpochSecond(); + long completedTime = instance.getLastUpdatedAt().getEpochSecond(); + // Timer did not block execution + assertTrue(completedTime - createdTime < 5); } } + @Test + void longTimerNonblockingNoExternal() throws TimeoutException { + final String orchestratorName = "ActivityAnyOf"; + final String externalEventActivityName = "externalEvent"; + final String externalEventWinner = "The external event completed first"; + final String timerEventWinner = "The timer event completed first"; + final Duration timerDuration = Duration.ofSeconds(20); + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> { + Task externalEvent = ctx.waitForExternalEvent(externalEventActivityName, String.class); + Task longTimer = ctx.createTimer(timerDuration); + Task winnerEvent = ctx.anyOf(externalEvent, longTimer).await(); + if (winnerEvent == externalEvent) { + ctx.complete(externalEventWinner); + } else { + ctx.complete(timerEventWinner); + } + }).setMaximumTimerInterval(Duration.ofSeconds(3)).buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + + String output = instance.readOutputAs(String.class); + assertNotNull(output); + assertTrue(output.equals(timerEventWinner)); + + long expectedCompletionSecond = instance.getCreatedAt().plus(timerDuration).getEpochSecond(); + long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond(); + assertTrue(expectedCompletionSecond <= actualCompletionSecond); + } + } + + @Test void longTimeStampTimer() throws TimeoutException { final String orchestratorName = "LongTimeStampTimer";