Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Use interface schemas for governance workflows #19411

Merged
merged 11 commits into from
Jan 24, 2025
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
UPDATE workflow_definition_entity
SET json = JSON_SET(json, '$.trigger.type', 'eventBasedEntity')
WHERE JSON_EXTRACT(json, '$.trigger.type') = 'eventBasedEntityWorkflow';

UPDATE workflow_definition_entity
SET json = JSON_SET(json, '$.trigger.type', 'periodicBatchEntity')
WHERE JSON_EXTRACT(json, '$.trigger.type') = 'periodicBatchEntityWorkflow';
3 changes: 3 additions & 0 deletions bootstrap/sql/migrations/native/1.7.0/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
UPDATE workflow_definition_entity
SET json = JSON_REMOVE(json, '$.type')
WHERE JSON_EXTRACT(json, '$.type') IS NOT NULL;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
UPDATE workflow_definition_entity
SET json = jsonb_set(json, '{trigger,type}', '"eventBasedEntity"')
WHERE json->'trigger'->>'type' = 'eventBasedEntityWorkflow';

UPDATE workflow_definition_entity
SET json = jsonb_set(json, '{trigger,type}', '"periodicBatchEntity"')
WHERE json->'trigger'->>'type' = 'periodicBatchEntityWorkflow';
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
UPDATE workflow_definition_entity
SET json = jsonb - 'type'
WHERE json->>'type' IS NOT NULL;
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
@Getter
public class Workflow {
public static final String RELATED_ENTITY_VARIABLE = "relatedEntity";
public static final String PAYLOAD = "payload";
public static final String RESULT_VARIABLE = "result";
public static final String RESOLVED_BY_VARIABLE = "resolvedBy";
public static final String STAGE_INSTANCE_STATE_ID_VARIABLE = "stageInstanceStateId";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.task.api.Task;
import org.openmetadata.schema.configuration.WorkflowSettings;
import org.openmetadata.schema.governance.workflows.WorkflowDefinition;
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.exception.UnhandledServerException;
Expand Down Expand Up @@ -156,12 +157,15 @@ public void deploy(Workflow workflow) {
.deploy();
}

public void deleteWorkflowDefinition(String processDefinitionKey) {
public boolean isDeployed(WorkflowDefinition wf) {
List<ProcessDefinition> processDefinitions =
repositoryService
.createProcessDefinitionQuery()
.processDefinitionKey(processDefinitionKey)
.list();
repositoryService.createProcessDefinitionQuery().processDefinitionKey(wf.getName()).list();
return !processDefinitions.isEmpty();
}

public void deleteWorkflowDefinition(WorkflowDefinition wf) {
List<ProcessDefinition> processDefinitions =
repositoryService.createProcessDefinitionQuery().processDefinitionKey(wf.getName()).list();

for (ProcessDefinition processDefinition : processDefinitions) {
String deploymentId = processDefinition.getDeploymentId();
Expand All @@ -172,7 +176,7 @@ public void deleteWorkflowDefinition(String processDefinitionKey) {
List<ProcessDefinition> triggerProcessDefinition =
repositoryService
.createProcessDefinitionQuery()
.processDefinitionKey(getTriggerWorkflowId(processDefinitionKey))
.processDefinitionKey(getTriggerWorkflowId(wf.getName()))
.list();

for (ProcessDefinition processDefinition : triggerProcessDefinition) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,37 @@
package org.openmetadata.service.governance.workflows.elements;

import java.util.Map;
import org.openmetadata.schema.governance.workflows.elements.NodeSubType;
import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CheckEntityAttributesTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.JsonLogicTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SetEntityCertificationTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SetGlossaryTermStatusTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.endEvent.EndEventDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.startEvent.StartEventDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.userTask.UserApprovalTaskDefinition;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.CheckEntityAttributesTask;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.JsonLogicFilterTask;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.NoOpTask;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.SetEntityCertificationTask;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.SetGlossaryTermStatusTask;
import org.openmetadata.service.governance.workflows.elements.nodes.endEvent.EndEvent;
import org.openmetadata.service.governance.workflows.elements.nodes.startEvent.StartEvent;
import org.openmetadata.service.governance.workflows.elements.nodes.userTask.UserApprovalTask;
import org.openmetadata.service.util.JsonUtils;

public class NodeFactory {
public static NodeInterface createNode(Map<String, Object> nodeDefinition) {
return switch (NodeSubType.fromValue((String) nodeDefinition.get("subType"))) {
case START_EVENT -> new StartEvent(
JsonUtils.readOrConvertValue(nodeDefinition, StartEventDefinition.class));
case END_EVENT -> new EndEvent(
JsonUtils.readOrConvertValue(nodeDefinition, EndEventDefinition.class));
public static NodeInterface createNode(WorkflowNodeDefinitionInterface nodeDefinition) {
return switch (NodeSubType.fromValue(nodeDefinition.getSubType())) {
case START_EVENT -> new StartEvent((StartEventDefinition) nodeDefinition);
case END_EVENT -> new EndEvent((EndEventDefinition) nodeDefinition);
case CHECK_ENTITY_ATTRIBUTES_TASK -> new CheckEntityAttributesTask(
JsonUtils.readOrConvertValue(nodeDefinition, CheckEntityAttributesTaskDefinition.class));
(CheckEntityAttributesTaskDefinition) nodeDefinition);
case SET_ENTITY_CERTIFICATION_TASK -> new SetEntityCertificationTask(
JsonUtils.readOrConvertValue(nodeDefinition, SetEntityCertificationTaskDefinition.class));
(SetEntityCertificationTaskDefinition) nodeDefinition);
case SET_GLOSSARY_TERM_STATUS_TASK -> new SetGlossaryTermStatusTask(
JsonUtils.readOrConvertValue(nodeDefinition, SetGlossaryTermStatusTaskDefinition.class));
case USER_APPROVAL_TASK -> new UserApprovalTask(
JsonUtils.readOrConvertValue(nodeDefinition, UserApprovalTaskDefinition.class));
(SetGlossaryTermStatusTaskDefinition) nodeDefinition);
case USER_APPROVAL_TASK -> new UserApprovalTask((UserApprovalTaskDefinition) nodeDefinition);
case PYTHON_WORKFLOW_AUTOMATION_TASK -> new NoOpTask(nodeDefinition);
case JSON_LOGIC_TASK -> new JsonLogicFilterTask((JsonLogicTaskDefinition) nodeDefinition);
};
}
}
Original file line number Diff line number Diff line change
@@ -1,28 +1,31 @@
package org.openmetadata.service.governance.workflows.elements;

import org.openmetadata.schema.governance.workflows.TriggerType;
import org.openmetadata.schema.governance.workflows.WorkflowDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.trigger.PeriodicBatchEntityTriggerDefinition;
import org.openmetadata.schema.governance.workflows.elements.triggers.CustomSignalTriggerDefinition;
import org.openmetadata.schema.governance.workflows.elements.triggers.EventBasedEntityTriggerDefinition;
import org.openmetadata.service.governance.workflows.elements.triggers.CustomSignalTrigger;
import org.openmetadata.service.governance.workflows.elements.triggers.EventBasedEntityTrigger;
import org.openmetadata.service.governance.workflows.elements.triggers.PeriodicBatchEntityTrigger;
import org.openmetadata.service.util.JsonUtils;

public class TriggerFactory {
public static TriggerInterface createTrigger(WorkflowDefinition workflowDefinition) {
String mainWorkflowName = workflowDefinition.getFullyQualifiedName();
String triggerWorkflowId = getTriggerWorkflowId(mainWorkflowName);
public static TriggerInterface createTrigger(WorkflowDefinition workflow) {
String triggerWorkflowId = getTriggerWorkflowId(workflow.getFullyQualifiedName());

return switch (workflowDefinition.getType()) {
case EVENT_BASED_ENTITY_WORKFLOW -> new EventBasedEntityTrigger(
mainWorkflowName,
return switch (TriggerType.fromValue(workflow.getTrigger().getType())) {
case EVENT_BASED_ENTITY -> new EventBasedEntityTrigger(
workflow.getName(),
triggerWorkflowId,
JsonUtils.readOrConvertValue(
workflowDefinition.getTrigger(), EventBasedEntityTriggerDefinition.class));
case PERIODIC_BATCH_ENTITY_WORKFLOW -> new PeriodicBatchEntityTrigger(
mainWorkflowName,
(EventBasedEntityTriggerDefinition) workflow.getTrigger());
case CUSTOM_SIGNAL -> new CustomSignalTrigger(
workflow.getName(),
triggerWorkflowId,
JsonUtils.readOrConvertValue(
workflowDefinition.getTrigger(), PeriodicBatchEntityTriggerDefinition.class));
(CustomSignalTriggerDefinition) workflow.getTrigger());
case PERIODIC_BATCH_ENTITY -> new PeriodicBatchEntityTrigger(
workflow.getName(),
triggerWorkflowId,
(PeriodicBatchEntityTriggerDefinition) workflow.getTrigger());
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask;

import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId;

import org.flowable.bpmn.model.BoundaryEvent;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.EndEvent;
import org.flowable.bpmn.model.FieldExtension;
import org.flowable.bpmn.model.Process;
import org.flowable.bpmn.model.SequenceFlow;
import org.flowable.bpmn.model.ServiceTask;
import org.flowable.bpmn.model.StartEvent;
import org.flowable.bpmn.model.SubProcess;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.JsonLogicTaskDefinition;
import org.openmetadata.service.governance.workflows.elements.NodeInterface;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl.JsonLogicFilterImpl;
import org.openmetadata.service.governance.workflows.flowable.builders.EndEventBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.FieldExtensionBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.ServiceTaskBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.StartEventBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.SubProcessBuilder;

public class JsonLogicFilterTask implements NodeInterface {
private final SubProcess subProcess;
private final BoundaryEvent runtimeExceptionBoundaryEvent;

public JsonLogicFilterTask(JsonLogicTaskDefinition nodeDefinition) {
String subProcessId = nodeDefinition.getName();

SubProcess subProcess = new SubProcessBuilder().id(subProcessId).build();

StartEvent startEvent =
new StartEventBuilder().id(getFlowableElementId(subProcessId, "startEvent")).build();

ServiceTask checkEntityAttributes =
getCheckEntityAttributesServiceTask(subProcessId, nodeDefinition.getConfig().getRules());

EndEvent endEvent =
new EndEventBuilder().id(getFlowableElementId(subProcessId, "endEvent")).build();

subProcess.addFlowElement(startEvent);
subProcess.addFlowElement(checkEntityAttributes);
subProcess.addFlowElement(endEvent);

subProcess.addFlowElement(new SequenceFlow(startEvent.getId(), checkEntityAttributes.getId()));
subProcess.addFlowElement(new SequenceFlow(checkEntityAttributes.getId(), endEvent.getId()));

this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess);
this.subProcess = subProcess;
}

@Override
public BoundaryEvent getRuntimeExceptionBoundaryEvent() {
return runtimeExceptionBoundaryEvent;
}

private ServiceTask getCheckEntityAttributesServiceTask(String subProcessId, String rules) {
FieldExtension rulesExpr =
new FieldExtensionBuilder().fieldName("rulesExpr").fieldValue(rules).build();

ServiceTask serviceTask =
new ServiceTaskBuilder()
.id(getFlowableElementId(subProcessId, "jsonLogic"))
.implementation(JsonLogicFilterImpl.class.getName())
.build();
serviceTask.getFieldExtensions().add(rulesExpr);
return serviceTask;
}

public void addToWorkflow(BpmnModel model, Process process) {
process.addFlowElement(subProcess);
process.addFlowElement(runtimeExceptionBoundaryEvent);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask;

import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId;

import org.flowable.bpmn.model.BoundaryEvent;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.EndEvent;
import org.flowable.bpmn.model.Process;
import org.flowable.bpmn.model.SequenceFlow;
import org.flowable.bpmn.model.ServiceTask;
import org.flowable.bpmn.model.StartEvent;
import org.flowable.bpmn.model.SubProcess;
import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface;
import org.openmetadata.service.governance.workflows.elements.NodeInterface;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl.NoOpTaskImp;
import org.openmetadata.service.governance.workflows.flowable.builders.EndEventBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.ServiceTaskBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.StartEventBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.SubProcessBuilder;

public class NoOpTask implements NodeInterface {
private final SubProcess subProcess;
private final BoundaryEvent runtimeExceptionBoundaryEvent;

public NoOpTask(WorkflowNodeDefinitionInterface nodeDefinition) {
String subProcessId = nodeDefinition.getName();

SubProcess subProcess = new SubProcessBuilder().id(subProcessId).build();

StartEvent startEvent =
new StartEventBuilder().id(getFlowableElementId(subProcessId, "startEvent")).build();

ServiceTask noOpTask = setNoOpTask(subProcessId);

EndEvent endEvent =
new EndEventBuilder().id(getFlowableElementId(subProcessId, "endEvent")).build();

subProcess.addFlowElement(startEvent);
subProcess.addFlowElement(noOpTask);
subProcess.addFlowElement(endEvent);

subProcess.addFlowElement(new SequenceFlow(startEvent.getId(), noOpTask.getId()));
subProcess.addFlowElement(new SequenceFlow(noOpTask.getId(), endEvent.getId()));

this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess);
this.subProcess = subProcess;
}

@Override
public BoundaryEvent getRuntimeExceptionBoundaryEvent() {
return runtimeExceptionBoundaryEvent;
}

private ServiceTask setNoOpTask(String subProcessId) {

return new ServiceTaskBuilder()
.id(getFlowableElementId(subProcessId, "printHelloTask"))
.implementation(NoOpTaskImp.class.getName())
.build();
}

public void addToWorkflow(BpmnModel model, Process process) {
process.addFlowElement(subProcess);
process.addFlowElement(runtimeExceptionBoundaryEvent);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl;

import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.PAYLOAD;
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION;
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId;

import io.github.jamsesso.jsonlogic.JsonLogic;
import io.github.jamsesso.jsonlogic.JsonLogicException;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.Expression;
import org.flowable.engine.delegate.BpmnError;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity;
import org.openmetadata.service.util.JsonUtils;

@Slf4j
public class JsonLogicFilterImpl implements JavaDelegate {
private Expression rulesExpr;

@Override
public void execute(DelegateExecution execution) {
try {
// TODO why is 'rulesExpr' not passed as a variable?
String rules = (String) rulesExpr.getValue(execution);
String payload = (String) execution.getVariable("payload");
List<ChangeEvent> filtered =
JsonUtils.readObjects(payload, ChangeEvent.class).stream()
.filter(
ce -> {
EntityInterface entity =
Entity.getEntity(ce.getEntityType(), ce.getEntityId(), "*", Include.ALL);
return checkAttributes(rules, entity);
})
.toList();

execution.setVariable(PAYLOAD, JsonUtils.pojoToJson(filtered));
} catch (Exception exc) {
LOG.error(
"[{}] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc);
execution.setVariable(EXCEPTION_VARIABLE, exc.toString());
throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, exc.getMessage());
}
}

private Boolean checkAttributes(String rules, EntityInterface entity) {
JsonLogic jsonLogic = new JsonLogic();
try {
return (boolean) jsonLogic.apply(rules, JsonUtils.getMap(entity));
} catch (JsonLogicException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl;

import static org.openmetadata.service.governance.workflows.Workflow.PAYLOAD;

import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.Expression;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;

@Slf4j
public class NoOpTaskImp implements JavaDelegate {
private Expression statusExpr;

@Override
public void execute(DelegateExecution execution) {
String payload = (String) execution.getVariable(PAYLOAD);
System.out.println("NoOpTaskImp: " + payload);
}
}
Loading
Loading