diff --git a/CHANGELOG.md b/CHANGELOG.md index 6246e73c6..e68eb2cde 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ **Highlights** - `nflow-engine` + - Add support to manage workflow instances without having the workflow implementation classes in the classpath. - Clean up old workflow executors that have expired configured time ago (default 1 year). - Optimize SQL queries used for dead node detection and workflow instance recovery. - Add `recovered` timestamp to executor info (database, Java API and REST API). @@ -14,6 +15,15 @@ **Details** - `nflow-engine` + - Add support to manage (create, update and read, but not execute) workflow instances without having the workflow implementation classes in the classpath. [#53](https://github.com/NitorCreations/nflow/pull/531) + - If a workflow definition is not found from the classpath, it is loaded from the database and stored in memory. + - Workflow definitions that are loaded from the database are refreshed at most at configured interval (`nflow.definition.refreshStoredFromDatabase.interval.seconds`, default 60). Set to -1 to revert to previous functionality. + - Potential use cases: + - Creating workflow instances in a separate application that does not need to be updated when the workflow implementations are changed + - Generic nFlow REST service that does not need to be updated when the workflow implementations or definitions are changed + - Having a cluster of nFlow executors that just execute the workflows and have no other business logic + - Potential change break in functionality: + - If your code relied on the fact can workflows cannot be manipulated after their code is removed from classpath this change will break that guarantee, unless you disable this feature. - Improve shutdown sequence. - Workflows that were acquired from the database but have not started executing can now be resumed immediately by another executor. - Executing workflows are interrupted 5 seconds before shutdown timeout so that they get a chance to persist their state to the database. This also allows other executors to immediately resume the processing of the successfully interrupted workflows. The interrupting can be disabled by setting `nflow.executor.interrupt` to false. diff --git a/nflow-engine/src/main/java/io/nflow/engine/config/db/DatabaseConfiguration.java b/nflow-engine/src/main/java/io/nflow/engine/config/db/DatabaseConfiguration.java index 97e391dac..d327293e5 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/config/db/DatabaseConfiguration.java +++ b/nflow-engine/src/main/java/io/nflow/engine/config/db/DatabaseConfiguration.java @@ -88,6 +88,7 @@ public DataSource nflowDatasource(Environment env, Object metricRegistry) { config.setMaximumPoolSize(property(env, "max_pool_size", Integer.class)); config.setIdleTimeout(property(env, "idle_timeout_seconds", Long.class) * 1000); config.setAutoCommit(true); + config.setInitializationFailTimeout(property(env, "initialization_fail_timeout_seconds", Long.class) * 1000); if (metricRegistry != null) { config.setMetricRegistry(metricRegistry); } diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java index 5ab9fa4cb..b43d3dd28 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java @@ -47,9 +47,9 @@ public class WorkflowDispatcher implements Runnable { private final int stuckThreadThresholdSeconds; private final Random rand = new Random(); private final boolean allowInterrupt; + private final boolean autoStart; @Inject - @SuppressFBWarnings(value = "WEM_WEAK_EXCEPTION_MESSAGING", justification = "Transaction support exception message is fine") public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao workflowInstances, WorkflowStateProcessorFactory stateProcessorFactory, WorkflowDefinitionService workflowDefinitions, ExecutorDao executorDao, DispatcherExceptionAnalyzer exceptionAnalyzer, NflowLogger nflowLogger, Environment env) { @@ -63,7 +63,14 @@ public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao this.sleepTimeMillis = env.getRequiredProperty("nflow.dispatcher.sleep.ms", Long.class); this.stuckThreadThresholdSeconds = env.getRequiredProperty("nflow.executor.stuckThreadThreshold.seconds", Integer.class); this.allowInterrupt = env.getProperty("nflow.executor.interrupt", Boolean.class, TRUE); + this.autoStart = env.getRequiredProperty("nflow.autostart", Boolean.class); + if (autoStart) { + verifyDatabaseSetup(); + } + } + @SuppressFBWarnings(value = "WEM_WEAK_EXCEPTION_MESSAGING", justification = "Transaction support exception message is fine") + private void verifyDatabaseSetup() { if (!executorDao.isTransactionSupportEnabled()) { throw new BeanCreationException("Transaction support must be enabled"); } @@ -74,9 +81,13 @@ public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao @Override public void run() { - logger.info("Dispacther started."); try { + if (!autoStart) { + verifyDatabaseSetup(); + } workflowDefinitions.postProcessWorkflowDefinitions(); + + logger.info("Dispatcher started."); running.set(true); while (!shutdownRequested.get()) { if (paused.get()) { diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java index 095d7abd0..efe4ab587 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.function.Supplier; +import io.nflow.engine.internal.workflow.StoredWorkflowDefinitionWrapper; import org.joda.time.DateTime; import org.joda.time.Duration; import org.slf4j.Logger; @@ -147,7 +148,7 @@ private void runImpl() { WorkflowInstance instance = workflowInstances.getWorkflowInstance(instanceId, EnumSet.of(CURRENT_STATE_VARIABLES), null); logIfLagging(instance); WorkflowDefinition definition = workflowDefinitions.getWorkflowDefinition(instance.type); - if (definition == null) { + if (definition == null || definition instanceof StoredWorkflowDefinitionWrapper) { rescheduleUnknownWorkflowType(instance); return; } @@ -217,11 +218,11 @@ private void runImpl() { private void logRetryableException(StateProcessExceptionHandling exceptionHandling, String state, Throwable thrown) { if (exceptionHandling.logStackTrace) { nflowLogger.log(logger, exceptionHandling.logLevel, "Handling state '{}' threw a retryable exception, trying again later.", - new Object[] { state, thrown }); + state, thrown); } else { nflowLogger.log(logger, exceptionHandling.logLevel, "Handling state '{}' threw a retryable exception, trying again later. Message: {}", - new Object[] { state, thrown.getMessage() }); + state, thrown.getMessage()); } } @@ -317,11 +318,11 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution, StateSaveExceptionHandling handling = stateSaveExceptionAnalyzer.analyzeSafely(ex, saveRetryCount++); if (handling.logStackTrace) { nflowLogger.log(logger, handling.logLevel, "Failed to save workflow instance {} new state, retrying after {} seconds.", - new Object[] { instance.id, handling.retryDelay, ex }); + instance.id, handling.retryDelay, ex); } else { nflowLogger.log(logger, handling.logLevel, "Failed to save workflow instance {} new state, retrying after {} seconds. Error: {}", - new Object[] { instance.id, handling.retryDelay, ex.getMessage() }); + instance.id, handling.retryDelay, ex.getMessage()); } sleepIgnoreInterrupted(handling.retryDelay.getStandardSeconds()); } diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StoredWorkflowDefinition.java b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StoredWorkflowDefinition.java index 928093647..76c100fa0 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StoredWorkflowDefinition.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StoredWorkflowDefinition.java @@ -52,5 +52,4 @@ public int compareTo(Signal o) { return value.compareTo(o.value); } } - } diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StoredWorkflowDefinitionWrapper.java b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StoredWorkflowDefinitionWrapper.java new file mode 100644 index 000000000..2f42ac63d --- /dev/null +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StoredWorkflowDefinitionWrapper.java @@ -0,0 +1,44 @@ +package io.nflow.engine.internal.workflow; + +import static io.nflow.engine.workflow.definition.WorkflowStateType.start; +import static java.util.Collections.emptyMap; +import static java.util.stream.Collectors.toList; + +import java.util.Collection; + +import io.nflow.engine.workflow.curated.State; +import io.nflow.engine.workflow.definition.WorkflowDefinition; +import io.nflow.engine.workflow.definition.WorkflowSettings; +import io.nflow.engine.workflow.definition.WorkflowState; +import io.nflow.engine.workflow.definition.WorkflowStateType; + +public class StoredWorkflowDefinitionWrapper extends WorkflowDefinition { + public StoredWorkflowDefinitionWrapper(StoredWorkflowDefinition stored) { + super(stored.type, getInitialState(stored), getErrorState(stored), new WorkflowSettings.Builder().build(), emptyMap(), allStates(stored), false); + setDescription(stored.description); + } + + private static Collection allStates(StoredWorkflowDefinition stored) { + return stored.states.stream().map(StoredWorkflowDefinitionWrapper::toState).collect(toList()); + } + + private static WorkflowState getInitialState(StoredWorkflowDefinition stored) { + return stored.states.stream() + .filter(state -> start.name().equals((state.type))) + .findFirst() + .map(StoredWorkflowDefinitionWrapper::toState) + .orElseThrow(() -> new IllegalStateException("Could not find initial state for " + stored)); + } + + private static WorkflowState toState(StoredWorkflowDefinition.State state) { + return new State(state.id, WorkflowStateType.valueOf(state.type), state.description); + } + + private static WorkflowState getErrorState(StoredWorkflowDefinition stored) { + return stored.states.stream() + .filter(state -> stored.onError.equals((state.id))) + .findFirst() + .map(StoredWorkflowDefinitionWrapper::toState) + .orElseThrow(() -> new IllegalStateException("Could not find error state for " + stored)); + } +} diff --git a/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowDefinitionService.java b/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowDefinitionService.java index 665aa69b8..594b11907 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowDefinitionService.java +++ b/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowDefinitionService.java @@ -1,10 +1,15 @@ package io.nflow.engine.service; +import static java.lang.Long.MAX_VALUE; +import static java.lang.System.currentTimeMillis; import static java.util.Collections.emptyList; import static java.util.Collections.synchronizedMap; +import static java.util.Collections.unmodifiableList; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.slf4j.LoggerFactory.getLogger; import java.util.ArrayList; +import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -16,7 +21,10 @@ import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.nflow.engine.internal.dao.WorkflowDefinitionDao; +import io.nflow.engine.internal.workflow.StoredWorkflowDefinition; +import io.nflow.engine.internal.workflow.StoredWorkflowDefinitionWrapper; import io.nflow.engine.workflow.definition.WorkflowDefinition; /** @@ -33,12 +41,20 @@ public class WorkflowDefinitionService { private final WorkflowDefinitionDao workflowDefinitionDao; private final boolean persistWorkflowDefinitions; private final boolean autoInit; + private final long storedDefinitionCheckInterval; + /** + * The next time the stored definitions from database are checked for changes. + */ + private long nextCheckOfStoredDefinitions; @Inject public WorkflowDefinitionService(WorkflowDefinitionDao workflowDefinitionDao, Environment env) { this.workflowDefinitionDao = workflowDefinitionDao; this.persistWorkflowDefinitions = env.getRequiredProperty("nflow.definition.persist", Boolean.class); this.autoInit = env.getRequiredProperty("nflow.autoinit", Boolean.class); + this.storedDefinitionCheckInterval = SECONDS + .toMillis(env.getRequiredProperty("nflow.definition.refreshStoredFromDatabase.interval.seconds", Integer.class)); + this.nextCheckOfStoredDefinitions = storedDefinitionCheckInterval > 0 ? 0 : MAX_VALUE; } /** @@ -49,7 +65,14 @@ public WorkflowDefinitionService(WorkflowDefinitionDao workflowDefinitionDao, En * @return The workflow definition or null if not found. */ public WorkflowDefinition getWorkflowDefinition(String type) { - return workflowDefinitions.get(type); + WorkflowDefinition definition = workflowDefinitions.get(type); + if (definition instanceof StoredWorkflowDefinitionWrapper && getWorkflowDefinitionRefreshTime() > 0) { + definition = null; + } + if (definition == null && refreshStoredDefinitions()) { + definition = workflowDefinitions.get(type); + } + return definition; } /** @@ -58,6 +81,7 @@ public WorkflowDefinition getWorkflowDefinition(String type) { * @return List of workflow definitions. */ public List getWorkflowDefinitions() { + refreshStoredDefinitions(); return workflowDefinitionValues; } @@ -91,9 +115,49 @@ public void addWorkflowDefinition(WorkflowDefinition wd) { if (autoInit && persistWorkflowDefinitions) { workflowDefinitionDao.storeWorkflowDefinition(wd); } + setWorkflowDefinitions(workflowDefinitions.values()); + logger.info("Added workflow type: {} ({})", wd.getType(), wd.getClass().getName()); + } + + private void setWorkflowDefinitions(Collection newDefinitions) { synchronized (workflowDefinitions) { - workflowDefinitionValues = new ArrayList<>(workflowDefinitions.values()); + workflowDefinitionValues = unmodifiableList(new ArrayList<>(newDefinitions)); } - logger.info("Added workflow type: {} ({})", wd.getType(), wd.getClass().getName()); + } + + @SuppressFBWarnings(value = "NOS_NON_OWNED_SYNCHRONIZATION", + justification = "synchronize(this) is valid and needed to match the below synchronized refreshStoredDefinitions() method") + private long getWorkflowDefinitionRefreshTime() { + if (storedDefinitionCheckInterval <= 0) { + return -1; + } + long now = currentTimeMillis(); + synchronized (this) { + if (nextCheckOfStoredDefinitions <= now) { + return now; + } + } + return -1; + } + + private synchronized boolean refreshStoredDefinitions() { + long now = getWorkflowDefinitionRefreshTime(); + if (now <= -1) { + return false; + } + nextCheckOfStoredDefinitions = now + storedDefinitionCheckInterval; + boolean changed = false; + for (StoredWorkflowDefinition def : workflowDefinitionDao.queryStoredWorkflowDefinitions(emptyList())) { + WorkflowDefinition current = workflowDefinitions.get(def.type); + if (current == null || current instanceof StoredWorkflowDefinitionWrapper) { + StoredWorkflowDefinitionWrapper wrapper = new StoredWorkflowDefinitionWrapper(def); + workflowDefinitions.put(def.type, wrapper); + changed = true; + } + } + if (changed) { + setWorkflowDefinitions(workflowDefinitions.values()); + } + return changed; } } diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowDefinition.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowDefinition.java index b82afc6ca..3184726e4 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowDefinition.java +++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowDefinition.java @@ -42,6 +42,7 @@ public abstract class WorkflowDefinition extends ModelObject { protected final Map failureTransitions = new LinkedHashMap<>(); private Map stateMethods; private final Set states = new HashSet<>(); + private final boolean verifyStateMethodValidity; /** * Create a workflow definition with default settings and automatically scanned state methods. @@ -95,7 +96,7 @@ protected WorkflowDefinition(String type, WorkflowState initialState, WorkflowSt */ protected WorkflowDefinition(String type, WorkflowState initialState, WorkflowState errorState, WorkflowSettings settings, Map stateMethods) { - this(type, initialState, errorState, settings, stateMethods, null); + this(type, initialState, errorState, settings, stateMethods, null, true); } /** @@ -117,7 +118,32 @@ protected WorkflowDefinition(String type, WorkflowState initialState, WorkflowSt * The states to be registered for the workflow. If null, the states will be scanned. */ protected WorkflowDefinition(String type, WorkflowState initialState, WorkflowState errorState, WorkflowSettings settings, - Map stateMethods, Collection states) { + Map stateMethods, Collection states) { + this(type, initialState, errorState, settings, stateMethods, states, true); + } + + /** + * Create a workflow definition with given settings, state methods and states. + * + * @param type + * The unique identifier of this workflow definition. + * @param initialState + * The default start state of the workflow. The state is automatically registered as one of the allowed states in this + * workflow. + * @param errorState + * The default error state of the workflow. The state is automatically registered as one of the allowed states in this + * workflow. + * @param settings + * The configuration for the workflow instances of this workflow type. + * @param stateMethods + * The state methods to be used for the states of this workflow type. If null, the methods will be scanned. + * @param states + * The states to be registered for the workflow. If null, the states will be scanned. + * @param verifyStateMethodValidity + * True to search and verify the implementation of registered state methods to ensure they comply with expectations. + */ + protected WorkflowDefinition(String type, WorkflowState initialState, WorkflowState errorState, WorkflowSettings settings, + Map stateMethods, Collection states, boolean verifyStateMethodValidity) { Assert.notNull(initialState, "initialState must not be null"); Assert.isTrue(initialState.getType() == WorkflowStateType.start, "initialState must be a start state"); Assert.notNull(errorState, "errorState must not be null"); @@ -125,10 +151,17 @@ protected WorkflowDefinition(String type, WorkflowState initialState, WorkflowSt this.initialState = initialState; this.errorState = errorState; this.settings = settings; + this.verifyStateMethodValidity = verifyStateMethodValidity; WorkflowDefinitionScanner scanner = new WorkflowDefinitionScanner(); if (stateMethods == null) { + Assert.isTrue(verifyStateMethodValidity, + "State method validity verification must be enabled when given state methods are null (scanned from classpath)"); this.stateMethods = scanner.getStateMethods(getClass()); } else { + if (!verifyStateMethodValidity) { + Assert.isTrue(stateMethods.isEmpty(), + "State method validity verification must be enabled when given state methods is not empty"); + } this.stateMethods = stateMethods; } registerState(initialState); @@ -301,6 +334,9 @@ public WorkflowSettings getSettings() { } final void requireStateMethodExists(WorkflowState state) { + if (!verifyStateMethodValidity) { + return; + } WorkflowStateMethod stateMethod = stateMethods.get(state.name()); if (stateMethod == null && !state.getType().isFinal()) { String msg = format( @@ -405,5 +441,4 @@ public boolean isAllowedNextAction(WorkflowInstance instance, NextAction nextAct public Map getSupportedSignals() { return emptyMap(); } - } diff --git a/nflow-engine/src/main/resources/nflow-engine.properties b/nflow-engine/src/main/resources/nflow-engine.properties index f4ce7efb3..129c8558c 100644 --- a/nflow-engine/src/main/resources/nflow-engine.properties +++ b/nflow-engine/src/main/resources/nflow-engine.properties @@ -66,8 +66,10 @@ nflow.db.idle_timeout_seconds=600 nflow.db.create_on_startup=true nflow.db.disable_batch_updates=false nflow.db.workflowInstanceType.cacheSize=10000 +nflow.db.initialization_fail_timeout_seconds=10 nflow.definition.persist=true +nflow.definition.refreshStoredFromDatabase.interval.seconds=60 nflow.maintenance.insertWorkflowIfMissing=true nflow.maintenance.initial.cron=4 4 4 * * * diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java index e71a5b073..7b3690b10 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java @@ -102,6 +102,7 @@ public void setup() { env.setProperty("nflow.executor.stateSaveRetryDelay.seconds", "60"); env.setProperty("nflow.executor.stateVariableValueTooLongRetryDelay.minutes", "60"); env.setProperty("nflow.db.workflowInstanceType.cacheSize", "10000"); + env.setProperty("nflow.autostart", "true"); when(executorDao.isTransactionSupportEnabled()).thenReturn(true); when(executorDao.isAutoCommitEnabled()).thenReturn(true); when(executorDao.isAutoCommitEnabled()).thenReturn(true); @@ -319,10 +320,11 @@ public void threadShutdown() throws InterruptedException { break; } assertThat(i, lessThan(10)); - sleep(20); + sleep(50); } waitForTick(1); dispatcher.shutdown(); + waitForTick(2); } @Override diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java index 706c93b1c..391247858 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java @@ -79,6 +79,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; +import io.nflow.engine.internal.workflow.StoredWorkflowDefinitionWrapper; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.Matchers; @@ -792,6 +793,19 @@ public void instanceWithUnsupportedTypeIsRescheduled() { is("Unsupported workflow type"), greaterThanOrEqualTo(oneHourInFuture), is(nullValue())))); } + @Test + public void instanceWithStoredInstanceTypeIsRescheduled() { + WorkflowInstance instance = executingInstanceBuilder().setType("unsupported").setState(TestState.BEGIN).build(); + when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance); + when(workflowDefinitions.getWorkflowDefinition(instance.type)).thenReturn(mock(StoredWorkflowDefinitionWrapper.class)); + DateTime oneHourInFuture = now().plusHours(1); + + runExecutorWithTimeout(); + + verify(workflowInstanceDao).updateWorkflowInstance(argThat(matchesWorkflowInstance(inProgress, TestState.BEGIN, 0, + is("Unsupported workflow type"), greaterThanOrEqualTo(oneHourInFuture), is(nullValue())))); + } + @Test public void illegalStateChangeGoesToErrorState() { WorkflowInstance instance = executingInstanceBuilder().setType(SIMPLE_TYPE).setState(SimpleTestWorkflow.ILLEGAL_STATE_CHANGE) diff --git a/nflow-engine/src/test/java/io/nflow/engine/service/WorkflowDefinitionServiceTest.java b/nflow-engine/src/test/java/io/nflow/engine/service/WorkflowDefinitionServiceTest.java index c913874f7..322d4bfe6 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/service/WorkflowDefinitionServiceTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/service/WorkflowDefinitionServiceTest.java @@ -1,18 +1,25 @@ package io.nflow.engine.service; +import static java.util.Collections.emptyList; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import io.nflow.engine.internal.workflow.StoredWorkflowDefinition; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; @@ -21,6 +28,8 @@ import io.nflow.engine.internal.dao.WorkflowDefinitionDao; import io.nflow.engine.internal.executor.BaseNflowTest; +import java.util.List; + public class WorkflowDefinitionServiceTest extends BaseNflowTest { @Mock @@ -37,6 +46,11 @@ public void setup() { } private void initializeService(boolean definitionPersist, boolean autoInit) { + initializeService(definitionPersist, autoInit, 60); + } + + private void initializeService(boolean definitionPersist, boolean autoInit, int reloadInterval) { + when(env.getRequiredProperty("nflow.definition.refreshStoredFromDatabase.interval.seconds", Integer.class)).thenReturn(reloadInterval); when(env.getRequiredProperty("nflow.definition.persist", Boolean.class)).thenReturn(definitionPersist); when(env.getRequiredProperty("nflow.autoinit", Boolean.class)).thenReturn(autoInit); service = new WorkflowDefinitionService(workflowDefinitionDao, env); @@ -107,7 +121,7 @@ public void definitionsAreNotStoredDuringPostProcessingWhenDefinitionPersistIsFa } @Test - public void addingDuplicatDefinitionThrowsException() { + public void addingDuplicateDefinitionThrowsException() { initializeService(true, true); service.addWorkflowDefinition(workflowDefinition); @@ -125,6 +139,15 @@ public void getWorkflowDefinitionReturnsNullWhenTypeIsNotFound() { initializeService(true, true); assertThat(service.getWorkflowDefinition("notFound"), is(nullValue())); + verify(workflowDefinitionDao).queryStoredWorkflowDefinitions(anyList()); + } + + @Test + public void getWorkflowDefinitionReturnsDoesNotQueryDatabaseIfCheckIntervalDisabled() { + initializeService(true, true, 0); + + assertThat(service.getWorkflowDefinition("notFound"), is(nullValue())); + verify(workflowDefinitionDao, never()).queryStoredWorkflowDefinitions(anyList()); } @Test @@ -136,4 +159,45 @@ public void getWorkflowDefinitionReturnsDefinitionWhenTypeIsFound() { assertThat(service.getWorkflowDefinition("dummy"), is(instanceOf(DummyTestWorkflow.class))); } + @Test + @SuppressWarnings("unchecked") + public void getWorkflowDefinitionChecksFromDaoIfNotFoundFromMemory() throws Exception { + initializeService(true, true, 1); + service.addWorkflowDefinition(workflowDefinition); + + var w1 = new StoredWorkflowDefinition(); + w1.type = "w1"; + w1.states = List.of(new StoredWorkflowDefinition.State("start", "start", "start")); + w1.onError = "start"; + + var w2 = new StoredWorkflowDefinition(); + w2.type = "w2"; + w2.states = w1.states; + w2.onError = "start"; + + when(workflowDefinitionDao.queryStoredWorkflowDefinitions(emptyList())).thenReturn(List.of(w1), List.of(w1, w2)); + + // classpath hit will not refresh + assertThat(service.getWorkflowDefinition("dummy"), is(notNullValue())); + verify(workflowDefinitionDao, never()).queryStoredWorkflowDefinitions(emptyList()); + + // missing definition will refresh + assertThat(service.getWorkflowDefinition("w1"), is(notNullValue())); + verify(workflowDefinitionDao, times(1)).queryStoredWorkflowDefinitions(emptyList()); + + // second time immediately will not refresh + assertThat(service.getWorkflowDefinition("w1"), is(notNullValue())); + verify(workflowDefinitionDao, times(1)).queryStoredWorkflowDefinitions(emptyList()); + + // fetching new type always refresh + assertThat(service.getWorkflowDefinition("w2"), is(nullValue())); + verify(workflowDefinitionDao, times(1)).queryStoredWorkflowDefinitions(emptyList()); + + SECONDS.sleep(2); + + // after timeout fetching existing will refresh + assertThat(service.getWorkflowDefinition("w1"), is(notNullValue())); + verify(workflowDefinitionDao, times(2)).queryStoredWorkflowDefinitions(emptyList()); + } + } diff --git a/nflow-engine/src/test/java/io/nflow/engine/service/WorkflowDefinitionServiceWithSpringTest.java b/nflow-engine/src/test/java/io/nflow/engine/service/WorkflowDefinitionServiceWithSpringTest.java index 542256fe4..2d05386df 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/service/WorkflowDefinitionServiceWithSpringTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/service/WorkflowDefinitionServiceWithSpringTest.java @@ -56,7 +56,10 @@ static class ContextConfiguration { @Bean @Primary public Environment env() { - return new MockEnvironment().withProperty("nflow.definition.persist", "true").withProperty("nflow.autoinit", "true"); + return new MockEnvironment() + .withProperty("nflow.definition.persist", "true") + .withProperty("nflow.definition.refreshStoredFromDatabase.interval.seconds", "60") + .withProperty("nflow.autoinit", "true"); } @Bean diff --git a/nflow-engine/src/test/resources/junit.properties b/nflow-engine/src/test/resources/junit.properties index 60753e1ad..aafd514b1 100644 --- a/nflow-engine/src/test/resources/junit.properties +++ b/nflow-engine/src/test/resources/junit.properties @@ -19,3 +19,4 @@ nflow.db.idle_timeout_seconds=600 nflow.db.create_on_startup=true nflow.db.disable_batch_updates=false nflow.db.workflowInstanceType.cacheSize=10000 +nflow.db.initialization_fail_timeout_seconds=1 \ No newline at end of file diff --git a/nflow-engine/src/test/resources/nflow-engine-test.properties b/nflow-engine/src/test/resources/nflow-engine-test.properties index 5e043ea6f..114eb84cc 100644 --- a/nflow-engine/src/test/resources/nflow-engine-test.properties +++ b/nflow-engine/src/test/resources/nflow-engine-test.properties @@ -1,2 +1,3 @@ nflow.autostart=false nflow.executor.timeout.seconds=800 +nflow.db.initialization_fail_timeout_seconds=1 \ No newline at end of file diff --git a/nflow-server-common/src/main/java/io/nflow/server/spring/NflowStandardEnvironment.java b/nflow-server-common/src/main/java/io/nflow/server/spring/NflowStandardEnvironment.java index a859dc3a8..705ac0f68 100644 --- a/nflow-server-common/src/main/java/io/nflow/server/spring/NflowStandardEnvironment.java +++ b/nflow-server-common/src/main/java/io/nflow/server/spring/NflowStandardEnvironment.java @@ -1,6 +1,7 @@ package io.nflow.server.spring; import static io.nflow.engine.config.Profiles.H2; +import static java.lang.Boolean.FALSE; import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.slf4j.LoggerFactory.getLogger; @@ -25,10 +26,14 @@ public NflowStandardEnvironment(Map overrideProperties) { getPropertySources().addFirst(new MapPropertySource("override", overrideProperties)); addExternalPropertyResource(); String env = getProperty("env", "local"); - addActiveProfile(env); addPropertyResource(env); addPropertyResource("common"); addPropertyResource("nflow-server"); + var clearProfiles = getProperty("clearProfiles", Boolean.class, FALSE); + if (clearProfiles) { + setActiveProfiles("ignore-environment-profiles"); + } + addActiveProfile(env); String profiles = getProperty("profiles", String.class, ""); for (String profile : profiles.split(",")) { if (!profile.trim().isEmpty()) { diff --git a/nflow-tests/src/test/java/io/nflow/tests/CodelessEngineTest.java b/nflow-tests/src/test/java/io/nflow/tests/CodelessEngineTest.java new file mode 100644 index 000000000..da0c5236b --- /dev/null +++ b/nflow-tests/src/test/java/io/nflow/tests/CodelessEngineTest.java @@ -0,0 +1,115 @@ +package io.nflow.tests; + +import io.nflow.rest.v1.msg.CreateWorkflowInstanceRequest; +import io.nflow.rest.v1.msg.CreateWorkflowInstanceResponse; +import io.nflow.rest.v1.msg.ListWorkflowDefinitionResponse; +import io.nflow.tests.demo.workflow.Demo2Workflow; +import io.nflow.tests.extension.NflowServerConfig; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; +import org.springframework.context.annotation.Bean; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static io.nflow.tests.demo.workflow.Demo2Workflow.DEMO2_WORKFLOW_TYPE; +import static io.nflow.tests.demo.workflow.TestState.DONE; +import static java.time.Duration.ofSeconds; +import static java.util.Collections.singletonMap; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.fail; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +public class CodelessEngineTest extends AbstractNflowTest { + + public static final NflowServerConfig server = new NflowServerConfig.Builder() + .springContextClass(EmptyConfiguration.class) + .prop("nflow.autoinit", "false") + .prop("nflow.autostart", "false") + .prop("nflow.db.create_on_startup", "true") + .prop("nflow.definition.refreshStoredFromDatabase.interval.seconds", "1") + .prop("nflow.maintenance.insertWorkflowIfMissing", "false") + .prop("nflow.db.h2.url", "jdbc:h2:mem:codelessenginetest;TRACE_LEVEL_FILE=4;DB_CLOSE_DELAY=-1") + .build(); + + public static final AtomicReference codeServer = new AtomicReference<>(); + private static CreateWorkflowInstanceResponse resp; + + public CodelessEngineTest() { + super(server); + } + + static class EmptyConfiguration { + } + + static class CodeConfiguration { + @Bean + public Demo2Workflow demo2Workflow() { + return new Demo2Workflow(); + } + } + + @Test + @Order(1) + public void codelessServerHasNoWorkflowsDefinitions() { + ListWorkflowDefinitionResponse[] definitions = getWorkflowDefinitions(); + assertWorkflowDefinitionExists(DEMO2_WORKFLOW_TYPE, definitions, false); + } + + @Test + @Order(2) + public void startCodeServer() throws Exception { + server.setSpringContextClass(CodeConfiguration.class); + codeServer.set(server.anotherServer(Map.of("nflow.autoinit", "true", "nflow.autostart", "true"))); + codeServer.get().before(getClass().getSimpleName()); + } + + @Test + @Order(3) + public void codelessServerSeesTheDefinition() throws Exception { + SECONDS.sleep(1); + ListWorkflowDefinitionResponse[] definitions = getWorkflowDefinitions(); + assertWorkflowDefinitionExists(DEMO2_WORKFLOW_TYPE, definitions, true); + } + + @Test + @Order(4) + public void codelessCanInsertWorkflow() { + var req = new CreateWorkflowInstanceRequest(); + req.type = DEMO2_WORKFLOW_TYPE; + req.businessKey = "1"; + req.stateVariables = singletonMap("test", 1); + resp = createWorkflowInstance(req); + assertThat(resp.id, notNullValue()); + } + + @Test + @Order(5) + public void pollWorkflowToComplete() { + assertThat(getWorkflowInstanceWithTimeout(resp.id, DONE.name(), ofSeconds(30)), notNullValue()); + } + + @AfterAll + public static void shutdown() { + codeServer.get().after(); + } + + private void assertWorkflowDefinitionExists(String type, ListWorkflowDefinitionResponse[] definitions, boolean shouldExist) { + for (ListWorkflowDefinitionResponse def : definitions) { + if (type.equals(def.type)) { + if (shouldExist) { + return; + } + fail("Workflow definition " + type + " should not be found"); + } + } + if (shouldExist) { + fail("Workflow definition " + type + " is missing"); + } + } +} diff --git a/nflow-tests/src/test/java/io/nflow/tests/SkipAutoStartTest.java b/nflow-tests/src/test/java/io/nflow/tests/SkipAutoStartTest.java index 1a4af4991..3a5a478e7 100644 --- a/nflow-tests/src/test/java/io/nflow/tests/SkipAutoStartTest.java +++ b/nflow-tests/src/test/java/io/nflow/tests/SkipAutoStartTest.java @@ -9,7 +9,6 @@ import io.nflow.tests.extension.NflowServerConfig; -@TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class SkipAutoStartTest extends AbstractNflowTest { // When nflow.autoinit, nflow.autostart, nflow.db.create_on_startup and nflow.maintenance.insertWorkflowIfMissing are false, no @@ -19,6 +18,10 @@ public class SkipAutoStartTest extends AbstractNflowTest { .prop("nflow.autostart", "false") .prop("nflow.db.create_on_startup", "false") .prop("nflow.maintenance.insertWorkflowIfMissing", "false") + .prop("nflow.db.initialization_fail_timeout_seconds", "-1") + .prop("nflow.db.url", "jdbc:h2:/invalid/path/that/does/not/exist") + .clearProfiles() + .profiles("nflow.db.h2") .build(); public SkipAutoStartTest() { @@ -26,7 +29,6 @@ public SkipAutoStartTest() { } @Test - @Order(1) public void startServerButNotNflow() { assertNotNull(server.getHttpAddress()); } diff --git a/nflow-tests/src/test/java/io/nflow/tests/extension/NflowServerConfig.java b/nflow-tests/src/test/java/io/nflow/tests/extension/NflowServerConfig.java index abbac0b0d..016f95b6b 100644 --- a/nflow-tests/src/test/java/io/nflow/tests/extension/NflowServerConfig.java +++ b/nflow-tests/src/test/java/io/nflow/tests/extension/NflowServerConfig.java @@ -29,6 +29,9 @@ public class NflowServerConfig { port = new AtomicReference<>(b.port); springContextClass = b.springContextClass; metrics = b.metrics; + if (b.clearProfiles) { + props.put("clearProfiles", true); + } } public static class Builder { @@ -38,6 +41,8 @@ public static class Builder { Class springContextClass; boolean metrics = false; final Map props = new LinkedHashMap<>(); + boolean clearProfiles; + { props.put("nflow.db.h2.tcp.port", ""); props.put("nflow.db.h2.console.port", ""); @@ -72,6 +77,12 @@ public Builder metrics(boolean enableMetrics) { this.metrics = enableMetrics; return this; } + + public Builder clearProfiles() { + this.clearProfiles = true; + return this; + } + public NflowServerConfig build() { return new NflowServerConfig(this); } @@ -133,6 +144,9 @@ private void startJetty() throws Exception { } private void stopJetty() { + if (nflowJetty == null) { + return; + } try { nflowJetty.setStopTimeout(10000); nflowJetty.stop();