Support loading workflow definitions from database #531

Merged
merged 26 commits into from
Jul 29, 2022
Changes from 25 commits
Commits
42139d4
Support loading workflow definitions from database if nflow.definitio…
gmokki Jun 3, 2022
6eecb10
Remove useless comment
gmokki Jun 6, 2022
b43b78f
Make the stored procedure check interval configurable
gmokki Jun 7, 2022
7413205
Fix tests
gmokki Jul 19, 2022
e844d43
Skip database loads if definition reload interval <= 0
gmokki Jul 19, 2022
0c65a7e
Ignore varargs generics warning
gmokki Jul 19, 2022
e4431ed
Fix another test
gmokki Jul 19, 2022
232c39d
Keep requireStateMethodExists as final and pass in a flag to skip ver…
gmokki Jul 19, 2022
11b82b8
Keep backwards compatibility by adding a new method instead of modify…
gmokki Jul 19, 2022
b7ef839
Fix SkipAutoStartTest to fail if nflow tries to access the database
gmokki Jul 19, 2022
1115725
Fix spotbugs
gmokki Jul 20, 2022
61b50b6
Add support for clearing profiles, useful for tests that want to over…
gmokki Jul 20, 2022
b4a91a8
Add integrationtest to verify that rest service without workflow clas…
gmokki Jul 20, 2022
957fb71
Change nflow.definition.load to nflow.definition.loadMissingFromDatabase
gmokki Jul 22, 2022
a166626
Add changes
gmokki Jul 22, 2022
fbea92e
Also refresh when fetching the same stored definition repeatedly
gmokki Jul 22, 2022
01fb149
Suppress (what I think is a broken) findbugs warning
gmokki Jul 23, 2022
b606fa1
Add 300ms more time for second thread to run and finish
gmokki Jul 23, 2022
1c6d84c
Add back accidentally removed line
gmokki Jul 23, 2022
d886c58
Change field order to be nicer
gmokki Jul 23, 2022
5b4c653
update changes
efonsell Jul 27, 2022
11bb045
use streams
efonsell Jul 27, 2022
7699b49
refactor, add todo for config property name
efonsell Jul 27, 2022
d931c0a
update assert messages
efonsell Jul 27, 2022
3c5a589
Reschedule workflows if we do not have local implementation, only the…
gmokki Jul 29, 2022
c28309e
Review fixes
gmokki Jul 29, 2022
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
**Highlights**

- `nflow-engine`
- Add support to manage workflow instances without having the workflow implementation classes in the classpath.
- Clean up old workflow executors that have expired configured time ago (default 1 year).
- Optimize SQL queries used for dead node detection and workflow instance recovery.
- Add `recovered` timestamp to executor info (database, Java API and REST API).
Expand All @@ -14,6 +15,13 @@
**Details**

- `nflow-engine`
- Add support to manage (create, update and read, but not execute) workflow instances without having the workflow implementation classes in the classpath.
- If a workflow definition is not found from the classpath, it is loaded from the database and stored in memory.
- Workflow definitions that are loaded from the database are refreshed at most at configured interval (`nflow.definition.loadMissingFromDatabaseSeconds`, default 60).
Member

Theoretically, someone might depend on the fact that instances cannot be managed for historical workflow definitions. So we need to consider what kind of version to release next, or we could start with -1 as the default and change the default to 60 in the next major version?

Member Author

I find that a bit too theoretical to worry about breaking. Nevertheless, I added a few lines to the README about it, along with instructions on how to disable the new functionality.

- Potential use cases:
- Creating workflow instances in a separate application that does not need to be updated when the workflow implementations are changed
- Generic nFlow REST service that does not need to be updated when the workflow implementations or definitions are changed
- Having a cluster of nFlow executors that just execute the workflows and have no other business logic
- Improve shutdown sequence.
- Workflows that were acquired from the database but have not started executing can now be resumed immediately by another executor.
- Executing workflows are interrupted 5 seconds before shutdown timeout so that they get a chance to persist their state to the database. This also allows other executors to immediately resume the processing of the successfully interrupted workflows. The interrupting can be disabled by setting `nflow.executor.interrupt` to false.
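
A minimal Java sketch of how the interval setting could double as an off switch, following the commit note that database loads are skipped when the reload interval is <= 0. It assumes the property key as it appears in `nflow-engine.properties` in this diff (the name was still under discussion in the PR) and reads it from plain `java.util.Properties` rather than Spring's `Environment`. The class `DefinitionReloadConfig` and its methods are hypothetical and exist only for illustration.

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of nFlow: shows how a non-positive interval can disable the feature.
final class DefinitionReloadConfig {
  // Key as it appears in nflow-engine.properties in this diff; treat the exact name as an assumption.
  static final String INTERVAL_KEY = "nflow.definition.loadMissingFromDatabaseSeconds.interval.seconds";

  private DefinitionReloadConfig() {
  }

  /** Returns the configured interval in milliseconds, falling back to 60 seconds. */
  static long intervalMillis(Properties props) {
    int seconds = Integer.parseInt(props.getProperty(INTERVAL_KEY, "60"));
    return TimeUnit.SECONDS.toMillis(seconds);
  }

  /** Database loading is treated as enabled only for a positive interval. */
  static boolean loadsFromDatabase(Properties props) {
    return intervalMillis(props) > 0;
  }
}
```

With this reading, setting the property to `-1` (or `0`) would keep the previous behaviour, where only definitions found on the classpath are known.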
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public DataSource nflowDatasource(Environment env, Object metricRegistry) {
config.setMaximumPoolSize(property(env, "max_pool_size", Integer.class));
config.setIdleTimeout(property(env, "idle_timeout_seconds", Long.class) * 1000);
config.setAutoCommit(true);
config.setInitializationFailTimeout(property(env, "initialization_fail_timeout_seconds", Long.class) * 1000);
if (metricRegistry != null) {
config.setMetricRegistry(metricRegistry);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ public class WorkflowDispatcher implements Runnable {
private final int stuckThreadThresholdSeconds;
private final Random rand = new Random();
private final boolean allowInterrupt;
+ private final boolean autoStart;

@Inject
- @SuppressFBWarnings(value = "WEM_WEAK_EXCEPTION_MESSAGING", justification = "Transaction support exception message is fine")
public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao workflowInstances,
WorkflowStateProcessorFactory stateProcessorFactory, WorkflowDefinitionService workflowDefinitions, ExecutorDao executorDao,
DispatcherExceptionAnalyzer exceptionAnalyzer, NflowLogger nflowLogger, Environment env) {
Expand All @@ -63,7 +63,14 @@ public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao
this.sleepTimeMillis = env.getRequiredProperty("nflow.dispatcher.sleep.ms", Long.class);
this.stuckThreadThresholdSeconds = env.getRequiredProperty("nflow.executor.stuckThreadThreshold.seconds", Integer.class);
this.allowInterrupt = env.getProperty("nflow.executor.interrupt", Boolean.class, TRUE);
this.autoStart = env.getRequiredProperty("nflow.autostart", Boolean.class);
if (autoStart) {
verifyDatabaseSetup();
}
}

@SuppressFBWarnings(value = "WEM_WEAK_EXCEPTION_MESSAGING", justification = "Transaction support exception message is fine")
private void verifyDatabaseSetup() {
if (!executorDao.isTransactionSupportEnabled()) {
throw new BeanCreationException("Transaction support must be enabled");
}
Expand All @@ -74,9 +81,13 @@ public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao

@Override
public void run() {
- logger.info("Dispacther started.");
try {
+ if (!autoStart) {
+   verifyDatabaseSetup();
+ }
workflowDefinitions.postProcessWorkflowDefinitions();
+
+ logger.info("Dispatcher started.");
running.set(true);
while (!shutdownRequested.get()) {
if (paused.get()) {
Expand Down
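
The dispatcher change above moves the database check out of the constructor when `nflow.autostart` is false, so that merely creating the bean does not touch the database. A rough sketch of that pattern follows, assuming a hypothetical `DeferredCheckDispatcher` that takes the check as a `Runnable`; the real class is wired with DAOs and an `Environment` instead.

```java
// Hypothetical stand-in for the dispatcher, for illustration only.
final class DeferredCheckDispatcher implements Runnable {
  private final boolean autoStart;
  private final Runnable databaseCheck;

  DeferredCheckDispatcher(boolean autoStart, Runnable databaseCheck) {
    this.autoStart = autoStart;
    this.databaseCheck = databaseCheck;
    if (autoStart) {
      // Fail fast at construction time when the engine starts automatically.
      databaseCheck.run();
    }
  }

  @Override
  public void run() {
    if (!autoStart) {
      // First database access happens only when the dispatcher is started explicitly.
      databaseCheck.run();
    }
    // The dispatch loop would follow here.
  }
}
```

Either way the check runs exactly once; only the moment differs.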
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.function.Supplier;

import io.nflow.engine.internal.workflow.StoredWorkflowDefinitionWrapper;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.slf4j.Logger;
Expand Down Expand Up @@ -147,7 +148,7 @@ private void runImpl() {
WorkflowInstance instance = workflowInstances.getWorkflowInstance(instanceId, EnumSet.of(CURRENT_STATE_VARIABLES), null);
logIfLagging(instance);
WorkflowDefinition definition = workflowDefinitions.getWorkflowDefinition(instance.type);
- if (definition == null) {
+ if (definition == null || definition instanceof StoredWorkflowDefinitionWrapper) {
rescheduleUnknownWorkflowType(instance);
return;
}
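
The condition above treats a definition that exists only as a database-backed wrapper the same way as a missing one: the instance is rescheduled rather than executed. A small sketch of that decision, using hypothetical marker types (`ClasspathDefinition`, `StoredOnlyDefinition`) in place of the real nFlow classes:

```java
// Hypothetical types, for illustration only.
final class ExecutableCheck {
  interface Definition {
  }

  /** Stands in for a definition whose implementation class is on the classpath. */
  static final class ClasspathDefinition implements Definition {
  }

  /** Stands in for a definition that was only loaded from the database. */
  static final class StoredOnlyDefinition implements Definition {
  }

  private ExecutableCheck() {
  }

  /** Only definitions backed by local code can be executed by this node. */
  static boolean canExecute(Definition definition) {
    return definition != null && !(definition instanceof StoredOnlyDefinition);
  }
}
```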
Expand Down Expand Up @@ -217,11 +218,11 @@ private void runImpl() {
private void logRetryableException(StateProcessExceptionHandling exceptionHandling, String state, Throwable thrown) {
if (exceptionHandling.logStackTrace) {
nflowLogger.log(logger, exceptionHandling.logLevel, "Handling state '{}' threw a retryable exception, trying again later.",
- new Object[] { state, thrown });
+ state, thrown);
} else {
nflowLogger.log(logger, exceptionHandling.logLevel,
"Handling state '{}' threw a retryable exception, trying again later. Message: {}",
- new Object[] { state, thrown.getMessage() });
+ state, thrown.getMessage());
}
}

Expand Down Expand Up @@ -317,11 +318,11 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution,
StateSaveExceptionHandling handling = stateSaveExceptionAnalyzer.analyzeSafely(ex, saveRetryCount++);
if (handling.logStackTrace) {
nflowLogger.log(logger, handling.logLevel, "Failed to save workflow instance {} new state, retrying after {} seconds.",
- new Object[] { instance.id, handling.retryDelay, ex });
+ instance.id, handling.retryDelay, ex);
} else {
nflowLogger.log(logger, handling.logLevel,
"Failed to save workflow instance {} new state, retrying after {} seconds. Error: {}",
- new Object[] { instance.id, handling.retryDelay, ex.getMessage() });
+ instance.id, handling.retryDelay, ex.getMessage());
}
sleepIgnoreInterrupted(handling.retryDelay.getStandardSeconds());
}
Expand Down
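
The logging changes above replace explicit `new Object[] { ... }` arguments with plain argument lists. For a Java varargs parameter the two call styles are equivalent, which the following sketch demonstrates with a hypothetical `describe` method (it is not the `nflowLogger.log` signature, which is not shown in this diff).

```java
import java.util.Arrays;

// Hypothetical demo class, for illustration only.
final class VarargsDemo {
  private VarargsDemo() {
  }

  /** Formats the arguments so both call styles can be compared. */
  static String describe(String format, Object... args) {
    return format + " " + Arrays.toString(args);
  }
}
```

Calling `VarargsDemo.describe("x", new Object[] { "a", 1 })` and `VarargsDemo.describe("x", "a", 1)` passes the same two-element array to the method.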
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,4 @@ public int compareTo(Signal o) {
return value.compareTo(o.value);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.nflow.engine.internal.workflow;

import static io.nflow.engine.workflow.definition.WorkflowStateType.start;
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toList;

import java.util.Collection;

import io.nflow.engine.workflow.curated.State;
import io.nflow.engine.workflow.definition.WorkflowDefinition;
import io.nflow.engine.workflow.definition.WorkflowSettings;
import io.nflow.engine.workflow.definition.WorkflowState;
import io.nflow.engine.workflow.definition.WorkflowStateType;

public class StoredWorkflowDefinitionWrapper extends WorkflowDefinition {
public StoredWorkflowDefinitionWrapper(StoredWorkflowDefinition stored) {
super(stored.type, getInitialState(stored), getErrorState(stored), new WorkflowSettings.Builder().build(), emptyMap(), allStates(stored), false);
setDescription(stored.description);
}

private static Collection<WorkflowState> allStates(StoredWorkflowDefinition stored) {
return stored.states.stream().map(StoredWorkflowDefinitionWrapper::toState).collect(toList());
}

private static WorkflowState getInitialState(StoredWorkflowDefinition stored) {
return stored.states.stream()
.filter(state -> start.name().equals((state.type)))
Member

It's a shame that we don't currently persist the default start state (= initial state). It can't be helped here, though; perhaps open a separate issue for that.

.findFirst()
.map(StoredWorkflowDefinitionWrapper::toState)
.orElseThrow(() -> new IllegalStateException("Could not find initial state for " + stored));
}

private static WorkflowState toState(StoredWorkflowDefinition.State state) {
return new State(state.id, WorkflowStateType.valueOf(state.type), state.description);
}

private static WorkflowState getErrorState(StoredWorkflowDefinition stored) {
return stored.states.stream()
.filter(state -> stored.onError.equals((state.id)))
.findFirst()
.map(StoredWorkflowDefinitionWrapper::toState)
.orElseThrow(() -> new IllegalStateException("Could not find error state for " + stored));
}
}
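
The wrapper above picks the first stored state of type `start` as the initial state and the state whose id matches `onError` as the error state. A self-contained sketch of the same stream lookup follows, assuming a hypothetical `StoredState` holder with `id` and `type` fields; it is a simplification and not the real `StoredWorkflowDefinition.State`.

```java
import java.util.List;

// Hypothetical types, for illustration only.
final class StoredStateLookup {
  static final class StoredState {
    final String id;
    final String type;

    StoredState(String id, String type) {
      this.id = id;
      this.type = type;
    }
  }

  private StoredStateLookup() {
  }

  /** Returns the first state of type "start"; later start states are ignored. */
  static StoredState initialState(List<StoredState> states) {
    return states.stream()
        .filter(state -> "start".equals(state.type))
        .findFirst()
        .orElseThrow(() -> new IllegalStateException("Could not find initial state"));
  }

  /** Returns the state whose id equals the stored error state id. */
  static StoredState errorState(List<StoredState> states, String onError) {
    return states.stream()
        .filter(state -> onError.equals(state.id))
        .findFirst()
        .orElseThrow(() -> new IllegalStateException("Could not find error state " + onError));
  }
}
```

Because the lookup takes the first match, a definition with several start states resolves to whichever one is stored first, which is the limitation the review comment about the default start state points at.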
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package io.nflow.engine.service;

import static java.lang.Long.MAX_VALUE;
import static java.lang.System.currentTimeMillis;
import static java.util.Collections.emptyList;
import static java.util.Collections.synchronizedMap;
import static java.util.Collections.unmodifiableList;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.slf4j.LoggerFactory.getLogger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -16,7 +21,10 @@
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.nflow.engine.internal.dao.WorkflowDefinitionDao;
import io.nflow.engine.internal.workflow.StoredWorkflowDefinition;
import io.nflow.engine.internal.workflow.StoredWorkflowDefinitionWrapper;
import io.nflow.engine.workflow.definition.WorkflowDefinition;

/**
Expand All @@ -33,12 +41,21 @@ public class WorkflowDefinitionService {
private final WorkflowDefinitionDao workflowDefinitionDao;
private final boolean persistWorkflowDefinitions;
private final boolean autoInit;
private final long storedDefinitionCheckInterval;
/**
* The next time the stored definitions from database are checked for changes.
*/
private long nextCheckOfStoredDefinitions;

@Inject
public WorkflowDefinitionService(WorkflowDefinitionDao workflowDefinitionDao, Environment env) {
this.workflowDefinitionDao = workflowDefinitionDao;
this.persistWorkflowDefinitions = env.getRequiredProperty("nflow.definition.persist", Boolean.class);
this.autoInit = env.getRequiredProperty("nflow.autoinit", Boolean.class);
// TODO: config property name? also in CHANGES.md
this.storedDefinitionCheckInterval = SECONDS
.toMillis(env.getRequiredProperty("nflow.definition.loadMissingFromDatabaseSeconds.interval.seconds", Integer.class));
this.nextCheckOfStoredDefinitions = storedDefinitionCheckInterval > 0 ? 0 : MAX_VALUE;
}

/**
Expand All @@ -49,7 +66,14 @@ public WorkflowDefinitionService(WorkflowDefinitionDao workflowDefinitionDao, En
* @return The workflow definition or null if not found.
*/
public WorkflowDefinition getWorkflowDefinition(String type) {
- return workflowDefinitions.get(type);
+ WorkflowDefinition definition = workflowDefinitions.get(type);
+ if (definition instanceof StoredWorkflowDefinitionWrapper && getWorkflowDefinitionRefreshTime() > 0) {
+   definition = null;
+ }
+ if (definition == null && refreshStoredDefinitions()) {
+   definition = workflowDefinitions.get(type);
+ }
+ return definition;
}

/**
Expand All @@ -58,6 +82,7 @@ public WorkflowDefinition getWorkflowDefinition(String type) {
* @return List of workflow definitions.
*/
public List<WorkflowDefinition> getWorkflowDefinitions() {
refreshStoredDefinitions();
return workflowDefinitionValues;
}

Expand Down Expand Up @@ -91,9 +116,49 @@ public void addWorkflowDefinition(WorkflowDefinition wd) {
if (autoInit && persistWorkflowDefinitions) {
workflowDefinitionDao.storeWorkflowDefinition(wd);
}
+ setWorkflowDefinitions(workflowDefinitions.values());
+ logger.info("Added workflow type: {} ({})", wd.getType(), wd.getClass().getName());
+ }
+
+ private void setWorkflowDefinitions(Collection<WorkflowDefinition> newDefinitions) {
synchronized (workflowDefinitions) {
- workflowDefinitionValues = new ArrayList<>(workflowDefinitions.values());
+ workflowDefinitionValues = unmodifiableList(new ArrayList<>(newDefinitions));
}
- logger.info("Added workflow type: {} ({})", wd.getType(), wd.getClass().getName());
}

@SuppressFBWarnings(value = "NOS_NON_OWNED_SYNCHRONIZATION",
justification = "synchronize(this) is valid and needed to match the below synchronized refreshStoredDefinitions() method")
private long getWorkflowDefinitionRefreshTime() {
if (storedDefinitionCheckInterval <= 0) {
return -1;
}
long now = currentTimeMillis();
synchronized (this) {
if (nextCheckOfStoredDefinitions <= now) {
return now;
}
}
return -1;
}

private synchronized boolean refreshStoredDefinitions() {
long now = getWorkflowDefinitionRefreshTime();
if (now <= -1) {
return false;
}
nextCheckOfStoredDefinitions = now + storedDefinitionCheckInterval;
boolean changed = false;
for (StoredWorkflowDefinition def : workflowDefinitionDao.queryStoredWorkflowDefinitions(emptyList())) {
WorkflowDefinition current = workflowDefinitions.get(def.type);
if (current == null || current instanceof StoredWorkflowDefinitionWrapper) {
StoredWorkflowDefinitionWrapper wrapper = new StoredWorkflowDefinitionWrapper(def);
workflowDefinitions.put(def.type, wrapper);
changed = true;
}
}
if (changed) {
setWorkflowDefinitions(workflowDefinitions.values());
}
return changed;
}
}
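
The service above refreshes stored definitions at most once per configured interval and never when the interval is non-positive. The timing part of that logic can be sketched on its own as follows, with a hypothetical `RefreshThrottle` class that takes the current time as a parameter so it is easy to test; the real service reads `currentTimeMillis()` itself.

```java
// Hypothetical helper, for illustration only.
final class RefreshThrottle {
  private final long intervalMillis;
  private long nextCheck;

  RefreshThrottle(long intervalMillis) {
    this.intervalMillis = intervalMillis;
    // A non-positive interval pushes the next check to the far future, which disables refreshing.
    this.nextCheck = intervalMillis > 0 ? 0 : Long.MAX_VALUE;
  }

  /** Returns true if a refresh is due at the given time and schedules the next one. */
  synchronized boolean tryAcquire(long nowMillis) {
    if (intervalMillis <= 0 || nextCheck > nowMillis) {
      return false;
    }
    nextCheck = nowMillis + intervalMillis;
    return true;
  }
}
```

A caller would run the database query only when `tryAcquire` returns true, so repeated lookups inside one interval stay in memory.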
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public abstract class WorkflowDefinition extends ModelObject {
protected final Map<String, WorkflowState> failureTransitions = new LinkedHashMap<>();
private Map<String, WorkflowStateMethod> stateMethods;
private final Set<WorkflowState> states = new HashSet<>();
private final boolean verifyStateMethodValidity;

/**
* Create a workflow definition with default settings and automatically scanned state methods.
Expand Down Expand Up @@ -95,7 +96,7 @@ protected WorkflowDefinition(String type, WorkflowState initialState, WorkflowSt
*/
protected WorkflowDefinition(String type, WorkflowState initialState, WorkflowState errorState, WorkflowSettings settings,
Map<String, WorkflowStateMethod> stateMethods) {
- this(type, initialState, errorState, settings, stateMethods, null);
+ this(type, initialState, errorState, settings, stateMethods, null, true);
}

/**
Expand All @@ -117,18 +118,50 @@ protected WorkflowDefinition(String type, WorkflowState initialState, WorkflowSt
* The states to be registered for the workflow. If null, the states will be scanned.
*/
protected WorkflowDefinition(String type, WorkflowState initialState, WorkflowState errorState, WorkflowSettings settings,
- Map<String, WorkflowStateMethod> stateMethods, Collection<WorkflowState> states) {
+ Map<String, WorkflowStateMethod> stateMethods, Collection<WorkflowState> states) {
this(type, initialState, errorState, settings, stateMethods, states, true);
}

/**
* Create a workflow definition with given settings, state methods and states.
*
* @param type
* The unique identifier of this workflow definition.
* @param initialState
* The default start state of the workflow. The state is automatically registered as one of the allowed states in this
* workflow.
* @param errorState
* The default error state of the workflow. The state is automatically registered as one of the allowed states in this
* workflow.
* @param settings
* The configuration for the workflow instances of this workflow type.
* @param stateMethods
* The state methods to be used for the states of this workflow type. If null, the methods will be scanned.
* @param states
* The states to be registered for the workflow. If null, the states will be scanned.
* @param verifyStateMethodValidity
* True to search and verify the implementation of registered state methods to ensure they comply with expectations.
*/
protected WorkflowDefinition(String type, WorkflowState initialState, WorkflowState errorState, WorkflowSettings settings,
Map<String, WorkflowStateMethod> stateMethods, Collection<WorkflowState> states, boolean verifyStateMethodValidity) {
Assert.notNull(initialState, "initialState must not be null");
Assert.isTrue(initialState.getType() == WorkflowStateType.start, "initialState must be a start state");
Assert.notNull(errorState, "errorState must not be null");
this.type = type;
this.initialState = initialState;
this.errorState = errorState;
this.settings = settings;
this.verifyStateMethodValidity = verifyStateMethodValidity;
WorkflowDefinitionScanner scanner = new WorkflowDefinitionScanner();
if (stateMethods == null) {
Assert.isTrue(verifyStateMethodValidity,
"State method validity verification must be enabled when given state methods are null (scanned from classpath)");
this.stateMethods = scanner.getStateMethods(getClass());
} else {
if (!verifyStateMethodValidity) {
Assert.isTrue(stateMethods.isEmpty(),
"State method validity verification must be enabled when given state methods is not empty");
}
this.stateMethods = stateMethods;
}
registerState(initialState);
Expand Down Expand Up @@ -301,6 +334,9 @@ public WorkflowSettings getSettings() {
}

final void requireStateMethodExists(WorkflowState state) {
if (!verifyStateMethodValidity) {
return;
}
WorkflowStateMethod stateMethod = stateMethods.get(state.name());
if (stateMethod == null && !state.getType().isFinal()) {
String msg = format(
Expand Down Expand Up @@ -405,5 +441,4 @@ public boolean isAllowedNextAction(WorkflowInstance instance, NextAction nextAct
public Map<Integer, String> getSupportedSignals() {
return emptyMap();
}

}
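
The new constructor above only allows state method verification to be switched off when no state methods are supplied: a null map (scan from classpath) requires verification, and a non-empty map requires it as well. A compact sketch of those two preconditions follows, using a hypothetical `StateMethodPolicy` helper and plain `IllegalArgumentException` in place of Spring's `Assert.isTrue`.

```java
import java.util.Map;

// Hypothetical helper, for illustration only.
final class StateMethodPolicy {
  private StateMethodPolicy() {
  }

  /** Rejects combinations where verification is disabled but state methods would be used. */
  static void check(Map<String, ?> stateMethods, boolean verifyStateMethodValidity) {
    if (verifyStateMethodValidity) {
      return; // every combination is allowed when verification is on
    }
    if (stateMethods == null) {
      throw new IllegalArgumentException("Verification must be enabled when state methods are scanned from classpath");
    }
    if (!stateMethods.isEmpty()) {
      throw new IllegalArgumentException("Verification must be enabled when state methods are given");
    }
  }
}
```

The only combination that passes with verification disabled is an empty map, which is what `StoredWorkflowDefinitionWrapper` passes via `emptyMap()`.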
2 changes: 2 additions & 0 deletions nflow-engine/src/main/resources/nflow-engine.properties
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ nflow.db.idle_timeout_seconds=600
nflow.db.create_on_startup=true
nflow.db.disable_batch_updates=false
nflow.db.workflowInstanceType.cacheSize=10000
nflow.db.initialization_fail_timeout_seconds=10

nflow.definition.persist=true
nflow.definition.loadMissingFromDatabaseSeconds.interval.seconds=60

nflow.maintenance.insertWorkflowIfMissing=true
nflow.maintenance.initial.cron=4 4 4 * * *
Expand Down