
Workflow execution with Workflow.await(condition) times out in unit tests with enabled time skipping #1291

Open
Tracked by #1094
ddavidyuk opened this issue Jun 28, 2022 · 3 comments
Labels
bug Something isn't working test server Related to the test server

Comments

@ddavidyuk

Expected Behavior

The unit test below should always pass.

Actual Behavior

Sometimes the test fails with io.temporal.client.WorkflowNotFoundException. Changing Workflow.await(condition) to Workflow.await(Duration.ofSeconds(100), condition) in TestWorkflowImpl seems to fix the problem, but I'm not sure why.
Attached are the TRACE-level io.temporal logs from a run in which the issue reproduced: bug.log

Steps to Reproduce the Problem

Run the following test:

import io.temporal.activity.ActivityOptions;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.testing.TestWorkflowEnvironment;
import io.temporal.testing.TestWorkflowExtension;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkflowImplementationOptions;
import io.temporal.workflow.*;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class WorkflowExecutionTimeoutTest {
    @RegisterExtension
    public static final TestWorkflowExtension TEST_WORKFLOW_EXTENSION =
            TestWorkflowExtension.newBuilder()
                    .setDoNotStart(true)
                    .build();

    private ProcessEventsWorkflow processWorkflowStub;
    private TestWorkflow testWorkflowStub;

    @BeforeEach
    public void setUpTemporal(TestWorkflowEnvironment testEnv,
                              Worker worker,
                              WorkflowClient workflowClient,
                              WorkflowOptions workflowOptions) {
        worker.registerWorkflowImplementationTypes(
                WorkflowImplementationOptions.newBuilder()
                        .setDefaultActivityOptions(ActivityOptions.newBuilder()
                                .setStartToCloseTimeout(Duration.ofSeconds(10))
                                .build())
                        .build(),
                ProcessEventsWorkflowImpl.class,
                TestWorkflowImpl.class);

        testEnv.start();

        processWorkflowStub = workflowClient.newWorkflowStub(ProcessEventsWorkflow.class,
                WorkflowOptions.newBuilder(workflowOptions)
                        .setWorkflowId("ProcessEventsWorkflow")
                        .build());
        testWorkflowStub = workflowClient.newWorkflowStub(TestWorkflow.class,
                WorkflowOptions.newBuilder(workflowOptions)
                        .setWorkflowId("TestWorkflow")
                        .build());
    }

    @Test
    public void testBug() throws TimeoutException {
        // create artificial load to reproduce the bug - seems to help, but still the bug does not always reproduce
        IntStream.range(0, 20).forEach(index -> new Thread(this::busyWork).start());

        WorkflowClient.start(testWorkflowStub::execute);

        WorkflowStub.fromTyped(processWorkflowStub).signalWithStart("addEvent",
                new Object[]{"testEvent"},
                new Object[]{"TestWorkflow", Duration.ofSeconds(1)});
        WorkflowStub.fromTyped(processWorkflowStub).getResult(10, TimeUnit.SECONDS, Object.class);

        testWorkflowStub.stop(); // <----- fails here
        WorkflowStub.fromTyped(testWorkflowStub).getResult(10, TimeUnit.SECONDS, Object.class);

        assertEquals(Arrays.asList("testEvent"), testWorkflowStub.getEvents());
    }

    private void busyWork() {
        int count = 100000000;
        int sleepIndex = (int) (Math.random() * count);
        while(count-- > 0) {
            if (sleepIndex == count) { // yield at random intervals
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            Math.sqrt(count);
        }
    }


    @WorkflowInterface
    public interface ProcessEventsWorkflow {
        @WorkflowMethod
        void execute(String targetWorkflowId, Duration keepAliveTimeout);
        @SignalMethod
        void addEvent(String event);
    }

    public static class ProcessEventsWorkflowImpl implements ProcessEventsWorkflow {
        private final Queue<String> events = new LinkedList<>();

        @Override
        public void execute(String targetWorkflowId, Duration keepAliveTimeout) {
            while (true) {
                while (!events.isEmpty()) {
                    String event = events.poll();
                    Workflow.newExternalWorkflowStub(TestWorkflow.class, targetWorkflowId).onEvent(event);
                }

                Workflow.await(keepAliveTimeout, () -> !events.isEmpty());
                if (events.isEmpty()) {
                    return;
                }
            }
        }

        @Override
        public void addEvent(String event) {
            events.add(event);
        }
    }

    @WorkflowInterface
    public interface TestWorkflow {
        @WorkflowMethod
        void execute();

        @SignalMethod
        void stop();

        @SignalMethod
        void onEvent(String event);

        @QueryMethod
        List<String> getEvents();
    }

    public static class TestWorkflowImpl implements TestWorkflow {
        private boolean stop = false;
        private final List<String> events = new ArrayList<>();

        @Override
        public void execute() {
            Workflow.await(() -> stop);
        }

        @Override
        public void stop() {
            stop = true;
        }

        @Override
        public void onEvent(String event) {
            events.add(event);
        }

        @Override
        public List<String> getEvents() {
            return events;
        }
    }
}

Specifications

  • Version: Temporal Java SDK 1.11.0, 1.12.0, 1.13.0 (reproduces on all of these), Temporal Server 1.16.2
  • Platform: Java
Spikhalskiy added a commit to Spikhalskiy/java-sdk that referenced this issue Jun 29, 2022
@Spikhalskiy
Contributor

Spikhalskiy commented Jun 29, 2022

I can confirm the issue. It's related to the peculiarities of time skipping, how it's implemented, and the absence of a total order between some operations in Temporal.
Under the artificial load, the client code doesn't stop time skipping soon enough, so time gets advanced A LOT, all the way to the workflow task timeout, which completed the workflow and kept the signal from getting through. There is also no way for the server to make sure that the code the client is supposed to execute after a long poll returns has actually finished executing.
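A toy model can make this race concrete. The sketch below is not Temporal's test server code; it assumes a simplified clock that may jump straight to its earliest pending timer whenever nobody holds a "skip lock", and every name in it (`TimeSkippingRace`, `firstEvent`, `WORKFLOW_TIMEOUT`, the 10-second value) is hypothetical. It compares a client that re-locks time skipping promptly after a long poll returns with one that is delayed by CPU load.

```java
import java.util.PriorityQueue;

// Toy model of a time-skipping test clock. This is NOT Temporal's test server
// code; it only illustrates the ordering problem described in the comment.
public class TimeSkippingRace {

    // A pending timer that fires at virtual time `at` (milliseconds).
    static final class Timer {
        final long at;
        final String name;

        Timer(long at, String name) {
            this.at = at;
            this.name = name;
        }
    }

    private final PriorityQueue<Timer> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.at, b.at));
    private long now = 0;  // virtual time in milliseconds
    private int locks = 0; // holders that currently forbid time skipping

    void schedule(long delayMillis, String name) {
        timers.add(new Timer(now + delayMillis, name));
    }

    void lock() { locks++; }

    void unlock() { locks--; }

    // Server side: if nobody holds a lock, jump straight to the earliest
    // pending timer and fire it. Returns the fired timer's name, or null.
    String maybeSkip() {
        if (locks > 0 || timers.isEmpty()) {
            return null;
        }
        Timer next = timers.poll();
        now = next.at;
        return next.name;
    }

    // One round of the race right after a long poll returns, when the server
    // no longer holds a lock. A fast client re-locks before the server's skip
    // check; a slow (CPU-starved) client does not. Returns "<event>@<virtual ms>".
    static String firstEvent(boolean clientIsSlow) {
        TimeSkippingRace clock = new TimeSkippingRace();
        clock.schedule(10_000, "WORKFLOW_TIMEOUT"); // hypothetical 10 s timeout

        if (!clientIsSlow) {
            clock.lock(); // client stops time skipping in time
        }
        String fired = clock.maybeSkip(); // server checks whether it may skip
        if (fired != null) {
            // Time jumped 10 s in one step and the workflow is closed, so a
            // later signal from the slow client would find no running execution.
            return fired + "@" + clock.now;
        }
        String result = "SIGNAL@" + clock.now; // signal delivered while time is locked
        clock.unlock();                        // client lets time skipping resume
        return result;
    }

    public static void main(String[] args) {
        System.out.println(firstEvent(false)); // fast client
        System.out.println(firstEvent(true));  // slow client under load
    }
}
```

Running `main` prints `SIGNAL@0` for the fast client and `WORKFLOW_TIMEOUT@10000` for the slow one. In the slow case virtual time jumps the whole (hypothetical) timeout in a single step, which would be consistent with the later `stop()` signal failing with `WorkflowNotFoundException`.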

The fix here is not trivial. It also affects only the test framework, not the core functionality.
I will think about the best fix, but I have to temporarily deprioritize it behind other tasks that affect actual production functionality.

@Spikhalskiy Spikhalskiy self-assigned this Jun 29, 2022
@Spikhalskiy Spikhalskiy added the bug Something isn't working label Jun 29, 2022
@Spikhalskiy Spikhalskiy mentioned this issue Jun 29, 2022
@ddavidyuk
Author

Thank you! Could you confirm whether the workaround I mentioned should work? If I change Workflow.await(condition) to Workflow.await(timeout, condition), the issue seems to stop reproducing, at least locally. Is this a reliable workaround?

@Spikhalskiy
Contributor

No, it's not. The same "bug", or rather limitation, will trigger your Workflow.await(timeout, condition) earlier than you expect, just as it triggers the workflow timeout now. I think the only gentle workaround here is to not set a workflow timeout of any kind, or to not use time skipping for such a test.
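A small decision model suggests why adding a timeout to `Workflow.await` only hides the problem. This is a hypothetical sketch, not SDK code: `AwaitTimeoutModel`, `firstEvent`, and the 200 ms client delay are illustrative assumptions, and the model assumes the client is always slow enough to leave an unlocked gap after a long poll. Any pending timer, whether a workflow timeout or the timer behind `Workflow.await(timeout, condition)`, becomes the jump target during that gap; the two suggested workarounds remove either the target or the jump.

```java
import java.util.Collections;
import java.util.List;

// Toy decision model (not Temporal code). It assumes the client is slow enough
// that the server sees an unlocked gap after a long poll.
public class AwaitTimeoutModel {

    // timerDelaysMillis: timers the workflow has pending, e.g. a workflow
    //   timeout or the timer behind Workflow.await(timeout, condition).
    // signalDelayMillis: real time until the slow client delivers its signal.
    // timeSkipping: whether virtual time may jump while nobody holds a lock.
    // Returns "SIGNAL" if the signal is seen first, otherwise "TIMER".
    static String firstEvent(List<Long> timerDelaysMillis, long signalDelayMillis,
                             boolean timeSkipping) {
        if (timerDelaysMillis.isEmpty()) {
            return "SIGNAL"; // no timer to jump to, so the signal is seen first
        }
        if (timeSkipping) {
            // During the gap virtual time jumps straight to the earliest timer,
            // however long it is, before the slow client acts.
            return "TIMER";
        }
        // Without skipping, virtual time follows the wall clock.
        long earliestTimer = Collections.min(timerDelaysMillis);
        return signalDelayMillis < earliestTimer ? "SIGNAL" : "TIMER";
    }

    public static void main(String[] args) {
        long slowClientMillis = 200; // hypothetical delay of a CPU-starved client

        // await(Duration.ofSeconds(100), condition): the 100 s timer is still the jump target.
        System.out.println(firstEvent(List.of(100_000L), slowClientMillis, true));
        // No workflow timeout and no await timer: nothing to jump to.
        System.out.println(firstEvent(List.of(), slowClientMillis, true));
        // Time skipping disabled: 200 ms of real delay is far below 100 s.
        System.out.println(firstEvent(List.of(100_000L), slowClientMillis, false));
    }
}
```

`main` prints `TIMER`, `SIGNAL`, `SIGNAL`: under time skipping a 100-second await timer still fires before the signal, while the signal is seen first when no timer is pending or when time follows the wall clock.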

@sync-by-unito sync-by-unito bot added the test server Related to the test server label Nov 19, 2024