Skip to content

Commit

Permalink
[Fix apache#3719] Unmarshall return ObjectNode that is event sensitive
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Oct 14, 2024
1 parent 42d3a4e commit 47c5191
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public static String buildSource(String service, String processId) {
if (processId == null) {
return null;
} else {
processId = processId.replace(" ", "-");
return service + "/" + (processId.contains(".") ? processId.substring(processId.lastIndexOf('.') + 1) : processId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.jbpm.process.core.datatype.DataType;
import org.jbpm.process.core.datatype.DataTypeUtils;
import org.jbpm.process.core.datatype.impl.coverter.CloneHelper;
import org.jbpm.process.core.datatype.impl.coverter.TypeConverterRegistry;

/**
Expand Down Expand Up @@ -86,6 +87,10 @@ public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(className);
}

public Object clone(Object value) {
return CloneHelper.get().clone(value);
}

@Override
public boolean verifyDataType(final Object value) {
if (value == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public Any marshall(Object unmarshalled) {
public Object unmarshall(Any data) {
try {
KogitoTypesProtobuf.JsonNode storedValue = data.unpack(KogitoTypesProtobuf.JsonNode.class);
return ObjectMapperFactory.get().readTree(storedValue.getContent());
return ObjectMapperFactory.listenerAware().readTree(storedValue.getContent());
} catch (InvalidProtocolBufferException | JsonProcessingException e1) {
throw new ProcessInstanceMarshallerException("Error trying to unmarshalling a Json Node value", e1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,10 @@
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.NullSource;
import org.kie.kogito.internal.process.runtime.KogitoProcessRuntime;
import org.kie.kogito.jackson.utils.ObjectMapperFactory;
import org.kie.kogito.process.impl.AbstractProcess;
import org.w3c.dom.Document;

import com.fasterxml.jackson.databind.ObjectMapper;

import jakarta.xml.bind.JAXBContext;
import jakarta.xml.bind.annotation.XmlRootElement;

Expand Down Expand Up @@ -197,8 +196,8 @@ private static Stream<Arguments> testRoundTrip() throws Exception {
Arguments.of(5l),
Arguments.of(BigDecimal.valueOf(10l)),
Arguments.of(new MarshableObject("henry")),
Arguments.of(new ObjectMapper().readTree("{ \"key\" : \"value\" }")),
Arguments.of(new ObjectMapper().valueToTree(marshableObject)),
Arguments.of(ObjectMapperFactory.listenerAware().readTree("{ \"key\" : \"value\" }")),
Arguments.of(ObjectMapperFactory.listenerAware().valueToTree(marshableObject)),
Arguments.of(new Date()),
Arguments.of(Instant.now()),
Arguments.of(OffsetDateTime.now()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.kie.kogito.StaticConfig;
import org.kie.kogito.codegen.api.context.impl.JavaKogitoBuildContext;
import org.kie.kogito.config.StaticConfigBean;
import org.kie.kogito.event.EventManager;
import org.kie.kogito.event.EventPublisher;
import org.kie.kogito.event.impl.EventFactoryUtils;
import org.kie.kogito.internal.process.event.DefaultKogitoProcessEventListener;
import org.kie.kogito.internal.process.event.KogitoProcessEventListener;
Expand All @@ -65,6 +67,8 @@
import org.kie.kogito.serverless.workflow.utils.MultiSourceConfigResolver;
import org.kie.kogito.services.uow.CollectingUnitOfWorkFactory;
import org.kie.kogito.services.uow.DefaultUnitOfWorkManager;
import org.kie.kogito.services.uow.UnitOfWorkExecutor;
import org.kie.kogito.uow.UnitOfWorkManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -98,6 +102,7 @@ public class StaticWorkflowApplication extends StaticApplication implements Auto
private Iterable<StaticProcessRegister> processRegisters;
private final Collection<AutoCloseable> closeables = new ArrayList<>();
private final Map<String, SynchronousQueue<JsonNodeModel>> queues;
private final UnitOfWorkManager manager;
private ProcessInstancesFactory processInstancesFactory;

private static class StaticCompletionEventListener extends DefaultKogitoProcessEventListener {
Expand Down Expand Up @@ -127,7 +132,10 @@ public void afterProcessCompleted(ProcessCompletedEvent event) {
public static class WorkflowApplicationBuilder {

private Map<String, Object> properties;
private String serviceName = "EmbeddedKogito";
private Collection<KogitoProcessEventListener> listeners = new ArrayList<>();
private Optional<UnitOfWorkManager> manager = Optional.empty();
private Collection<EventPublisher> publishers = new ArrayList<>();

private WorkflowApplicationBuilder() {
}
Expand All @@ -145,14 +153,36 @@ public WorkflowApplicationBuilder withEventListener(KogitoProcessEventListener l
return this;
}

public WorkflowApplicationBuilder withManager(UnitOfWorkManager manager) {
this.manager = Optional.ofNullable(manager);
return this;
}

public WorkflowApplicationBuilder withService(String serviceName) {
this.serviceName = serviceName;
return this;
}

public WorkflowApplicationBuilder withEventPublisher(EventPublisher publisher, EventPublisher... extraPublishers) {
publishers.add(publisher);
for (EventPublisher extraPublisher : extraPublishers) {
publishers.add(extraPublisher);
}
return this;
}

public StaticWorkflowApplication build() {
if (properties == null) {
this.properties = loadApplicationDotProperties();
}
Map<String, SynchronousQueue<JsonNodeModel>> queues = new ConcurrentHashMap<>();
listeners.add(new StaticCompletionEventListener(queues));
StaticWorkflowApplication application = new StaticWorkflowApplication(properties, queues, listeners);
StaticWorkflowApplication application =
new StaticWorkflowApplication(properties, queues, listeners, manager.orElseGet(() -> new DefaultUnitOfWorkManager(new CollectingUnitOfWorkFactory())));
application.applicationRegisters.forEach(register -> register.register(application));
EventManager eventManager = application.manager.eventManager();
eventManager.setService(serviceName);
publishers.forEach(p -> eventManager.addPublisher(p));
return application;
}
}
Expand Down Expand Up @@ -189,14 +219,15 @@ public static StaticWorkflowApplication create(Map<String, Object> properties) {
return builder().withProperties(properties).build();
}

private StaticWorkflowApplication(Map<String, Object> properties, Map<String, SynchronousQueue<JsonNodeModel>> queues, Collection<KogitoProcessEventListener> listeners) {
private StaticWorkflowApplication(Map<String, Object> properties, Map<String, SynchronousQueue<JsonNodeModel>> queues, Collection<KogitoProcessEventListener> listeners,
UnitOfWorkManager manager) {
super(new StaticConfig(new Addons(Collections.emptySet()), new StaticProcessConfig(new CachedWorkItemHandlerConfig(),
new DefaultProcessEventListenerConfig(listeners),
new DefaultUnitOfWorkManager(new CollectingUnitOfWorkFactory())), new StaticConfigBean()));
new DefaultProcessEventListenerConfig(listeners), manager), new StaticConfigBean()));
if (!properties.isEmpty()) {
ConfigResolverHolder.setConfigResolver(MultiSourceConfigResolver.withSystemProperties(properties));
}
this.queues = queues;
this.manager = manager;
applicationRegisters = ServiceLoader.load(StaticApplicationRegister.class);
workflowRegisters = ServiceLoader.load(StaticWorkflowRegister.class);
processRegisters = ServiceLoader.load(StaticProcessRegister.class);
Expand Down Expand Up @@ -268,8 +299,10 @@ public JsonNodeModel execute(Process<JsonNodeModel> process, JsonNode data) {
*/
public JsonNodeModel execute(Process<JsonNodeModel> process, JsonNodeModel model) {
ProcessInstance<JsonNodeModel> processInstance = process.createInstance(model);
processInstance.start();
return processInstance.variables();
return UnitOfWorkExecutor.executeInUnitOfWork(manager, () -> {
processInstance.start();
return processInstance.variables();
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
<artifactId>wiremock-jre8</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<dependency>
<groupId>org.kie</groupId>
<artifactId>kie-addons-persistence-rocksdb</artifactId>
<scope>test</scope>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.serverless.workflow.executor;

import java.util.ArrayList;
import java.util.Collection;

import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventPublisher;

public class EventPublisherCollector implements EventPublisher {

private Collection<DataEvent<?>> events = new ArrayList<>();

@Override
public void publish(DataEvent<?> event) {
events.add(event);
}

@Override
public void publish(Collection<DataEvent<?>> events) {
events.forEach(this::publish);
}

public Collection<DataEvent<?>> events() {
return events;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,27 @@
import java.nio.file.Path;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.kie.api.event.process.ProcessVariableChangedEvent;
import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
import org.kie.kogito.event.process.ProcessInstanceVariableEventBody;
import org.kie.kogito.internal.process.event.DefaultKogitoProcessEventListener;
import org.kie.kogito.persistence.rocksdb.RocksDBProcessInstancesFactory;
import org.kie.kogito.serverless.workflow.SWFConstants;
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.node.TextNode;

Expand All @@ -51,21 +61,37 @@
import static org.kie.kogito.serverless.workflow.fluent.WorkflowBuilder.workflow;

public class PersistentApplicationTest {

private final static Logger logger = LoggerFactory.getLogger(PersistentApplicationTest.class);

@Test
void testCallbackSubscriberWithPersistence(@TempDir Path tempDir) throws InterruptedException, TimeoutException, RocksDBException {
final String eventType = "testSubscribe";
final String additionalData = "This has been injected by the event";
Workflow workflow = workflow("testCallback").start(callback(call(expr("concat", "{slogan:.slogan+\"er Beti\"}")), eventDef(eventType))).end().build();

final EventPublisherCollector eventCollector = new EventPublisherCollector();
Workflow workflow = workflow("testCallback").start(callback(call(expr("concat", "{slogan:.slogan+\"Viva er Beti manque pierda\"}")), eventDef(eventType))).end().build();
try (StaticWorkflowApplication application =
StaticWorkflowApplication.create().processInstancesFactory(new RocksDBProcessInstancesFactory(new Options().setCreateIfMissing(true), tempDir.toString()))) {
String id = application.execute(workflow, jsonObject().put("slogan", "Viva ")).getId();
StaticWorkflowApplication.builder().withEventListener(new DefaultKogitoProcessEventListener() {
@Override
public void afterVariableChanged(ProcessVariableChangedEvent event) {
logger.info(event.toString());

}
}).withEventPublisher(eventCollector).build().processInstancesFactory(new RocksDBProcessInstancesFactory(new Options().setCreateIfMissing(true), tempDir.toString()))) {
String id = application.execute(workflow, Map.of()).getId();
assertThat(application.variables(id).orElseThrow().getWorkflowdata()).doesNotContain(new TextNode(additionalData));
publish(eventType, buildCloudEvent(eventType, id)
.withData(JsonCloudEventData.wrap(jsonObject().put("additionalData", additionalData)))
.build());
assertThat(application.waitForFinish(id, Duration.ofSeconds(2000)).orElseThrow().getWorkflowdata())
.isEqualTo(jsonObject().put("additionalData", additionalData).put("slogan", "Viva er Beti"));
.isEqualTo(jsonObject().put("additionalData", additionalData).put("slogan", "Viva er Beti manque pierda"));
await().atMost(Duration.ofSeconds(1)).pollInterval(Duration.ofMillis(50)).until(() -> application.variables(id).isEmpty());
List<ProcessInstanceVariableEventBody> dataChangeEvents = eventCollector.events().stream().filter(ProcessInstanceVariableDataEvent.class::isInstance)
.map(ProcessInstanceVariableDataEvent.class::cast).map(ProcessInstanceVariableDataEvent::getData).collect(Collectors.toList());
assertThat(dataChangeEvents).hasSize(2);
assertThat(dataChangeEvents.get(0).getVariableName()).isEqualTo(SWFConstants.DEFAULT_WORKFLOW_VAR);
assertThat(dataChangeEvents.get(1).getVariableName()).isEqualTo(SWFConstants.DEFAULT_WORKFLOW_VAR + ".additionalData");
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>

0 comments on commit 47c5191

Please sign in to comment.