Skip to content

Commit

Permalink
Support loading workflow definitions from database if nflow.definitio…
Browse files Browse the repository at this point in the history
…n.load=true is set
  • Loading branch information
gmokki committed Jun 3, 2022
1 parent 27d5e89 commit 28fc406
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.nflow.engine.internal.workflow;

import io.nflow.engine.internal.workflow.StoredWorkflowDefinition.State;
import io.nflow.engine.workflow.definition.WorkflowDefinition;
import io.nflow.engine.workflow.definition.WorkflowSettings;
import io.nflow.engine.workflow.definition.WorkflowState;
import io.nflow.engine.workflow.definition.WorkflowStateType;

import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;

import static io.nflow.engine.workflow.definition.WorkflowStateType.start;
import static java.util.stream.Collectors.toList;

public class StoredWorkflowDefinitionWrapper extends WorkflowDefinition {
public StoredWorkflowDefinitionWrapper(StoredWorkflowDefinition stored) {
super(stored.type, getInitialState(stored), getErrorState(stored), new WorkflowSettings.Builder().build(), null, allStates(stored));
setDescription(stored.description);
}

private static Collection<WorkflowState> allStates(StoredWorkflowDefinition stored) {
return stored.states.stream().map(StoredWorkflowDefinitionWrapper::toState).collect(toList());
}

private static WorkflowState getInitialState(StoredWorkflowDefinition stored) {
for (State state : stored.states) {
if (start.name().equals(state.type)) {
return toState(state);
}
}
throw new IllegalStateException("Could not find initial state for " + stored);
}

private static WorkflowState toState(StoredWorkflowDefinition.State state) {
return new io.nflow.engine.workflow.curated.State(state.id, WorkflowStateType.valueOf(state.type), state.description);
}

private static WorkflowState getErrorState(StoredWorkflowDefinition stored) {
for (State state : stored.states) {
if (stored.onError.equals(state.id)) {
return toState(state);
}
}
throw new IllegalStateException("Could not find error state for " + stored);
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
package io.nflow.engine.service;

import static java.lang.Long.MAX_VALUE;
import static java.lang.System.currentTimeMillis;
import static java.util.Collections.emptyList;
import static java.util.Collections.synchronizedMap;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.slf4j.LoggerFactory.getLogger;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import javax.inject.Inject;
import javax.inject.Singleton;

import io.nflow.engine.internal.workflow.StoredWorkflowDefinition;
import io.nflow.engine.internal.workflow.StoredWorkflowDefinitionWrapper;
import org.slf4j.Logger;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
Expand All @@ -33,12 +39,17 @@ public class WorkflowDefinitionService {
private final WorkflowDefinitionDao workflowDefinitionDao;
private final boolean persistWorkflowDefinitions;
private final boolean autoInit;
/**
* The next time the stored definitions from database are checked for changes.
*/
private long nextCheckOfStoredDefinitions;

@Inject
public WorkflowDefinitionService(WorkflowDefinitionDao workflowDefinitionDao, Environment env) {
this.workflowDefinitionDao = workflowDefinitionDao;
this.persistWorkflowDefinitions = env.getRequiredProperty("nflow.definition.persist", Boolean.class);
this.autoInit = env.getRequiredProperty("nflow.autoinit", Boolean.class);
this.nextCheckOfStoredDefinitions = env.getRequiredProperty("nflow.definition.load", Boolean.class) ? 0 : MAX_VALUE;
}

/**
Expand All @@ -49,7 +60,12 @@ public WorkflowDefinitionService(WorkflowDefinitionDao workflowDefinitionDao, En
* @return The workflow definition or null if not found.
*/
public WorkflowDefinition getWorkflowDefinition(String type) {
return workflowDefinitions.get(type);
WorkflowDefinition ret = workflowDefinitions.get(type);
if (ret == null) {
refreshStoredDefinitions();
}
ret = workflowDefinitions.get(type);
return ret;
}

/**
Expand All @@ -58,6 +74,7 @@ public WorkflowDefinition getWorkflowDefinition(String type) {
* @return List of workflow definitions.
*/
public List<WorkflowDefinition> getWorkflowDefinitions() {
refreshStoredDefinitions();
return workflowDefinitionValues;
}

Expand Down Expand Up @@ -96,4 +113,29 @@ public void addWorkflowDefinition(WorkflowDefinition wd) {
}
logger.info("Added workflow type: {} ({})", wd.getType(), wd.getClass().getName());
}

/**
*
*/
private synchronized void refreshStoredDefinitions() {
long now = currentTimeMillis();
if (nextCheckOfStoredDefinitions > now) {
return;
}
nextCheckOfStoredDefinitions = now + MINUTES.toMillis(1);
boolean changed = false;
for (StoredWorkflowDefinition def : workflowDefinitionDao.queryStoredWorkflowDefinitions(emptyList())) {
WorkflowDefinition current = workflowDefinitions.get(def.type);
if (current == null || current instanceof StoredWorkflowDefinitionWrapper) {
StoredWorkflowDefinitionWrapper wrapper = new StoredWorkflowDefinitionWrapper(def);
workflowDefinitions.put(def.type, wrapper);
changed = true;
}
}
if (changed) {
synchronized (workflowDefinitions) {
workflowDefinitionValues = new ArrayList<>(workflowDefinitions.values());
}
}
}
}

0 comments on commit 28fc406

Please sign in to comment.