Skip to content

Commit

Permalink
MINOR: Use interface schemas for governance workflows (#19411)
Browse files Browse the repository at this point in the history
* feat(apps): support event subscriptions

- added support for apps with event subscriptions.
- added support for custom consumers on event subscriptions.
- use native application methods instead of reflection in ApplicationHandler

* removed runMethodFromApplication reflection and use concrete methods from AbstractNativeApplication

* format

* fix(governance-workflows): defined interface in workflow schema

- defined types for workflow nodes
- fixed updateEdge method in WorkflowDefinitionRepository

* ref(governance-workflows): use explicit types in schema

use explicit interface type in json schemas and use annotations to infer the concrete classes when deserializing

* - implemented CustomSignal trigger
- use JsonTypeInfo and JsonSubTypes to infer types in jsonschema for nodes and triggers
- Implemented JsonLogicFilter task
- implemented placeholder noop task

* - simplified trigger type names
- removed governance workflow root "type"
- added "deployed" field to governance workflow
- applied changes to existing gov-workflows
- migrations

* fixed migrations
  • Loading branch information
sushi30 authored Jan 24, 2025
1 parent 94ed7e3 commit a6670af
Show file tree
Hide file tree
Showing 35 changed files with 802 additions and 99 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
UPDATE workflow_definition_entity
SET json = JSON_SET(json, '$.trigger.type', 'eventBasedEntity')
WHERE JSON_EXTRACT(json, '$.trigger.type') = 'eventBasedEntityWorkflow';

UPDATE workflow_definition_entity
SET json = JSON_SET(json, '$.trigger.type', 'periodicBatchEntity')
WHERE JSON_EXTRACT(json, '$.trigger.type') = 'periodicBatchEntityWorkflow';
3 changes: 3 additions & 0 deletions bootstrap/sql/migrations/native/1.7.0/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
UPDATE workflow_definition_entity
SET json = JSON_REMOVE(json, '$.type')
WHERE JSON_EXTRACT(json, '$.type') IS NOT NULL;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
UPDATE workflow_definition_entity
SET json = jsonb_set(json, '{trigger,type}', '"eventBasedEntity"')
WHERE json->'trigger'->>'type' = 'eventBasedEntityWorkflow';

UPDATE workflow_definition_entity
SET json = jsonb_set(json, '{trigger,type}', '"periodicBatchEntity"')
WHERE json->'trigger'->>'type' = 'periodicBatchEntityWorkflow';
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
UPDATE workflow_definition_entity
SET json = jsonb - 'type'
WHERE json->>'type' IS NOT NULL;
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
@Getter
public class Workflow {
public static final String RELATED_ENTITY_VARIABLE = "relatedEntity";
public static final String PAYLOAD = "payload";
public static final String RESULT_VARIABLE = "result";
public static final String RESOLVED_BY_VARIABLE = "resolvedBy";
public static final String STAGE_INSTANCE_STATE_ID_VARIABLE = "stageInstanceStateId";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.task.api.Task;
import org.openmetadata.schema.configuration.WorkflowSettings;
import org.openmetadata.schema.governance.workflows.WorkflowDefinition;
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.exception.UnhandledServerException;
Expand Down Expand Up @@ -156,12 +157,15 @@ public void deploy(Workflow workflow) {
.deploy();
}

public void deleteWorkflowDefinition(String processDefinitionKey) {
public boolean isDeployed(WorkflowDefinition wf) {
List<ProcessDefinition> processDefinitions =
repositoryService
.createProcessDefinitionQuery()
.processDefinitionKey(processDefinitionKey)
.list();
repositoryService.createProcessDefinitionQuery().processDefinitionKey(wf.getName()).list();
return !processDefinitions.isEmpty();
}

public void deleteWorkflowDefinition(WorkflowDefinition wf) {
List<ProcessDefinition> processDefinitions =
repositoryService.createProcessDefinitionQuery().processDefinitionKey(wf.getName()).list();

for (ProcessDefinition processDefinition : processDefinitions) {
String deploymentId = processDefinition.getDeploymentId();
Expand All @@ -172,7 +176,7 @@ public void deleteWorkflowDefinition(String processDefinitionKey) {
List<ProcessDefinition> triggerProcessDefinition =
repositoryService
.createProcessDefinitionQuery()
.processDefinitionKey(getTriggerWorkflowId(processDefinitionKey))
.processDefinitionKey(getTriggerWorkflowId(wf.getName()))
.list();

for (ProcessDefinition processDefinition : triggerProcessDefinition) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,37 @@
package org.openmetadata.service.governance.workflows.elements;

import java.util.Map;
import org.openmetadata.schema.governance.workflows.elements.NodeSubType;
import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CheckEntityAttributesTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.JsonLogicTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SetEntityCertificationTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SetGlossaryTermStatusTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.endEvent.EndEventDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.startEvent.StartEventDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.userTask.UserApprovalTaskDefinition;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.CheckEntityAttributesTask;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.JsonLogicFilterTask;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.NoOpTask;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.SetEntityCertificationTask;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.SetGlossaryTermStatusTask;
import org.openmetadata.service.governance.workflows.elements.nodes.endEvent.EndEvent;
import org.openmetadata.service.governance.workflows.elements.nodes.startEvent.StartEvent;
import org.openmetadata.service.governance.workflows.elements.nodes.userTask.UserApprovalTask;
import org.openmetadata.service.util.JsonUtils;

public class NodeFactory {
public static NodeInterface createNode(Map<String, Object> nodeDefinition) {
return switch (NodeSubType.fromValue((String) nodeDefinition.get("subType"))) {
case START_EVENT -> new StartEvent(
JsonUtils.readOrConvertValue(nodeDefinition, StartEventDefinition.class));
case END_EVENT -> new EndEvent(
JsonUtils.readOrConvertValue(nodeDefinition, EndEventDefinition.class));
public static NodeInterface createNode(WorkflowNodeDefinitionInterface nodeDefinition) {
return switch (NodeSubType.fromValue(nodeDefinition.getSubType())) {
case START_EVENT -> new StartEvent((StartEventDefinition) nodeDefinition);
case END_EVENT -> new EndEvent((EndEventDefinition) nodeDefinition);
case CHECK_ENTITY_ATTRIBUTES_TASK -> new CheckEntityAttributesTask(
JsonUtils.readOrConvertValue(nodeDefinition, CheckEntityAttributesTaskDefinition.class));
(CheckEntityAttributesTaskDefinition) nodeDefinition);
case SET_ENTITY_CERTIFICATION_TASK -> new SetEntityCertificationTask(
JsonUtils.readOrConvertValue(nodeDefinition, SetEntityCertificationTaskDefinition.class));
(SetEntityCertificationTaskDefinition) nodeDefinition);
case SET_GLOSSARY_TERM_STATUS_TASK -> new SetGlossaryTermStatusTask(
JsonUtils.readOrConvertValue(nodeDefinition, SetGlossaryTermStatusTaskDefinition.class));
case USER_APPROVAL_TASK -> new UserApprovalTask(
JsonUtils.readOrConvertValue(nodeDefinition, UserApprovalTaskDefinition.class));
(SetGlossaryTermStatusTaskDefinition) nodeDefinition);
case USER_APPROVAL_TASK -> new UserApprovalTask((UserApprovalTaskDefinition) nodeDefinition);
case PYTHON_WORKFLOW_AUTOMATION_TASK -> new NoOpTask(nodeDefinition);
case JSON_LOGIC_TASK -> new JsonLogicFilterTask((JsonLogicTaskDefinition) nodeDefinition);
};
}
}
Original file line number Diff line number Diff line change
@@ -1,28 +1,31 @@
package org.openmetadata.service.governance.workflows.elements;

import org.openmetadata.schema.governance.workflows.TriggerType;
import org.openmetadata.schema.governance.workflows.WorkflowDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.trigger.PeriodicBatchEntityTriggerDefinition;
import org.openmetadata.schema.governance.workflows.elements.triggers.CustomSignalTriggerDefinition;
import org.openmetadata.schema.governance.workflows.elements.triggers.EventBasedEntityTriggerDefinition;
import org.openmetadata.service.governance.workflows.elements.triggers.CustomSignalTrigger;
import org.openmetadata.service.governance.workflows.elements.triggers.EventBasedEntityTrigger;
import org.openmetadata.service.governance.workflows.elements.triggers.PeriodicBatchEntityTrigger;
import org.openmetadata.service.util.JsonUtils;

public class TriggerFactory {
public static TriggerInterface createTrigger(WorkflowDefinition workflowDefinition) {
String mainWorkflowName = workflowDefinition.getFullyQualifiedName();
String triggerWorkflowId = getTriggerWorkflowId(mainWorkflowName);
public static TriggerInterface createTrigger(WorkflowDefinition workflow) {
String triggerWorkflowId = getTriggerWorkflowId(workflow.getFullyQualifiedName());

return switch (workflowDefinition.getType()) {
case EVENT_BASED_ENTITY_WORKFLOW -> new EventBasedEntityTrigger(
mainWorkflowName,
return switch (TriggerType.fromValue(workflow.getTrigger().getType())) {
case EVENT_BASED_ENTITY -> new EventBasedEntityTrigger(
workflow.getName(),
triggerWorkflowId,
JsonUtils.readOrConvertValue(
workflowDefinition.getTrigger(), EventBasedEntityTriggerDefinition.class));
case PERIODIC_BATCH_ENTITY_WORKFLOW -> new PeriodicBatchEntityTrigger(
mainWorkflowName,
(EventBasedEntityTriggerDefinition) workflow.getTrigger());
case CUSTOM_SIGNAL -> new CustomSignalTrigger(
workflow.getName(),
triggerWorkflowId,
JsonUtils.readOrConvertValue(
workflowDefinition.getTrigger(), PeriodicBatchEntityTriggerDefinition.class));
(CustomSignalTriggerDefinition) workflow.getTrigger());
case PERIODIC_BATCH_ENTITY -> new PeriodicBatchEntityTrigger(
workflow.getName(),
triggerWorkflowId,
(PeriodicBatchEntityTriggerDefinition) workflow.getTrigger());
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask;

import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId;

import org.flowable.bpmn.model.BoundaryEvent;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.EndEvent;
import org.flowable.bpmn.model.FieldExtension;
import org.flowable.bpmn.model.Process;
import org.flowable.bpmn.model.SequenceFlow;
import org.flowable.bpmn.model.ServiceTask;
import org.flowable.bpmn.model.StartEvent;
import org.flowable.bpmn.model.SubProcess;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.JsonLogicTaskDefinition;
import org.openmetadata.service.governance.workflows.elements.NodeInterface;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl.JsonLogicFilterImpl;
import org.openmetadata.service.governance.workflows.flowable.builders.EndEventBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.FieldExtensionBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.ServiceTaskBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.StartEventBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.SubProcessBuilder;

public class JsonLogicFilterTask implements NodeInterface {
private final SubProcess subProcess;
private final BoundaryEvent runtimeExceptionBoundaryEvent;

public JsonLogicFilterTask(JsonLogicTaskDefinition nodeDefinition) {
String subProcessId = nodeDefinition.getName();

SubProcess subProcess = new SubProcessBuilder().id(subProcessId).build();

StartEvent startEvent =
new StartEventBuilder().id(getFlowableElementId(subProcessId, "startEvent")).build();

ServiceTask checkEntityAttributes =
getCheckEntityAttributesServiceTask(subProcessId, nodeDefinition.getConfig().getRules());

EndEvent endEvent =
new EndEventBuilder().id(getFlowableElementId(subProcessId, "endEvent")).build();

subProcess.addFlowElement(startEvent);
subProcess.addFlowElement(checkEntityAttributes);
subProcess.addFlowElement(endEvent);

subProcess.addFlowElement(new SequenceFlow(startEvent.getId(), checkEntityAttributes.getId()));
subProcess.addFlowElement(new SequenceFlow(checkEntityAttributes.getId(), endEvent.getId()));

this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess);
this.subProcess = subProcess;
}

@Override
public BoundaryEvent getRuntimeExceptionBoundaryEvent() {
return runtimeExceptionBoundaryEvent;
}

private ServiceTask getCheckEntityAttributesServiceTask(String subProcessId, String rules) {
FieldExtension rulesExpr =
new FieldExtensionBuilder().fieldName("rulesExpr").fieldValue(rules).build();

ServiceTask serviceTask =
new ServiceTaskBuilder()
.id(getFlowableElementId(subProcessId, "jsonLogic"))
.implementation(JsonLogicFilterImpl.class.getName())
.build();
serviceTask.getFieldExtensions().add(rulesExpr);
return serviceTask;
}

public void addToWorkflow(BpmnModel model, Process process) {
process.addFlowElement(subProcess);
process.addFlowElement(runtimeExceptionBoundaryEvent);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask;

import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId;

import org.flowable.bpmn.model.BoundaryEvent;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.EndEvent;
import org.flowable.bpmn.model.Process;
import org.flowable.bpmn.model.SequenceFlow;
import org.flowable.bpmn.model.ServiceTask;
import org.flowable.bpmn.model.StartEvent;
import org.flowable.bpmn.model.SubProcess;
import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface;
import org.openmetadata.service.governance.workflows.elements.NodeInterface;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl.NoOpTaskImp;
import org.openmetadata.service.governance.workflows.flowable.builders.EndEventBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.ServiceTaskBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.StartEventBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.SubProcessBuilder;

public class NoOpTask implements NodeInterface {
private final SubProcess subProcess;
private final BoundaryEvent runtimeExceptionBoundaryEvent;

public NoOpTask(WorkflowNodeDefinitionInterface nodeDefinition) {
String subProcessId = nodeDefinition.getName();

SubProcess subProcess = new SubProcessBuilder().id(subProcessId).build();

StartEvent startEvent =
new StartEventBuilder().id(getFlowableElementId(subProcessId, "startEvent")).build();

ServiceTask noOpTask = setNoOpTask(subProcessId);

EndEvent endEvent =
new EndEventBuilder().id(getFlowableElementId(subProcessId, "endEvent")).build();

subProcess.addFlowElement(startEvent);
subProcess.addFlowElement(noOpTask);
subProcess.addFlowElement(endEvent);

subProcess.addFlowElement(new SequenceFlow(startEvent.getId(), noOpTask.getId()));
subProcess.addFlowElement(new SequenceFlow(noOpTask.getId(), endEvent.getId()));

this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess);
this.subProcess = subProcess;
}

@Override
public BoundaryEvent getRuntimeExceptionBoundaryEvent() {
return runtimeExceptionBoundaryEvent;
}

private ServiceTask setNoOpTask(String subProcessId) {

return new ServiceTaskBuilder()
.id(getFlowableElementId(subProcessId, "printHelloTask"))
.implementation(NoOpTaskImp.class.getName())
.build();
}

public void addToWorkflow(BpmnModel model, Process process) {
process.addFlowElement(subProcess);
process.addFlowElement(runtimeExceptionBoundaryEvent);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl;

import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.PAYLOAD;
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION;
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId;

import io.github.jamsesso.jsonlogic.JsonLogic;
import io.github.jamsesso.jsonlogic.JsonLogicException;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.Expression;
import org.flowable.engine.delegate.BpmnError;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity;
import org.openmetadata.service.util.JsonUtils;

@Slf4j
public class JsonLogicFilterImpl implements JavaDelegate {
private Expression rulesExpr;

@Override
public void execute(DelegateExecution execution) {
try {
// TODO why is 'rulesExpr' not passed as a variable?
String rules = (String) rulesExpr.getValue(execution);
String payload = (String) execution.getVariable("payload");
List<ChangeEvent> filtered =
JsonUtils.readObjects(payload, ChangeEvent.class).stream()
.filter(
ce -> {
EntityInterface entity =
Entity.getEntity(ce.getEntityType(), ce.getEntityId(), "*", Include.ALL);
return checkAttributes(rules, entity);
})
.toList();

execution.setVariable(PAYLOAD, JsonUtils.pojoToJson(filtered));
} catch (Exception exc) {
LOG.error(
"[{}] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc);
execution.setVariable(EXCEPTION_VARIABLE, exc.toString());
throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, exc.getMessage());
}
}

private Boolean checkAttributes(String rules, EntityInterface entity) {
JsonLogic jsonLogic = new JsonLogic();
try {
return (boolean) jsonLogic.apply(rules, JsonUtils.getMap(entity));
} catch (JsonLogicException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl;

import static org.openmetadata.service.governance.workflows.Workflow.PAYLOAD;

import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.Expression;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;

@Slf4j
public class NoOpTaskImp implements JavaDelegate {
private Expression statusExpr;

@Override
public void execute(DelegateExecution execution) {
String payload = (String) execution.getVariable(PAYLOAD);
System.out.println("NoOpTaskImp: " + payload);
}
}
Loading

0 comments on commit a6670af

Please sign in to comment.