From 47c5191715143379513c76d67db101dd6208bcc0 Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti Date: Fri, 11 Oct 2024 17:53:34 +0200 Subject: [PATCH] [Fix #3719] Unmarshall return ObjectNode that is event sensitive --- .../event/impl/adapter/AdapterHelper.java | 1 + .../datatype/impl/type/ObjectDataType.java | 5 +++ .../ProtobufJsonNodeMessageMarshaller.java | 2 +- .../ProcessInstanceMarshallTest.java | 7 ++- .../executor/StaticWorkflowApplication.java | 45 ++++++++++++++++--- .../pom.xml | 2 +- .../executor/EventPublisherCollector.java | 45 +++++++++++++++++++ .../executor/PersistentApplicationTest.java | 34 ++++++++++++-- .../src/test/resources/logback.xml | 11 +++++ 9 files changed, 136 insertions(+), 16 deletions(-) create mode 100644 kogito-serverless-workflow/kogito-serverless-workflow-executor-tests/src/test/java/org/kie/kogito/serverless/workflow/executor/EventPublisherCollector.java create mode 100644 kogito-serverless-workflow/kogito-serverless-workflow-executor-tests/src/test/resources/logback.xml diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/adapter/AdapterHelper.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/adapter/AdapterHelper.java index b2897eec6ec..224b1859aac 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/adapter/AdapterHelper.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/adapter/AdapterHelper.java @@ -69,6 +69,7 @@ public static String buildSource(String service, String processId) { if (processId == null) { return null; } else { + processId = processId.replace(" ", "-"); return service + "/" + (processId.contains(".") ? processId.substring(processId.lastIndexOf('.') + 1) : processId); } } diff --git a/jbpm/jbpm-flow/src/main/java/org/jbpm/process/core/datatype/impl/type/ObjectDataType.java b/jbpm/jbpm-flow/src/main/java/org/jbpm/process/core/datatype/impl/type/ObjectDataType.java index 3ed0ab31a84..ce547bec665 100755 --- a/jbpm/jbpm-flow/src/main/java/org/jbpm/process/core/datatype/impl/type/ObjectDataType.java +++ b/jbpm/jbpm-flow/src/main/java/org/jbpm/process/core/datatype/impl/type/ObjectDataType.java @@ -25,6 +25,7 @@ import org.jbpm.process.core.datatype.DataType; import org.jbpm.process.core.datatype.DataTypeUtils; +import org.jbpm.process.core.datatype.impl.coverter.CloneHelper; import org.jbpm.process.core.datatype.impl.coverter.TypeConverterRegistry; /** @@ -86,6 +87,10 @@ public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(className); } + public Object clone(Object value) { + return CloneHelper.get().clone(value); + } + @Override public boolean verifyDataType(final Object value) { if (value == null) { diff --git a/jbpm/process-serialization-protobuf/src/main/java/org/jbpm/flow/serialization/impl/marshallers/ProtobufJsonNodeMessageMarshaller.java b/jbpm/process-serialization-protobuf/src/main/java/org/jbpm/flow/serialization/impl/marshallers/ProtobufJsonNodeMessageMarshaller.java index ce7f6d16cf7..afa5bbcd102 100644 --- a/jbpm/process-serialization-protobuf/src/main/java/org/jbpm/flow/serialization/impl/marshallers/ProtobufJsonNodeMessageMarshaller.java +++ b/jbpm/process-serialization-protobuf/src/main/java/org/jbpm/flow/serialization/impl/marshallers/ProtobufJsonNodeMessageMarshaller.java @@ -52,7 +52,7 @@ public Any marshall(Object unmarshalled) { public Object unmarshall(Any data) { try { KogitoTypesProtobuf.JsonNode storedValue = data.unpack(KogitoTypesProtobuf.JsonNode.class); - return ObjectMapperFactory.get().readTree(storedValue.getContent()); + return ObjectMapperFactory.listenerAware().readTree(storedValue.getContent()); } catch (InvalidProtocolBufferException | JsonProcessingException e1) { throw new ProcessInstanceMarshallerException("Error trying to unmarshalling a Json Node value", e1); } diff --git a/jbpm/process-serialization-protobuf/src/test/java/org/jbpm/flow/serialization/ProcessInstanceMarshallTest.java b/jbpm/process-serialization-protobuf/src/test/java/org/jbpm/flow/serialization/ProcessInstanceMarshallTest.java index 427311a1eaa..1ec0925ad8e 100644 --- a/jbpm/process-serialization-protobuf/src/test/java/org/jbpm/flow/serialization/ProcessInstanceMarshallTest.java +++ b/jbpm/process-serialization-protobuf/src/test/java/org/jbpm/flow/serialization/ProcessInstanceMarshallTest.java @@ -67,11 +67,10 @@ import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.NullSource; import org.kie.kogito.internal.process.runtime.KogitoProcessRuntime; +import org.kie.kogito.jackson.utils.ObjectMapperFactory; import org.kie.kogito.process.impl.AbstractProcess; import org.w3c.dom.Document; -import com.fasterxml.jackson.databind.ObjectMapper; - import jakarta.xml.bind.JAXBContext; import jakarta.xml.bind.annotation.XmlRootElement; @@ -197,8 +196,8 @@ private static Stream testRoundTrip() throws Exception { Arguments.of(5l), Arguments.of(BigDecimal.valueOf(10l)), Arguments.of(new MarshableObject("henry")), - Arguments.of(new ObjectMapper().readTree("{ \"key\" : \"value\" }")), - Arguments.of(new ObjectMapper().valueToTree(marshableObject)), + Arguments.of(ObjectMapperFactory.listenerAware().readTree("{ \"key\" : \"value\" }")), + Arguments.of(ObjectMapperFactory.listenerAware().valueToTree(marshableObject)), Arguments.of(new Date()), Arguments.of(Instant.now()), Arguments.of(OffsetDateTime.now()), diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java b/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java index 186d205ee86..a7015801087 100644 --- a/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java +++ b/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java @@ -47,6 +47,8 @@ import org.kie.kogito.StaticConfig; import org.kie.kogito.codegen.api.context.impl.JavaKogitoBuildContext; import org.kie.kogito.config.StaticConfigBean; +import org.kie.kogito.event.EventManager; +import org.kie.kogito.event.EventPublisher; import org.kie.kogito.event.impl.EventFactoryUtils; import org.kie.kogito.internal.process.event.DefaultKogitoProcessEventListener; import org.kie.kogito.internal.process.event.KogitoProcessEventListener; @@ -65,6 +67,8 @@ import org.kie.kogito.serverless.workflow.utils.MultiSourceConfigResolver; import org.kie.kogito.services.uow.CollectingUnitOfWorkFactory; import org.kie.kogito.services.uow.DefaultUnitOfWorkManager; +import org.kie.kogito.services.uow.UnitOfWorkExecutor; +import org.kie.kogito.uow.UnitOfWorkManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,6 +102,7 @@ public class StaticWorkflowApplication extends StaticApplication implements Auto private Iterable processRegisters; private final Collection closeables = new ArrayList<>(); private final Map> queues; + private final UnitOfWorkManager manager; private ProcessInstancesFactory processInstancesFactory; private static class StaticCompletionEventListener extends DefaultKogitoProcessEventListener { @@ -127,7 +132,10 @@ public void afterProcessCompleted(ProcessCompletedEvent event) { public static class WorkflowApplicationBuilder { private Map properties; + private String serviceName = "EmbeddedKogito"; private Collection listeners = new ArrayList<>(); + private Optional manager = Optional.empty(); + private Collection publishers = new ArrayList<>(); private WorkflowApplicationBuilder() { } @@ -145,14 +153,36 @@ public WorkflowApplicationBuilder withEventListener(KogitoProcessEventListener l return this; } + public WorkflowApplicationBuilder withManager(UnitOfWorkManager manager) { + this.manager = Optional.ofNullable(manager); + return this; + } + + public WorkflowApplicationBuilder withService(String serviceName) { + this.serviceName = serviceName; + return this; + } + + public WorkflowApplicationBuilder withEventPublisher(EventPublisher publisher, EventPublisher... extraPublishers) { + publishers.add(publisher); + for (EventPublisher extraPublisher : extraPublishers) { + publishers.add(extraPublisher); + } + return this; + } + public StaticWorkflowApplication build() { if (properties == null) { this.properties = loadApplicationDotProperties(); } Map> queues = new ConcurrentHashMap<>(); listeners.add(new StaticCompletionEventListener(queues)); - StaticWorkflowApplication application = new StaticWorkflowApplication(properties, queues, listeners); + StaticWorkflowApplication application = + new StaticWorkflowApplication(properties, queues, listeners, manager.orElseGet(() -> new DefaultUnitOfWorkManager(new CollectingUnitOfWorkFactory()))); application.applicationRegisters.forEach(register -> register.register(application)); + EventManager eventManager = application.manager.eventManager(); + eventManager.setService(serviceName); + publishers.forEach(p -> eventManager.addPublisher(p)); return application; } } @@ -189,14 +219,15 @@ public static StaticWorkflowApplication create(Map properties) { return builder().withProperties(properties).build(); } - private StaticWorkflowApplication(Map properties, Map> queues, Collection listeners) { + private StaticWorkflowApplication(Map properties, Map> queues, Collection listeners, + UnitOfWorkManager manager) { super(new StaticConfig(new Addons(Collections.emptySet()), new StaticProcessConfig(new CachedWorkItemHandlerConfig(), - new DefaultProcessEventListenerConfig(listeners), - new DefaultUnitOfWorkManager(new CollectingUnitOfWorkFactory())), new StaticConfigBean())); + new DefaultProcessEventListenerConfig(listeners), manager), new StaticConfigBean())); if (!properties.isEmpty()) { ConfigResolverHolder.setConfigResolver(MultiSourceConfigResolver.withSystemProperties(properties)); } this.queues = queues; + this.manager = manager; applicationRegisters = ServiceLoader.load(StaticApplicationRegister.class); workflowRegisters = ServiceLoader.load(StaticWorkflowRegister.class); processRegisters = ServiceLoader.load(StaticProcessRegister.class); @@ -268,8 +299,10 @@ public JsonNodeModel execute(Process process, JsonNode data) { */ public JsonNodeModel execute(Process process, JsonNodeModel model) { ProcessInstance processInstance = process.createInstance(model); - processInstance.start(); - return processInstance.variables(); + return UnitOfWorkExecutor.executeInUnitOfWork(manager, () -> { + processInstance.start(); + return processInstance.variables(); + }); } /** diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-executor-tests/pom.xml b/kogito-serverless-workflow/kogito-serverless-workflow-executor-tests/pom.xml index 1aa1202830c..cb9fa645b24 100644 --- a/kogito-serverless-workflow/kogito-serverless-workflow-executor-tests/pom.xml +++ b/kogito-serverless-workflow/kogito-serverless-workflow-executor-tests/pom.xml @@ -77,7 +77,7 @@ wiremock-jre8 test - + org.kie kie-addons-persistence-rocksdb test diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-executor-tests/src/test/java/org/kie/kogito/serverless/workflow/executor/EventPublisherCollector.java b/kogito-serverless-workflow/kogito-serverless-workflow-executor-tests/src/test/java/org/kie/kogito/serverless/workflow/executor/EventPublisherCollector.java new file mode 100644 index 00000000000..04cfd048f3e --- /dev/null +++ b/kogito-serverless-workflow/kogito-serverless-workflow-executor-tests/src/test/java/org/kie/kogito/serverless/workflow/executor/EventPublisherCollector.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.serverless.workflow.executor; + +import java.util.ArrayList; +import java.util.Collection; + +import org.kie.kogito.event.DataEvent; +import org.kie.kogito.event.EventPublisher; + +public class EventPublisherCollector implements EventPublisher { + + private Collection> events = new ArrayList<>(); + + @Override + public void publish(DataEvent event) { + events.add(event); + } + + @Override + public void publish(Collection> events) { + events.forEach(this::publish); + } + + public Collection> events() { + return events; + } + +} diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-executor-tests/src/test/java/org/kie/kogito/serverless/workflow/executor/PersistentApplicationTest.java b/kogito-serverless-workflow/kogito-serverless-workflow-executor-tests/src/test/java/org/kie/kogito/serverless/workflow/executor/PersistentApplicationTest.java index 251a50d532e..d677e4bd90c 100644 --- a/kogito-serverless-workflow/kogito-serverless-workflow-executor-tests/src/test/java/org/kie/kogito/serverless/workflow/executor/PersistentApplicationTest.java +++ b/kogito-serverless-workflow/kogito-serverless-workflow-executor-tests/src/test/java/org/kie/kogito/serverless/workflow/executor/PersistentApplicationTest.java @@ -22,17 +22,27 @@ import java.nio.file.Path; import java.time.Duration; import java.time.OffsetDateTime; +import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.kie.api.event.process.ProcessVariableChangedEvent; +import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent; +import org.kie.kogito.event.process.ProcessInstanceVariableEventBody; +import org.kie.kogito.internal.process.event.DefaultKogitoProcessEventListener; import org.kie.kogito.persistence.rocksdb.RocksDBProcessInstancesFactory; +import org.kie.kogito.serverless.workflow.SWFConstants; import org.rocksdb.Options; import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.node.TextNode; @@ -51,21 +61,37 @@ import static org.kie.kogito.serverless.workflow.fluent.WorkflowBuilder.workflow; public class PersistentApplicationTest { + + private final static Logger logger = LoggerFactory.getLogger(PersistentApplicationTest.class); + @Test void testCallbackSubscriberWithPersistence(@TempDir Path tempDir) throws InterruptedException, TimeoutException, RocksDBException { final String eventType = "testSubscribe"; final String additionalData = "This has been injected by the event"; - Workflow workflow = workflow("testCallback").start(callback(call(expr("concat", "{slogan:.slogan+\"er Beti\"}")), eventDef(eventType))).end().build(); + + final EventPublisherCollector eventCollector = new EventPublisherCollector(); + Workflow workflow = workflow("testCallback").start(callback(call(expr("concat", "{slogan:.slogan+\"Viva er Beti manque pierda\"}")), eventDef(eventType))).end().build(); try (StaticWorkflowApplication application = - StaticWorkflowApplication.create().processInstancesFactory(new RocksDBProcessInstancesFactory(new Options().setCreateIfMissing(true), tempDir.toString()))) { - String id = application.execute(workflow, jsonObject().put("slogan", "Viva ")).getId(); + StaticWorkflowApplication.builder().withEventListener(new DefaultKogitoProcessEventListener() { + @Override + public void afterVariableChanged(ProcessVariableChangedEvent event) { + logger.info(event.toString()); + + } + }).withEventPublisher(eventCollector).build().processInstancesFactory(new RocksDBProcessInstancesFactory(new Options().setCreateIfMissing(true), tempDir.toString()))) { + String id = application.execute(workflow, Map.of()).getId(); assertThat(application.variables(id).orElseThrow().getWorkflowdata()).doesNotContain(new TextNode(additionalData)); publish(eventType, buildCloudEvent(eventType, id) .withData(JsonCloudEventData.wrap(jsonObject().put("additionalData", additionalData))) .build()); assertThat(application.waitForFinish(id, Duration.ofSeconds(2000)).orElseThrow().getWorkflowdata()) - .isEqualTo(jsonObject().put("additionalData", additionalData).put("slogan", "Viva er Beti")); + .isEqualTo(jsonObject().put("additionalData", additionalData).put("slogan", "Viva er Beti manque pierda")); await().atMost(Duration.ofSeconds(1)).pollInterval(Duration.ofMillis(50)).until(() -> application.variables(id).isEmpty()); + List dataChangeEvents = eventCollector.events().stream().filter(ProcessInstanceVariableDataEvent.class::isInstance) + .map(ProcessInstanceVariableDataEvent.class::cast).map(ProcessInstanceVariableDataEvent::getData).collect(Collectors.toList()); + assertThat(dataChangeEvents).hasSize(2); + assertThat(dataChangeEvents.get(0).getVariableName()).isEqualTo(SWFConstants.DEFAULT_WORKFLOW_VAR); + assertThat(dataChangeEvents.get(1).getVariableName()).isEqualTo(SWFConstants.DEFAULT_WORKFLOW_VAR + ".additionalData"); } } diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-executor-tests/src/test/resources/logback.xml b/kogito-serverless-workflow/kogito-serverless-workflow-executor-tests/src/test/resources/logback.xml new file mode 100644 index 00000000000..81ff4fee166 --- /dev/null +++ b/kogito-serverless-workflow/kogito-serverless-workflow-executor-tests/src/test/resources/logback.xml @@ -0,0 +1,11 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + \ No newline at end of file