Skip to content

Commit

Permalink
Support loading workflow definitions from database (#531)
Browse files Browse the repository at this point in the history
* Support loading workflow definitions from database if nflow.definition.load=true is set
* Make the stored workflow check interval configurable
* Fix SkipAutoStartTest to fail if nflow tries to access the database

Co-authored-by: Edvard Fonsell <edvard.fonsell@nitor.com>
  • Loading branch information
gmokki and efonsell authored Jul 29, 2022
1 parent 095691c commit 4ba6bb2
Show file tree
Hide file tree
Showing 19 changed files with 408 additions and 20 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
**Highlights**

- `nflow-engine`
- Add support to manage workflow instances without having the workflow implementation classes in the classpath.
- Clean up old workflow executors that have expired configured time ago (default 1 year).
- Optimize SQL queries used for dead node detection and workflow instance recovery.
- Add `recovered` timestamp to executor info (database, Java API and REST API).
Expand All @@ -14,6 +15,15 @@
**Details**

- `nflow-engine`
- Add support to manage (create, update and read, but not execute) workflow instances without having the workflow implementation classes in the classpath. [#53](https://github.com/NitorCreations/nflow/pull/531)
- If a workflow definition is not found from the classpath, it is loaded from the database and stored in memory.
- Workflow definitions that are loaded from the database are refreshed at most at configured interval (`nflow.definition.refreshStoredFromDatabase.interval.seconds`, default 60). Set to -1 to revert to previous functionality.
- Potential use cases:
- Creating workflow instances in a separate application that does not need to be updated when the workflow implementations are changed
- Generic nFlow REST service that does not need to be updated when the workflow implementations or definitions are changed
- Having a cluster of nFlow executors that just execute the workflows and have no other business logic
- Potential change break in functionality:
- If your code relied on the fact can workflows cannot be manipulated after their code is removed from classpath this change will break that guarantee, unless you disable this feature.
- Improve shutdown sequence.
- Workflows that were acquired from the database but have not started executing can now be resumed immediately by another executor.
- Executing workflows are interrupted 5 seconds before shutdown timeout so that they get a chance to persist their state to the database. This also allows other executors to immediately resume the processing of the successfully interrupted workflows. The interrupting can be disabled by setting `nflow.executor.interrupt` to false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public DataSource nflowDatasource(Environment env, Object metricRegistry) {
config.setMaximumPoolSize(property(env, "max_pool_size", Integer.class));
config.setIdleTimeout(property(env, "idle_timeout_seconds", Long.class) * 1000);
config.setAutoCommit(true);
config.setInitializationFailTimeout(property(env, "initialization_fail_timeout_seconds", Long.class) * 1000);
if (metricRegistry != null) {
config.setMetricRegistry(metricRegistry);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ public class WorkflowDispatcher implements Runnable {
private final int stuckThreadThresholdSeconds;
private final Random rand = new Random();
private final boolean allowInterrupt;
private final boolean autoStart;

@Inject
@SuppressFBWarnings(value = "WEM_WEAK_EXCEPTION_MESSAGING", justification = "Transaction support exception message is fine")
public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao workflowInstances,
WorkflowStateProcessorFactory stateProcessorFactory, WorkflowDefinitionService workflowDefinitions, ExecutorDao executorDao,
DispatcherExceptionAnalyzer exceptionAnalyzer, NflowLogger nflowLogger, Environment env) {
Expand All @@ -63,7 +63,14 @@ public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao
this.sleepTimeMillis = env.getRequiredProperty("nflow.dispatcher.sleep.ms", Long.class);
this.stuckThreadThresholdSeconds = env.getRequiredProperty("nflow.executor.stuckThreadThreshold.seconds", Integer.class);
this.allowInterrupt = env.getProperty("nflow.executor.interrupt", Boolean.class, TRUE);
this.autoStart = env.getRequiredProperty("nflow.autostart", Boolean.class);
if (autoStart) {
verifyDatabaseSetup();
}
}

@SuppressFBWarnings(value = "WEM_WEAK_EXCEPTION_MESSAGING", justification = "Transaction support exception message is fine")
private void verifyDatabaseSetup() {
if (!executorDao.isTransactionSupportEnabled()) {
throw new BeanCreationException("Transaction support must be enabled");
}
Expand All @@ -74,9 +81,13 @@ public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao

@Override
public void run() {
logger.info("Dispacther started.");
try {
if (!autoStart) {
verifyDatabaseSetup();
}
workflowDefinitions.postProcessWorkflowDefinitions();

logger.info("Dispatcher started.");
running.set(true);
while (!shutdownRequested.get()) {
if (paused.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.function.Supplier;

import io.nflow.engine.internal.workflow.StoredWorkflowDefinitionWrapper;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.slf4j.Logger;
Expand Down Expand Up @@ -147,7 +148,7 @@ private void runImpl() {
WorkflowInstance instance = workflowInstances.getWorkflowInstance(instanceId, EnumSet.of(CURRENT_STATE_VARIABLES), null);
logIfLagging(instance);
WorkflowDefinition definition = workflowDefinitions.getWorkflowDefinition(instance.type);
if (definition == null) {
if (definition == null || definition instanceof StoredWorkflowDefinitionWrapper) {
rescheduleUnknownWorkflowType(instance);
return;
}
Expand Down Expand Up @@ -217,11 +218,11 @@ private void runImpl() {
private void logRetryableException(StateProcessExceptionHandling exceptionHandling, String state, Throwable thrown) {
if (exceptionHandling.logStackTrace) {
nflowLogger.log(logger, exceptionHandling.logLevel, "Handling state '{}' threw a retryable exception, trying again later.",
new Object[] { state, thrown });
state, thrown);
} else {
nflowLogger.log(logger, exceptionHandling.logLevel,
"Handling state '{}' threw a retryable exception, trying again later. Message: {}",
new Object[] { state, thrown.getMessage() });
state, thrown.getMessage());
}
}

Expand Down Expand Up @@ -317,11 +318,11 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution,
StateSaveExceptionHandling handling = stateSaveExceptionAnalyzer.analyzeSafely(ex, saveRetryCount++);
if (handling.logStackTrace) {
nflowLogger.log(logger, handling.logLevel, "Failed to save workflow instance {} new state, retrying after {} seconds.",
new Object[] { instance.id, handling.retryDelay, ex });
instance.id, handling.retryDelay, ex);
} else {
nflowLogger.log(logger, handling.logLevel,
"Failed to save workflow instance {} new state, retrying after {} seconds. Error: {}",
new Object[] { instance.id, handling.retryDelay, ex.getMessage() });
instance.id, handling.retryDelay, ex.getMessage());
}
sleepIgnoreInterrupted(handling.retryDelay.getStandardSeconds());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,4 @@ public int compareTo(Signal o) {
return value.compareTo(o.value);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.nflow.engine.internal.workflow;

import static io.nflow.engine.workflow.definition.WorkflowStateType.start;
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toList;

import java.util.Collection;

import io.nflow.engine.workflow.curated.State;
import io.nflow.engine.workflow.definition.WorkflowDefinition;
import io.nflow.engine.workflow.definition.WorkflowSettings;
import io.nflow.engine.workflow.definition.WorkflowState;
import io.nflow.engine.workflow.definition.WorkflowStateType;

public class StoredWorkflowDefinitionWrapper extends WorkflowDefinition {
public StoredWorkflowDefinitionWrapper(StoredWorkflowDefinition stored) {
super(stored.type, getInitialState(stored), getErrorState(stored), new WorkflowSettings.Builder().build(), emptyMap(), allStates(stored), false);
setDescription(stored.description);
}

private static Collection<WorkflowState> allStates(StoredWorkflowDefinition stored) {
return stored.states.stream().map(StoredWorkflowDefinitionWrapper::toState).collect(toList());
}

private static WorkflowState getInitialState(StoredWorkflowDefinition stored) {
return stored.states.stream()
.filter(state -> start.name().equals((state.type)))
.findFirst()
.map(StoredWorkflowDefinitionWrapper::toState)
.orElseThrow(() -> new IllegalStateException("Could not find initial state for " + stored));
}

private static WorkflowState toState(StoredWorkflowDefinition.State state) {
return new State(state.id, WorkflowStateType.valueOf(state.type), state.description);
}

private static WorkflowState getErrorState(StoredWorkflowDefinition stored) {
return stored.states.stream()
.filter(state -> stored.onError.equals((state.id)))
.findFirst()
.map(StoredWorkflowDefinitionWrapper::toState)
.orElseThrow(() -> new IllegalStateException("Could not find error state for " + stored));
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package io.nflow.engine.service;

import static java.lang.Long.MAX_VALUE;
import static java.lang.System.currentTimeMillis;
import static java.util.Collections.emptyList;
import static java.util.Collections.synchronizedMap;
import static java.util.Collections.unmodifiableList;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.slf4j.LoggerFactory.getLogger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -16,7 +21,10 @@
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.nflow.engine.internal.dao.WorkflowDefinitionDao;
import io.nflow.engine.internal.workflow.StoredWorkflowDefinition;
import io.nflow.engine.internal.workflow.StoredWorkflowDefinitionWrapper;
import io.nflow.engine.workflow.definition.WorkflowDefinition;

/**
Expand All @@ -33,12 +41,20 @@ public class WorkflowDefinitionService {
private final WorkflowDefinitionDao workflowDefinitionDao;
private final boolean persistWorkflowDefinitions;
private final boolean autoInit;
private final long storedDefinitionCheckInterval;
/**
* The next time the stored definitions from database are checked for changes.
*/
private long nextCheckOfStoredDefinitions;

@Inject
public WorkflowDefinitionService(WorkflowDefinitionDao workflowDefinitionDao, Environment env) {
this.workflowDefinitionDao = workflowDefinitionDao;
this.persistWorkflowDefinitions = env.getRequiredProperty("nflow.definition.persist", Boolean.class);
this.autoInit = env.getRequiredProperty("nflow.autoinit", Boolean.class);
this.storedDefinitionCheckInterval = SECONDS
.toMillis(env.getRequiredProperty("nflow.definition.refreshStoredFromDatabase.interval.seconds", Integer.class));
this.nextCheckOfStoredDefinitions = storedDefinitionCheckInterval > 0 ? 0 : MAX_VALUE;
}

/**
Expand All @@ -49,7 +65,14 @@ public WorkflowDefinitionService(WorkflowDefinitionDao workflowDefinitionDao, En
* @return The workflow definition or null if not found.
*/
public WorkflowDefinition getWorkflowDefinition(String type) {
return workflowDefinitions.get(type);
WorkflowDefinition definition = workflowDefinitions.get(type);
if (definition instanceof StoredWorkflowDefinitionWrapper && getWorkflowDefinitionRefreshTime() > 0) {
definition = null;
}
if (definition == null && refreshStoredDefinitions()) {
definition = workflowDefinitions.get(type);
}
return definition;
}

/**
Expand All @@ -58,6 +81,7 @@ public WorkflowDefinition getWorkflowDefinition(String type) {
* @return List of workflow definitions.
*/
public List<WorkflowDefinition> getWorkflowDefinitions() {
refreshStoredDefinitions();
return workflowDefinitionValues;
}

Expand Down Expand Up @@ -91,9 +115,49 @@ public void addWorkflowDefinition(WorkflowDefinition wd) {
if (autoInit && persistWorkflowDefinitions) {
workflowDefinitionDao.storeWorkflowDefinition(wd);
}
setWorkflowDefinitions(workflowDefinitions.values());
logger.info("Added workflow type: {} ({})", wd.getType(), wd.getClass().getName());
}

private void setWorkflowDefinitions(Collection<WorkflowDefinition> newDefinitions) {
synchronized (workflowDefinitions) {
workflowDefinitionValues = new ArrayList<>(workflowDefinitions.values());
workflowDefinitionValues = unmodifiableList(new ArrayList<>(newDefinitions));
}
logger.info("Added workflow type: {} ({})", wd.getType(), wd.getClass().getName());
}

@SuppressFBWarnings(value = "NOS_NON_OWNED_SYNCHRONIZATION",
justification = "synchronize(this) is valid and needed to match the below synchronized refreshStoredDefinitions() method")
private long getWorkflowDefinitionRefreshTime() {
if (storedDefinitionCheckInterval <= 0) {
return -1;
}
long now = currentTimeMillis();
synchronized (this) {
if (nextCheckOfStoredDefinitions <= now) {
return now;
}
}
return -1;
}

private synchronized boolean refreshStoredDefinitions() {
long now = getWorkflowDefinitionRefreshTime();
if (now <= -1) {
return false;
}
nextCheckOfStoredDefinitions = now + storedDefinitionCheckInterval;
boolean changed = false;
for (StoredWorkflowDefinition def : workflowDefinitionDao.queryStoredWorkflowDefinitions(emptyList())) {
WorkflowDefinition current = workflowDefinitions.get(def.type);
if (current == null || current instanceof StoredWorkflowDefinitionWrapper) {
StoredWorkflowDefinitionWrapper wrapper = new StoredWorkflowDefinitionWrapper(def);
workflowDefinitions.put(def.type, wrapper);
changed = true;
}
}
if (changed) {
setWorkflowDefinitions(workflowDefinitions.values());
}
return changed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public abstract class WorkflowDefinition extends ModelObject {
protected final Map<String, WorkflowState> failureTransitions = new LinkedHashMap<>();
private Map<String, WorkflowStateMethod> stateMethods;
private final Set<WorkflowState> states = new HashSet<>();
private final boolean verifyStateMethodValidity;

/**
* Create a workflow definition with default settings and automatically scanned state methods.
Expand Down Expand Up @@ -95,7 +96,7 @@ protected WorkflowDefinition(String type, WorkflowState initialState, WorkflowSt
*/
protected WorkflowDefinition(String type, WorkflowState initialState, WorkflowState errorState, WorkflowSettings settings,
Map<String, WorkflowStateMethod> stateMethods) {
this(type, initialState, errorState, settings, stateMethods, null);
this(type, initialState, errorState, settings, stateMethods, null, true);
}

/**
Expand All @@ -117,18 +118,50 @@ protected WorkflowDefinition(String type, WorkflowState initialState, WorkflowSt
* The states to be registered for the workflow. If null, the states will be scanned.
*/
protected WorkflowDefinition(String type, WorkflowState initialState, WorkflowState errorState, WorkflowSettings settings,
Map<String, WorkflowStateMethod> stateMethods, Collection<WorkflowState> states) {
Map<String, WorkflowStateMethod> stateMethods, Collection<WorkflowState> states) {
this(type, initialState, errorState, settings, stateMethods, states, true);
}

/**
* Create a workflow definition with given settings, state methods and states.
*
* @param type
* The unique identifier of this workflow definition.
* @param initialState
* The default start state of the workflow. The state is automatically registered as one of the allowed states in this
* workflow.
* @param errorState
* The default error state of the workflow. The state is automatically registered as one of the allowed states in this
* workflow.
* @param settings
* The configuration for the workflow instances of this workflow type.
* @param stateMethods
* The state methods to be used for the states of this workflow type. If null, the methods will be scanned.
* @param states
* The states to be registered for the workflow. If null, the states will be scanned.
* @param verifyStateMethodValidity
* True to search and verify the implementation of registered state methods to ensure they comply with expectations.
*/
protected WorkflowDefinition(String type, WorkflowState initialState, WorkflowState errorState, WorkflowSettings settings,
Map<String, WorkflowStateMethod> stateMethods, Collection<WorkflowState> states, boolean verifyStateMethodValidity) {
Assert.notNull(initialState, "initialState must not be null");
Assert.isTrue(initialState.getType() == WorkflowStateType.start, "initialState must be a start state");
Assert.notNull(errorState, "errorState must not be null");
this.type = type;
this.initialState = initialState;
this.errorState = errorState;
this.settings = settings;
this.verifyStateMethodValidity = verifyStateMethodValidity;
WorkflowDefinitionScanner scanner = new WorkflowDefinitionScanner();
if (stateMethods == null) {
Assert.isTrue(verifyStateMethodValidity,
"State method validity verification must be enabled when given state methods are null (scanned from classpath)");
this.stateMethods = scanner.getStateMethods(getClass());
} else {
if (!verifyStateMethodValidity) {
Assert.isTrue(stateMethods.isEmpty(),
"State method validity verification must be enabled when given state methods is not empty");
}
this.stateMethods = stateMethods;
}
registerState(initialState);
Expand Down Expand Up @@ -301,6 +334,9 @@ public WorkflowSettings getSettings() {
}

final void requireStateMethodExists(WorkflowState state) {
if (!verifyStateMethodValidity) {
return;
}
WorkflowStateMethod stateMethod = stateMethods.get(state.name());
if (stateMethod == null && !state.getType().isFinal()) {
String msg = format(
Expand Down Expand Up @@ -405,5 +441,4 @@ public boolean isAllowedNextAction(WorkflowInstance instance, NextAction nextAct
public Map<Integer, String> getSupportedSignals() {
return emptyMap();
}

}
2 changes: 2 additions & 0 deletions nflow-engine/src/main/resources/nflow-engine.properties
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ nflow.db.idle_timeout_seconds=600
nflow.db.create_on_startup=true
nflow.db.disable_batch_updates=false
nflow.db.workflowInstanceType.cacheSize=10000
nflow.db.initialization_fail_timeout_seconds=10

nflow.definition.persist=true
nflow.definition.refreshStoredFromDatabase.interval.seconds=60

nflow.maintenance.insertWorkflowIfMissing=true
nflow.maintenance.initial.cron=4 4 4 * * *
Expand Down
Loading

0 comments on commit 4ba6bb2

Please sign in to comment.