From adec9f06f3fc8d802b3af821f2c3b01aa29fa067 Mon Sep 17 00:00:00 2001 From: Tiago Dolphine Date: Thu, 23 Nov 2023 08:41:06 -0300 Subject: [PATCH] [KOGITO-9886] Add the support for the ProcessDefinitionDataEvent at runtime (#3252) * KOGITO-9886 Add the support for the ProcessDefinitionDataEvent at runtime --- .../org/kie/kogito/event/EventPublisher.java | 4 + .../internal/process/runtime/KogitoNode.java | 2 + .../internal/utils/ConversionUtils.java | 13 ++ .../internal/utils/ConversionUtilsTest.java | 12 ++ .../kogito/event/process/NodeDefinition.java | 101 ++++++++++ .../process/ProcessDefinitionEventBody.java | 188 ++++++++++++++++++ .../kie/kogito/event/AbstractDataEvent.java | 3 + .../kogito/event/impl/ProcessEventBatch.java | 45 +++++ .../process/ProcessDefinitionDataEvent.java | 45 +++++ .../ProcessDefinitionEventRegistry.java | 114 +++++++++++ .../process/ProcessResourceGenerator.java | 3 +- .../events/config/EventsRuntimeConfig.java | 58 ++++++ .../ReactiveMessagingEventPublisher.java | 33 +-- .../KnativeEventingConfigSourceFactory.java | 3 + ...nativeEventingConfigSourceFactoryTest.java | 7 +- .../kogito/addons/quarkus/kubernetes/Foo.java | 25 ++- .../ProcessDefinitionRegistration.java | 50 +++++ .../ServerlessWorkflowAssetsProcessor.java | 8 +- .../pom.xml | 17 ++ .../src/main/resources/application.properties | 12 ++ .../quarkus/workflows/WorkflowEventIT.java | 48 ++++- .../devservices/DataIndexEventPublisher.java | 12 ++ .../events/spring/KafkaEventPublisher.java | 15 +- springboot/archetype/pom.xml | 4 + .../pom.xml | 4 + .../invoker.properties | 3 +- .../pom.xml | 4 + 27 files changed, 792 insertions(+), 41 deletions(-) create mode 100644 api/kogito-events-api/src/main/java/org/kie/kogito/event/process/NodeDefinition.java create mode 100644 api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessDefinitionEventBody.java create mode 100644 api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventBatch.java create mode 100644 api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessDefinitionDataEvent.java create mode 100644 api/kogito-services/src/main/java/org/kie/kogito/services/registry/ProcessDefinitionEventRegistry.java create mode 100644 quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/config/EventsRuntimeConfig.java create mode 100644 quarkus/extensions/kogito-quarkus-extension-common/kogito-quarkus-common/src/main/java/org/kie/kogito/quarkus/registry/ProcessDefinitionRegistration.java diff --git a/api/kogito-api/src/main/java/org/kie/kogito/event/EventPublisher.java b/api/kogito-api/src/main/java/org/kie/kogito/event/EventPublisher.java index 0869447738c..8a28f56d81b 100644 --- a/api/kogito-api/src/main/java/org/kie/kogito/event/EventPublisher.java +++ b/api/kogito-api/src/main/java/org/kie/kogito/event/EventPublisher.java @@ -31,6 +31,10 @@ */ public interface EventPublisher { + String PROCESS_INSTANCES_TOPIC_NAME = "kogito-processinstances-events"; + String USER_TASK_INSTANCES_TOPIC_NAME = "kogito-usertaskinstances-events"; + String PROCESS_DEFINITIONS_TOPIC_NAME = "kogito-processdefinitions-events"; + /** * Publishes individual event * diff --git a/api/kogito-api/src/main/java/org/kie/kogito/internal/process/runtime/KogitoNode.java b/api/kogito-api/src/main/java/org/kie/kogito/internal/process/runtime/KogitoNode.java index 67926f5d9fb..f15769da3de 100644 --- a/api/kogito-api/src/main/java/org/kie/kogito/internal/process/runtime/KogitoNode.java +++ b/api/kogito-api/src/main/java/org/kie/kogito/internal/process/runtime/KogitoNode.java @@ -24,4 +24,6 @@ public interface KogitoNode extends Node { NodeContainer getParentContainer(); + + String getUniqueId(); } diff --git a/api/kogito-api/src/main/java/org/kie/kogito/internal/utils/ConversionUtils.java b/api/kogito-api/src/main/java/org/kie/kogito/internal/utils/ConversionUtils.java index 42a1eb7cb08..ee0a4a64234 100644 --- a/api/kogito-api/src/main/java/org/kie/kogito/internal/utils/ConversionUtils.java +++ b/api/kogito-api/src/main/java/org/kie/kogito/internal/utils/ConversionUtils.java @@ -261,4 +261,17 @@ public static String sanitizeJavaName(String name, boolean capitalize) { } return sb.toString(); } + + /** + * Receives a String possibly with FQDN org.acme.ProcessTest1 and returns a simple name like ProcessTest1 + * + * @param processId a possible FQDN + * @return simple name + */ + public static String sanitizeToSimpleName(String processId) { + if (Objects.isNull(processId)) { + return null; + } + return processId.substring(processId.lastIndexOf('.') + 1); + } } diff --git a/api/kogito-api/src/test/java/org/kie/kogito/internal/utils/ConversionUtilsTest.java b/api/kogito-api/src/test/java/org/kie/kogito/internal/utils/ConversionUtilsTest.java index 3b2d6b7993f..43f1cc41534 100644 --- a/api/kogito-api/src/test/java/org/kie/kogito/internal/utils/ConversionUtilsTest.java +++ b/api/kogito-api/src/test/java/org/kie/kogito/internal/utils/ConversionUtilsTest.java @@ -27,6 +27,7 @@ import static org.kie.kogito.internal.utils.ConversionUtils.concatPaths; import static org.kie.kogito.internal.utils.ConversionUtils.convert; import static org.kie.kogito.internal.utils.ConversionUtils.convertToCollection; +import static org.kie.kogito.internal.utils.ConversionUtils.sanitizeToSimpleName; import static org.kie.kogito.internal.utils.ConversionUtils.toCamelCase; class ConversionUtilsTest { @@ -138,4 +139,15 @@ public void testConcatPaths() { public void testConvertToCollection() { assertThat(convertToCollection("1,2,3", Integer.class)).isEqualTo(Arrays.asList(1, 2, 3)); } + + @Test + public void testSanitizeToSimpleName() { + String nameFull = "org.acme.ProcessTest1"; + String nameSimple = "ProcessTest2"; + String nameEmpty = ""; + assertThat(sanitizeToSimpleName(nameFull)).isEqualTo("ProcessTest1"); + assertThat(sanitizeToSimpleName(nameSimple)).isEqualTo("ProcessTest2"); + assertThat(sanitizeToSimpleName(nameEmpty)).isEqualTo(""); + assertThat(sanitizeToSimpleName(null)).isNull(); + } } diff --git a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/NodeDefinition.java b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/NodeDefinition.java new file mode 100644 index 00000000000..bfae93f1da6 --- /dev/null +++ b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/NodeDefinition.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.event.process; + +import java.util.Map; + +public class NodeDefinition { + private String id; + private String name; + private String type; + private String uniqueId; + private Map metadata; + + public NodeDefinition() { + } + + public NodeDefinition(String id, String name, String type, String uniqueId, Map metadata) { + this.id = id; + this.name = name; + this.type = type; + this.uniqueId = uniqueId; + this.metadata = metadata; + } + + public String getId() { + return id; + } + + public String getName() { + return name; + } + + public String getType() { + return type; + } + + public String getUniqueId() { + return uniqueId; + } + + public Map getMetadata() { + return metadata; + } + + public static NodeDefinitionEventBodyBuilder builder() { + return new NodeDefinitionEventBodyBuilder(); + } + + public static class NodeDefinitionEventBodyBuilder { + private String id; + private String name; + private String type; + private String uniqueId; + private Map metadata; + + public NodeDefinitionEventBodyBuilder setId(String id) { + this.id = id; + return this; + } + + public NodeDefinitionEventBodyBuilder setName(String name) { + this.name = name; + return this; + } + + public NodeDefinitionEventBodyBuilder setType(String type) { + this.type = type; + return this; + } + + public NodeDefinitionEventBodyBuilder setUniqueId(String uniqueId) { + this.uniqueId = uniqueId; + return this; + } + + public NodeDefinitionEventBodyBuilder setMetadata(Map metadata) { + this.metadata = metadata; + return this; + } + + public NodeDefinition build() { + return new NodeDefinition(id, name, type, uniqueId, metadata); + } + } +} diff --git a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessDefinitionEventBody.java b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessDefinitionEventBody.java new file mode 100644 index 00000000000..9a6a61813de --- /dev/null +++ b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessDefinitionEventBody.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.event.process; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +public class ProcessDefinitionEventBody { + private String id; + private String name; + private String description; + private String version; + private String type; + private Set roles; + private Set addons; + private Set annotations; + private String endpoint; + private String source; + private Map metadata; + private Collection nodes; + + public ProcessDefinitionEventBody() { + } + + public ProcessDefinitionEventBody(String id, String name, String description, String version, String type, Set roles, Set addons, Set annotations, String endpoint, + String source, Map metadata, Collection nodes) { + this.id = id; + this.name = name; + this.description = description; + this.version = version; + this.type = type; + this.roles = roles; + this.addons = addons; + this.annotations = annotations; + this.endpoint = endpoint; + this.source = source; + this.metadata = metadata; + this.nodes = nodes; + } + + public String getId() { + return id; + } + + public String getName() { + return name; + } + + public String getVersion() { + return version; + } + + public String getType() { + return type; + } + + public Set getRoles() { + return roles; + } + + public Set getAddons() { + return addons; + } + + public String getEndpoint() { + return endpoint; + } + + public String getSource() { + return source; + } + + public Collection getNodes() { + return nodes; + } + + public String getDescription() { + return description; + } + + public Set getAnnotations() { + return annotations; + } + + public Map getMetadata() { + return metadata; + } + + public static ProcessDefinitionEventBodyBuilder builder() { + return new ProcessDefinitionEventBodyBuilder(); + } + + public static class ProcessDefinitionEventBodyBuilder { + private String id; + private String name; + private String description; + private String version; + private String type; + private Set roles; + private Set addons; + private Set annotations; + private String endpoint; + private String source; + private Map metadata; + private Collection nodes; + + public ProcessDefinitionEventBodyBuilder setId(String id) { + this.id = id; + return this; + } + + public ProcessDefinitionEventBodyBuilder setName(String name) { + this.name = name; + return this; + } + + public ProcessDefinitionEventBodyBuilder setVersion(String version) { + this.version = version; + return this; + } + + public ProcessDefinitionEventBodyBuilder setType(String type) { + this.type = type; + return this; + } + + public ProcessDefinitionEventBodyBuilder setRoles(Set roles) { + this.roles = roles; + return this; + } + + public ProcessDefinitionEventBodyBuilder setAddons(Set addons) { + this.addons = addons; + return this; + } + + public ProcessDefinitionEventBodyBuilder setEndpoint(String endpoint) { + this.endpoint = endpoint; + return this; + } + + public ProcessDefinitionEventBodyBuilder setSource(String source) { + this.source = source; + return this; + } + + public ProcessDefinitionEventBodyBuilder setDescription(String description) { + this.description = description; + return this; + } + + public ProcessDefinitionEventBodyBuilder setAnnotations(Set annotations) { + this.annotations = annotations; + return this; + } + + public ProcessDefinitionEventBodyBuilder setMetadata(Map metadata) { + this.metadata = metadata; + return this; + } + + public ProcessDefinitionEventBodyBuilder setNodes(Collection nodes) { + this.nodes = nodes; + return this; + } + + public ProcessDefinitionEventBody build() { + return new ProcessDefinitionEventBody(id, name, description, version, type, roles, addons, annotations, endpoint, source, metadata, nodes); + } + } +} diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/AbstractDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/AbstractDataEvent.java index d3f6a29a269..2fcdd704196 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/AbstractDataEvent.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/AbstractDataEvent.java @@ -54,6 +54,8 @@ import io.cloudevents.core.v03.CloudEventV03; import io.cloudevents.core.v1.CloudEventV1; +import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL; + /** * This is an implementation of the {@link DataEvent} that contains basic common attributes referring to * kogito processes metadata. @@ -61,6 +63,7 @@ * @param the payload class type */ @JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(NON_NULL) public abstract class AbstractDataEvent implements DataEvent { /** diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventBatch.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventBatch.java new file mode 100644 index 00000000000..fca70c09800 --- /dev/null +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventBatch.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.event.impl; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.kie.kogito.event.DataEvent; +import org.kie.kogito.event.EventBatch; + +public class ProcessEventBatch implements EventBatch { + + private List> events = Collections.synchronizedList(new ArrayList<>()); + + @Override + public void append(Object rawEvent) { + if (!DataEvent.class.isInstance(rawEvent)) { + throw new IllegalArgumentException("The event is not a ProcessDataEvent"); + } + events.add((DataEvent) rawEvent); + } + + @Override + public Collection> events() { + return events; + } +} diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessDefinitionDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessDefinitionDataEvent.java new file mode 100644 index 00000000000..b58cece22f7 --- /dev/null +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessDefinitionDataEvent.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.event.process; + +import org.kie.kogito.event.AbstractDataEvent; + +public class ProcessDefinitionDataEvent extends AbstractDataEvent { + + public static final String PROCESS_DEFINITION_EVENT = "ProcessDefinitionEvent"; + + public ProcessDefinitionDataEvent() { + + } + + public ProcessDefinitionDataEvent(ProcessDefinitionEventBody body) { + super(PROCESS_DEFINITION_EVENT, + body.getEndpoint(), + body, + null, + null, + body.getId(), + null, + null, + null, + null, + DATA_CONTENT_TYPE, + null); + } +} \ No newline at end of file diff --git a/api/kogito-services/src/main/java/org/kie/kogito/services/registry/ProcessDefinitionEventRegistry.java b/api/kogito-services/src/main/java/org/kie/kogito/services/registry/ProcessDefinitionEventRegistry.java new file mode 100644 index 00000000000..768cccb7441 --- /dev/null +++ b/api/kogito-services/src/main/java/org/kie/kogito/services/registry/ProcessDefinitionEventRegistry.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.services.registry; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import org.kie.kogito.Application; +import org.kie.kogito.event.EventBatch; +import org.kie.kogito.event.impl.ProcessEventBatch; +import org.kie.kogito.event.process.NodeDefinition; +import org.kie.kogito.event.process.ProcessDefinitionDataEvent; +import org.kie.kogito.event.process.ProcessDefinitionEventBody; +import org.kie.kogito.internal.utils.ConversionUtils; +import org.kie.kogito.process.Process; +import org.kie.kogito.process.Processes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Collections.emptyList; +import static java.util.stream.Collectors.toSet; + +public class ProcessDefinitionEventRegistry { + + private static final Logger LOGGER = LoggerFactory.getLogger(ProcessDefinitionEventRegistry.class); + + private Application app; + private String serviceUrl; + + public ProcessDefinitionEventRegistry(Application app, String serviceUrl) { + this.app = app; + this.serviceUrl = serviceUrl; + } + + public void register(Processes processes) { + EventBatch eventBatch = new ProcessEventBatch(); + processes.processIds().stream() + .map(processes::processById) + .map(mapProcessDefinition(app.config().addons().availableAddons(), serviceUrl)) + .forEach(process -> { + LOGGER.debug("Registering process definition with id: {}", process.getId()); + eventBatch.append(process); + }); + LOGGER.debug("Publishing all processes definitions"); + app.unitOfWorkManager().eventManager().publish(eventBatch); + } + + private Function, ProcessDefinitionDataEvent> mapProcessDefinition(Set addons, String endpoint) { + + return p -> { + Map metadata = Collections.emptyMap(); + if (p instanceof Supplier) { + org.kie.api.definition.process.Process processDefinition = ((Supplier) p).get(); + if (processDefinition != null) { + metadata = processDefinition.getMetaData(); + } + } + Set annotations = ((List) metadata.getOrDefault("annotations", emptyList())).stream().collect(toSet()); + String description = (String) metadata.get("Description"); + ProcessDefinitionDataEvent definitionDataEvent = new ProcessDefinitionDataEvent(ProcessDefinitionEventBody.builder() + .setId(p.id()) + .setName(p.name()) + .setVersion(p.version()) + .setType(p.type()) + .setAddons(addons) + .setEndpoint(getEndpoint(endpoint, p)) + .setNodes(getNodesDefinitions(p)) + .setAnnotations(annotations) + .setDescription(description) + .setMetadata(metadata) + .build()); + return definitionDataEvent; + }; + } + + private static String getEndpoint(String endpoint, Process p) { + //sanitize process path in case of fqdn org.acme.ProcessExample -> ProcessExample + String processPath = ConversionUtils.sanitizeToSimpleName(p.id()); + return endpoint + "/" + processPath; + } + + private List getNodesDefinitions(Process p) { + return p.findNodes(n -> true).stream() + .map(node -> NodeDefinition.builder() + .setId(String.valueOf(node.getId())) + .setName(node.getName()) + .setType(node.getClass().getSimpleName()) + .setUniqueId(node.getUniqueId()) + .setMetadata(node.getMetaData()) + .build()) + .collect(Collectors.toList()); + } +} diff --git a/kogito-codegen-modules/kogito-codegen-processes/src/main/java/org/kie/kogito/codegen/process/ProcessResourceGenerator.java b/kogito-codegen-modules/kogito-codegen-processes/src/main/java/org/kie/kogito/codegen/process/ProcessResourceGenerator.java index eece2d904ad..dfbb92dfb5d 100644 --- a/kogito-codegen-modules/kogito-codegen-processes/src/main/java/org/kie/kogito/codegen/process/ProcessResourceGenerator.java +++ b/kogito-codegen-modules/kogito-codegen-processes/src/main/java/org/kie/kogito/codegen/process/ProcessResourceGenerator.java @@ -43,6 +43,7 @@ import org.kie.kogito.codegen.core.CodegenUtils; import org.kie.kogito.codegen.core.GeneratorConfig; import org.kie.kogito.internal.process.runtime.KogitoWorkflowProcess; +import org.kie.kogito.internal.utils.ConversionUtils; import com.github.javaparser.ast.CompilationUnit; import com.github.javaparser.ast.Modifier.Keyword; @@ -108,7 +109,7 @@ public ProcessResourceGenerator( this.context = context; this.process = process; this.processId = process.getId(); - this.processName = processId.substring(processId.lastIndexOf('.') + 1); + this.processName = ConversionUtils.sanitizeToSimpleName(processId); this.resourceClazzName = sanitizeClassName(processName + "Resource"); this.relativePath = process.getPackageName().replace(".", "/") + "/" + resourceClazzName + ".java"; this.modelfqcn = modelfqcn; diff --git a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/config/EventsRuntimeConfig.java b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/config/EventsRuntimeConfig.java new file mode 100644 index 00000000000..5e78b3de9b9 --- /dev/null +++ b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/config/EventsRuntimeConfig.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.events.config; + +import io.quarkus.runtime.annotations.ConfigItem; +import io.quarkus.runtime.annotations.ConfigPhase; +import io.quarkus.runtime.annotations.ConfigRoot; + +@ConfigRoot(name = "kogito", phase = ConfigPhase.RUN_TIME, prefix = "events") +public class EventsRuntimeConfig { + + /** + * Enable publishing processes instances events + */ + @ConfigItem(name = "processinstances.enabled", defaultValue = "true") + boolean processInstancesEventsEnabled; + + /** + * Enable publishing processes definition events + */ + @ConfigItem(name = "processdefinitions.enabled", defaultValue = "true") + boolean processDefinitionEventsEnabled; + + /** + * Enable publishing user task instances events + */ + @ConfigItem(name = "usertasks.enabled", defaultValue = "true") + boolean userTasksEventsEnabled; + + public boolean isProcessInstancesEventsEnabled() { + return processInstancesEventsEnabled; + } + + public boolean isProcessDefinitionEventsEnabled() { + return processDefinitionEventsEnabled; + } + + public boolean isUserTasksEventsEnabled() { + return userTasksEventsEnabled; + } + +} diff --git a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/ReactiveMessagingEventPublisher.java b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/ReactiveMessagingEventPublisher.java index 01c199deb60..225f55eb4e1 100644 --- a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/ReactiveMessagingEventPublisher.java +++ b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/ReactiveMessagingEventPublisher.java @@ -19,7 +19,6 @@ package org.kie.kogito.events.process; import java.util.Collection; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -28,12 +27,12 @@ import javax.inject.Inject; import javax.inject.Singleton; -import org.eclipse.microprofile.config.inject.ConfigProperty; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Message; import org.kie.kogito.addon.quarkus.common.reactive.messaging.MessageDecoratorProvider; import org.kie.kogito.event.DataEvent; import org.kie.kogito.event.EventPublisher; +import org.kie.kogito.events.config.EventsRuntimeConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,8 +43,6 @@ @Singleton public class ReactiveMessagingEventPublisher implements EventPublisher { - private static final String PI_TOPIC_NAME = "kogito-processinstances-events"; - private static final String UI_TOPIC_NAME = "kogito-usertaskinstances-events"; private static final Logger logger = LoggerFactory.getLogger(ReactiveMessagingEventPublisher.class); @@ -53,20 +50,18 @@ public class ReactiveMessagingEventPublisher implements EventPublisher { ObjectMapper json; @Inject - @Channel(PI_TOPIC_NAME) + @Channel(PROCESS_INSTANCES_TOPIC_NAME) MutinyEmitter processInstancesEventsEmitter; @Inject - @Channel(UI_TOPIC_NAME) - MutinyEmitter userTasksEventsEmitter; + @Channel(PROCESS_DEFINITIONS_TOPIC_NAME) + MutinyEmitter processDefinitionEventsEmitter; @Inject - @ConfigProperty(name = "kogito.events.processinstances.enabled") - Optional processInstancesEvents; - + @Channel(USER_TASK_INSTANCES_TOPIC_NAME) + MutinyEmitter userTasksEventsEmitter; @Inject - @ConfigProperty(name = "kogito.events.usertasks.enabled") - Optional userTasksEvents; + EventsRuntimeConfig eventsRuntimeConfig; @Inject Instance decoratorProviderInstance; @@ -82,23 +77,29 @@ public void init() { public void publish(DataEvent event) { switch (event.getType()) { + case "ProcessDefinitionEvent": + if (eventsRuntimeConfig.isProcessDefinitionEventsEnabled()) { + publishToTopic(event, processDefinitionEventsEmitter, PROCESS_DEFINITIONS_TOPIC_NAME); + } + break; case "ProcessInstanceErrorDataEvent": case "ProcessInstanceNodeDataEvent": case "ProcessInstanceSLADataEvent": case "ProcessInstanceStateDataEvent": case "ProcessInstanceVariableDataEvent": - if (processInstancesEvents.orElse(true)) { - publishToTopic(event, processInstancesEventsEmitter, PI_TOPIC_NAME); + if (eventsRuntimeConfig.isProcessInstancesEventsEnabled()) { + publishToTopic(event, processInstancesEventsEmitter, PROCESS_INSTANCES_TOPIC_NAME); } break; + case "UserTaskInstanceAssignmentDataEvent": case "UserTaskInstanceAttachmentDataEvent": case "UserTaskInstanceCommentDataEvent": case "UserTaskInstanceDeadlineDataEvent": case "UserTaskInstanceStateDataEvent": case "UserTaskInstanceVariableDataEvent": - if (userTasksEvents.orElse(true)) { - publishToTopic(event, userTasksEventsEmitter, UI_TOPIC_NAME); + if (eventsRuntimeConfig.isUserTasksEventsEnabled()) { + publishToTopic(event, userTasksEventsEmitter, USER_TASK_INSTANCES_TOPIC_NAME); } break; default: diff --git a/quarkus/addons/knative/eventing/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/eventing/KnativeEventingConfigSourceFactory.java b/quarkus/addons/knative/eventing/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/eventing/KnativeEventingConfigSourceFactory.java index e059c5ed205..088e353ba9c 100644 --- a/quarkus/addons/knative/eventing/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/eventing/KnativeEventingConfigSourceFactory.java +++ b/quarkus/addons/knative/eventing/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/eventing/KnativeEventingConfigSourceFactory.java @@ -42,6 +42,8 @@ public final class KnativeEventingConfigSourceFactory implements ConfigSourceFac private static final String PROCESS_INSTANCES_EVENTS = "kogito-processinstances-events"; + private static final String PROCESS_DEFINITIONS_EVENTS = "kogito-processdefinitions-events"; + private static final String USER_TASK_INSTANCES_EVENTS = "kogito-usertaskinstances-events"; private static final String QUARKUS_HTTP_CONNECTOR = "quarkus-http"; @@ -72,6 +74,7 @@ public Iterable getConfigSources(ConfigSourceContext context) { if (includeProcessEvents(context)) { addOutgoingConnector(configuration, PROCESS_INSTANCES_EVENTS); + addOutgoingConnector(configuration, PROCESS_DEFINITIONS_EVENTS); addOutgoingConnector(configuration, USER_TASK_INSTANCES_EVENTS); } diff --git a/quarkus/addons/knative/eventing/runtime/src/test/java/org/kie/kogito/addons/quarkus/knative/eventing/KnativeEventingConfigSourceFactoryTest.java b/quarkus/addons/knative/eventing/runtime/src/test/java/org/kie/kogito/addons/quarkus/knative/eventing/KnativeEventingConfigSourceFactoryTest.java index 6ae2ea213e6..120bfb650a3 100644 --- a/quarkus/addons/knative/eventing/runtime/src/test/java/org/kie/kogito/addons/quarkus/knative/eventing/KnativeEventingConfigSourceFactoryTest.java +++ b/quarkus/addons/knative/eventing/runtime/src/test/java/org/kie/kogito/addons/quarkus/knative/eventing/KnativeEventingConfigSourceFactoryTest.java @@ -39,7 +39,7 @@ class KnativeEventingConfigSourceFactoryTest { void getConfigSourcesWithProcessEventsAndDefaultIncomeStream() { KnativeEventingConfigSource eventingConfigSource = buildKnativeEventingConfigSource("true", null); - assertThat(eventingConfigSource.getPropertyNames()).hasSize(8); + assertThat(eventingConfigSource.getPropertyNames()).hasSize(10); assertProcessEvents(eventingConfigSource); assertDefaultIncomingConnector(eventingConfigSource); assertDefaultOutgoingConnector(eventingConfigSource); @@ -49,7 +49,7 @@ void getConfigSourcesWithProcessEventsAndDefaultIncomeStream() { void getConfigSourcesWithProcessEvents() { KnativeEventingConfigSource eventingConfigSource = buildKnativeEventingConfigSource("true", "true"); - assertThat(eventingConfigSource.getPropertyNames()).hasSize(6); + assertThat(eventingConfigSource.getPropertyNames()).hasSize(8); assertProcessEvents(eventingConfigSource); assertDefaultOutgoingConnector(eventingConfigSource); } @@ -89,6 +89,9 @@ private static void assertProcessEvents(KnativeEventingConfigSource eventingConf assertContainsProperty(eventingConfigSource, "mp.messaging.outgoing.kogito-processinstances-events.connector", "quarkus-http"); assertContainsProperty(eventingConfigSource, "mp.messaging.outgoing.kogito-processinstances-events.url", DEFAULT_SINK_CONFIG); + assertContainsProperty(eventingConfigSource, "mp.messaging.outgoing.kogito-processdefinitions-events.connector", "quarkus-http"); + assertContainsProperty(eventingConfigSource, "mp.messaging.outgoing.kogito-processdefinitions-events.url", DEFAULT_SINK_CONFIG); + assertContainsProperty(eventingConfigSource, "mp.messaging.outgoing.kogito-usertaskinstances-events.connector", "quarkus-http"); assertContainsProperty(eventingConfigSource, "mp.messaging.outgoing.kogito-usertaskinstances-events.url", DEFAULT_SINK_CONFIG); diff --git a/quarkus/addons/kubernetes/integration-tests/src/main/java/org/kie/kogito/addons/quarkus/kubernetes/Foo.java b/quarkus/addons/kubernetes/integration-tests/src/main/java/org/kie/kogito/addons/quarkus/kubernetes/Foo.java index abcbda47995..b9f49ef000d 100644 --- a/quarkus/addons/kubernetes/integration-tests/src/main/java/org/kie/kogito/addons/quarkus/kubernetes/Foo.java +++ b/quarkus/addons/kubernetes/integration-tests/src/main/java/org/kie/kogito/addons/quarkus/kubernetes/Foo.java @@ -1,17 +1,20 @@ /* - * Copyright 2023 Red Hat, Inc. and/or its affiliates. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.kie.kogito.addons.quarkus.kubernetes; diff --git a/quarkus/extensions/kogito-quarkus-extension-common/kogito-quarkus-common/src/main/java/org/kie/kogito/quarkus/registry/ProcessDefinitionRegistration.java b/quarkus/extensions/kogito-quarkus-extension-common/kogito-quarkus-common/src/main/java/org/kie/kogito/quarkus/registry/ProcessDefinitionRegistration.java new file mode 100644 index 00000000000..4ec977733a0 --- /dev/null +++ b/quarkus/extensions/kogito-quarkus-extension-common/kogito-quarkus-common/src/main/java/org/kie/kogito/quarkus/registry/ProcessDefinitionRegistration.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.quarkus.registry; + +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.event.Observes; +import javax.enterprise.inject.Instance; +import javax.inject.Inject; + +import org.kie.kogito.Application; +import org.kie.kogito.process.Processes; +import org.kie.kogito.quarkus.config.KogitoRuntimeConfig; +import org.kie.kogito.services.registry.ProcessDefinitionEventRegistry; + +import io.quarkus.runtime.StartupEvent; + +@ApplicationScoped +public class ProcessDefinitionRegistration { + + Instance processes; + ProcessDefinitionEventRegistry processDefinitionRegistry; + + @Inject + public ProcessDefinitionRegistration(Application application, KogitoRuntimeConfig runtimeConfig, Instance processes) { + this.processes = processes; + this.processDefinitionRegistry = new ProcessDefinitionEventRegistry(application, runtimeConfig.serviceUrl.orElse(null)); + } + + void onStartUp(@Observes StartupEvent startupEvent) { + if (processes.isResolvable()) { + processDefinitionRegistry.register(processes.get()); + } + } +} diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-deployment/src/main/java/org/kie/kogito/quarkus/serverless/workflow/deployment/ServerlessWorkflowAssetsProcessor.java b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-deployment/src/main/java/org/kie/kogito/quarkus/serverless/workflow/deployment/ServerlessWorkflowAssetsProcessor.java index aa9eb8ac39c..2a3241b0ad2 100644 --- a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-deployment/src/main/java/org/kie/kogito/quarkus/serverless/workflow/deployment/ServerlessWorkflowAssetsProcessor.java +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-deployment/src/main/java/org/kie/kogito/quarkus/serverless/workflow/deployment/ServerlessWorkflowAssetsProcessor.java @@ -31,6 +31,9 @@ import org.kie.kogito.codegen.api.context.KogitoBuildContext; import org.kie.kogito.codegen.process.ProcessContainerGenerator; import org.kie.kogito.codegen.process.ProcessGenerator; +import org.kie.kogito.event.process.NodeDefinition; +import org.kie.kogito.event.process.ProcessDefinitionDataEvent; +import org.kie.kogito.event.process.ProcessDefinitionEventBody; import org.kie.kogito.event.process.ProcessInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceErrorEventBody; import org.kie.kogito.event.process.ProcessInstanceNodeEventBody; @@ -124,7 +127,10 @@ public ReflectiveClassBuildItem eventsApiReflection() { ProcessInstanceStateDataEvent.class.getName(), ProcessInstanceStateEventBody.class.getName(), ProcessInstanceVariableDataEvent.class.getName(), - ProcessInstanceVariableEventBody.class.getName()); + ProcessInstanceVariableEventBody.class.getName(), + ProcessDefinitionDataEvent.class.getName(), + ProcessDefinitionEventBody.class.getName(), + NodeDefinition.class.getName()); } @BuildStep diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/pom.xml b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/pom.xml index 49a2036d6d0..747039445b0 100644 --- a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/pom.xml +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/pom.xml @@ -63,6 +63,10 @@ io.quarkus quarkus-elytron-security-properties-file + + org.kie.kogito + kogito-addons-quarkus-process-management + org.kie.kogito @@ -150,6 +154,19 @@ + + org.kie.kogito + kogito-addons-quarkus-process-management-deployment + ${project.version} + pom + test + + + * + * + + + org.kie.kogito kogito-serverless-workflow-builder diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/application.properties b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/application.properties index 952daab5c23..ba20b794de0 100644 --- a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/application.properties +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/application.properties @@ -203,6 +203,18 @@ mp.messaging.outgoing.kogito-usertaskinstances-events.value.serializer=org.apach mp.messaging.outgoing.kogito-usertaskinstances-events.group.id=kogito-data-index-it mp.messaging.outgoing.kogito-usertaskinstances-events.auto.offset.reset=latest +mp.messaging.outgoing.kogito-variables-events.connector=smallrye-kafka +mp.messaging.outgoing.kogito-variables-events.topic=kogito-variables-events +mp.messaging.outgoing.kogito-variables-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer +mp.messaging.outgoing.kogito-variables-events.group.id=kogito-data-index-it +mp.messaging.outgoing.kogito-variables-events.auto.offset.reset=latest + +mp.messaging.outgoing.kogito-processdefinitions-events.connector=smallrye-kafka +mp.messaging.outgoing.kogito-processdefinitions-events.topic=kogito-processdefinitions-events +mp.messaging.outgoing.kogito-processdefinitions-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer +mp.messaging.outgoing.kogito-processdefinitions-events.group.id=kogito-data-index-it +mp.messaging.outgoing.kogito-processdefinitions-events.auto.offset.reset=latest + quarkus.native.additional-build-args=-H:SerializationConfigurationResources=serialization-config.json # Maximum Java heap to be used during the native image generation quarkus.native.native-image-xmx=8g diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/WorkflowEventIT.java b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/WorkflowEventIT.java index f670e1f044b..a21b62e08ee 100644 --- a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/WorkflowEventIT.java +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/WorkflowEventIT.java @@ -18,20 +18,29 @@ */ package org.kie.kogito.quarkus.workflows; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.kie.kogito.event.EventPublisher; +import org.kie.kogito.event.process.ProcessDefinitionDataEvent; +import org.kie.kogito.event.process.ProcessDefinitionEventBody; import org.kie.kogito.event.process.ProcessInstanceDataEvent; import org.kie.kogito.test.quarkus.QuarkusTestProperty; import org.kie.kogito.test.quarkus.kafka.KafkaTestClient; import org.kie.kogito.testcontainers.quarkus.KafkaQuarkusTestResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; @@ -42,6 +51,7 @@ import io.restassured.http.ContentType; import static io.restassured.RestAssured.given; +import static org.assertj.core.api.Assertions.assertThat; import static org.hamcrest.CoreMatchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -54,7 +64,6 @@ public class WorkflowEventIT { } private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowEventIT.class); - public static final String KOGITO_PROCESSINSTANCES_EVENTS = "kogito-processinstances-events"; public KafkaTestClient kafkaClient; private ObjectMapper mapper; @@ -77,7 +86,7 @@ void testWorkflowEvents() throws Exception { String username = "buddy"; String password = "buddy"; - kafkaClient.consume(Set.of(KOGITO_PROCESSINSTANCES_EVENTS), s -> { + kafkaClient.consume(Set.of(EventPublisher.PROCESS_INSTANCES_TOPIC_NAME), s -> { LOGGER.info("Received from kafka: {}", s); try { ProcessInstanceDataEvent event = mapper.readValue(s, ProcessInstanceDataEvent.class); @@ -109,4 +118,37 @@ void testWorkflowEvents() throws Exception { future.get(10, TimeUnit.SECONDS); } -} \ No newline at end of file + + @Test + void testWorkflowDefinitionsEvents() { + Collection definitionDataEvents = new ConcurrentLinkedQueue<>(); + kafkaClient.consume(Set.of(EventPublisher.PROCESS_DEFINITIONS_TOPIC_NAME), s -> { + LOGGER.debug("Received from kafka: {}", s); + try { + ProcessDefinitionDataEvent event = mapper.readValue(s, ProcessDefinitionDataEvent.class); + definitionDataEvents.add(event); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }); + + List processIds = given() + .contentType(ContentType.JSON) + .accept(ContentType.JSON) + .when() + .get("/management/processes") + .then() + .statusCode(200) + .extract() + .body().as(List.class); + + Awaitility.waitAtMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> assertThat(definitionDataEvents).hasSize(processIds.size())); + + List processIdsFromEvent = definitionDataEvents.stream() + .map(ProcessDefinitionDataEvent::getData) + .map(ProcessDefinitionEventBody::getId) + .collect(Collectors.toList()); + + assertThat(processIdsFromEvent).containsAll(processIds); + } +} diff --git a/quarkus/extensions/kogito-quarkus-workflow-extension-common/kogito-quarkus-workflow-common/src/main/java/org/kie/kogito/quarkus/workflow/devservices/DataIndexEventPublisher.java b/quarkus/extensions/kogito-quarkus-workflow-extension-common/kogito-quarkus-workflow-common/src/main/java/org/kie/kogito/quarkus/workflow/devservices/DataIndexEventPublisher.java index e1c7f6d9619..bf02d1fa339 100644 --- a/quarkus/extensions/kogito-quarkus-workflow-extension-common/kogito-quarkus-workflow-common/src/main/java/org/kie/kogito/quarkus/workflow/devservices/DataIndexEventPublisher.java +++ b/quarkus/extensions/kogito-quarkus-workflow-extension-common/kogito-quarkus-workflow-common/src/main/java/org/kie/kogito/quarkus/workflow/devservices/DataIndexEventPublisher.java @@ -62,6 +62,18 @@ public void publish(DataEvent event) { LOGGER.debug("Sending event to data index: {}", event); switch (event.getType()) { + case "ProcessDefinitionEvent": + webClient.postAbs(dataIndexUrl.get() + "/definitions") + .putHeader(CONTENT_TYPE, CLOUD_EVENTS_CONTENT_TYPE) + .expect(ResponsePredicate.SC_ACCEPTED) + .sendJson(event, result -> { + if (result.failed()) { + LOGGER.error("Failed to send message to Data Index", result.cause()); + } else { + LOGGER.debug("Event published to Data Index"); + } + }); + break; case "ProcessInstanceErrorDataEvent": case "ProcessInstanceNodeDataEvent": case "ProcessInstanceSLADataEvent": diff --git a/springboot/addons/events/kafka/src/main/java/org/kie/kogito/events/spring/KafkaEventPublisher.java b/springboot/addons/events/kafka/src/main/java/org/kie/kogito/events/spring/KafkaEventPublisher.java index 59301bb31e1..d11aa00b702 100644 --- a/springboot/addons/events/kafka/src/main/java/org/kie/kogito/events/spring/KafkaEventPublisher.java +++ b/springboot/addons/events/kafka/src/main/java/org/kie/kogito/events/spring/KafkaEventPublisher.java @@ -35,9 +35,6 @@ @Component public class KafkaEventPublisher implements EventPublisher { - private static final String PI_TOPIC_NAME = "kogito-processinstances-events"; - private static final String UI_TOPIC_NAME = "kogito-usertaskinstances-events"; - private static final Logger logger = LoggerFactory.getLogger(KafkaEventPublisher.class); @Autowired @@ -52,6 +49,9 @@ public class KafkaEventPublisher implements EventPublisher { @Value("${kogito.events.processinstances.enabled:true}") private boolean processInstancesEvents; + @Value("${kogito.events.processdefinitions.enabled:true}") + private boolean processDefinitionEvents; + @Value("${kogito.events.usertasks.enabled:true}") private boolean userTasksEvents; @@ -64,7 +64,7 @@ public void publish(DataEvent event) { case "ProcessInstanceSLADataEvent": case "ProcessInstanceStateDataEvent": case "ProcessInstanceVariableDataEvent": - publishToTopic(event, PI_TOPIC_NAME); + publishToTopic(event, PROCESS_INSTANCES_TOPIC_NAME); break; case "UserTaskInstanceAssignmentDataEvent": case "UserTaskInstanceAttachmentDataEvent": @@ -72,7 +72,12 @@ public void publish(DataEvent event) { case "UserTaskInstanceDeadlineDataEvent": case "UserTaskInstanceStateDataEvent": case "UserTaskInstanceVariableDataEvent": - publishToTopic(event, UI_TOPIC_NAME); + publishToTopic(event, USER_TASK_INSTANCES_TOPIC_NAME); + break; + case "ProcessDefinitionEvent": + if (processDefinitionEvents) { + publishToTopic(event, PROCESS_DEFINITIONS_TOPIC_NAME); + } break; default: logger.debug("Unknown type of event '{}', ignoring for this publisher", event.getType()); diff --git a/springboot/archetype/pom.xml b/springboot/archetype/pom.xml index cca701f3013..d34b0f8ca5b 100644 --- a/springboot/archetype/pom.xml +++ b/springboot/archetype/pom.xml @@ -52,6 +52,10 @@ ${project.version} provided + + org.kie.kogito + kogito-processes-spring-boot-starter + org.kie.kogito kogito-addons-springboot-monitoring-prometheus diff --git a/springboot/integration-tests/src/it/integration-tests-springboot-processes-persistence-it/integration-tests-springboot-processes-persistence-common/pom.xml b/springboot/integration-tests/src/it/integration-tests-springboot-processes-persistence-it/integration-tests-springboot-processes-persistence-common/pom.xml index d1e9dfc4aa6..f4a503316f8 100644 --- a/springboot/integration-tests/src/it/integration-tests-springboot-processes-persistence-it/integration-tests-springboot-processes-persistence-common/pom.xml +++ b/springboot/integration-tests/src/it/integration-tests-springboot-processes-persistence-it/integration-tests-springboot-processes-persistence-common/pom.xml @@ -37,6 +37,10 @@ org.springframework.boot spring-boot-starter-test + + org.kie.kogito + kogito-processes-spring-boot-starter + io.rest-assured rest-assured diff --git a/springboot/integration-tests/src/it/integration-tests-springboot-processes-persistence-it/invoker.properties b/springboot/integration-tests/src/it/integration-tests-springboot-processes-persistence-it/invoker.properties index 1e38bc2a7df..ee81a7c516d 100644 --- a/springboot/integration-tests/src/it/integration-tests-springboot-processes-persistence-it/invoker.properties +++ b/springboot/integration-tests/src/it/integration-tests-springboot-processes-persistence-it/invoker.properties @@ -18,5 +18,4 @@ # # disable verbose local download output -invoker.mavenOpts=-Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -invoker.goals=clean install \ No newline at end of file +invoker.mavenOpts=-Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn \ No newline at end of file diff --git a/springboot/integration-tests/src/it/integration-tests-springboot-processes-persistence-it/pom.xml b/springboot/integration-tests/src/it/integration-tests-springboot-processes-persistence-it/pom.xml index 92c8e9208f1..8e1188f1ea4 100644 --- a/springboot/integration-tests/src/it/integration-tests-springboot-processes-persistence-it/pom.xml +++ b/springboot/integration-tests/src/it/integration-tests-springboot-processes-persistence-it/pom.xml @@ -62,6 +62,10 @@ org.kie.kogito kogito-spring-boot-starter + + org.kie.kogito + kogito-processes-spring-boot-starter + org.kie.kogito kogito-addons-springboot-process-management