diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-monitoring/pom.xml b/kogito-serverless-workflow/kogito-serverless-workflow-monitoring/pom.xml
index 242cea9e47a..b18431472d9 100644
--- a/kogito-serverless-workflow/kogito-serverless-workflow-monitoring/pom.xml
+++ b/kogito-serverless-workflow/kogito-serverless-workflow-monitoring/pom.xml
@@ -20,5 +20,21 @@
org.kie.kogito
kogito-serverless-workflow-runtime
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ test
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+ org.mockito
+ mockito-inline
+ test
+
+
\ No newline at end of file
diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-monitoring/src/main/java/org/kie/kogito/serverless/workflow/monitoring/SonataFlowMetricProcessEventListener.java b/kogito-serverless-workflow/kogito-serverless-workflow-monitoring/src/main/java/org/kie/kogito/serverless/workflow/monitoring/SonataFlowMetricProcessEventListener.java
index c48c7fc87b3..951d0b6dbeb 100644
--- a/kogito-serverless-workflow/kogito-serverless-workflow-monitoring/src/main/java/org/kie/kogito/serverless/workflow/monitoring/SonataFlowMetricProcessEventListener.java
+++ b/kogito-serverless-workflow/kogito-serverless-workflow-monitoring/src/main/java/org/kie/kogito/serverless/workflow/monitoring/SonataFlowMetricProcessEventListener.java
@@ -1,12 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.kie.kogito.serverless.workflow.monitoring;
+import java.util.Iterator;
+
import org.kie.api.event.process.ProcessStartedEvent;
import org.kie.kogito.KogitoGAV;
import org.kie.kogito.internal.process.runtime.KogitoProcessInstance;
+import org.kie.kogito.jackson.utils.JsonObjectUtils;
import org.kie.kogito.monitoring.core.common.process.MetricsProcessEventListener;
import org.kie.kogito.serverless.workflow.SWFConstants;
import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.micrometer.core.instrument.MeterRegistry;
@@ -14,8 +36,19 @@
public class SonataFlowMetricProcessEventListener extends MetricsProcessEventListener {
- public SonataFlowMetricProcessEventListener(KogitoGAV gav, MeterRegistry meterRegistry) {
+ public enum ArrayStoreMode {
+ STRING,
+ JSON_STRING,
+ MULTI_PARAM
+ }
+
+ static final String INPUT_PARAMS_COUNTER_NAME = "sonataflow_input_parameters_counter";
+
+ private ArrayStoreMode arrayStoreMode;
+
+ public SonataFlowMetricProcessEventListener(KogitoGAV gav, MeterRegistry meterRegistry, ArrayStoreMode arrayStoreMode) {
super("sonataflow-process-monitoring-listener", gav, meterRegistry);
+ this.arrayStoreMode = arrayStoreMode;
}
@Override
@@ -23,28 +56,46 @@ public void beforeProcessStarted(ProcessStartedEvent event) {
final KogitoProcessInstance processInstance = (KogitoProcessInstance) event.getProcessInstance();
Object node = processInstance.getVariables().get(SWFConstants.DEFAULT_WORKFLOW_VAR);
if (node instanceof ObjectNode) {
- registerObject(processInstance.getProcessId(), (ObjectNode) node, null);
+ registerObject(processInstance.getProcessId(), null, (ObjectNode) node);
}
-
}
- private void registerObject(String processId, ObjectNode node, String prefix) {
- node.fields().forEachRemaining(e -> registerInputParam(processId, e.getKey(), e.getValue(), prefix));
+ final void registerObject(String processId, String key, ObjectNode node) {
+ node.fields().forEachRemaining(e -> registerInputParam(processId, concat(key, e.getKey(), '.'), e.getValue()));
}
- private void registerInputParam(String processId, String key, JsonNode value, String prefix) {
- if (value.isObject()) {
- registerObject(processId, (ObjectNode) value, concat(prefix, key));
+ private void registerInputParam(String processId, String key, JsonNode value) {
+ if (value instanceof ObjectNode) {
+ registerObject(processId, key, (ObjectNode) value);
+ } else if (value instanceof ArrayNode) {
+ registerArray(processId, key, (ArrayNode) value);
} else {
- registerInputParam(processId, concat(prefix, key), value.toString());
+ registerValue(processId, key, value.asText());
}
}
- private String concat(String prefix, String key) {
- return prefix == null ? key : prefix + "." + key;
+ private void registerArray(String processId, String key, ArrayNode node) {
+ if (arrayStoreMode == ArrayStoreMode.MULTI_PARAM) {
+ Iterator iter = node.elements();
+ for (int i = 0; iter.hasNext(); i++) {
+ registerInputParam(processId, concat(key, "[" + i + "]"), iter.next());
+ }
+ } else if (arrayStoreMode == ArrayStoreMode.JSON_STRING) {
+ registerValue(processId, key, node.toString());
+ } else if (arrayStoreMode == ArrayStoreMode.STRING) {
+ registerValue(processId, key, JsonObjectUtils.toJavaValue(node).toString());
+ }
+ }
+
+ private void registerValue(String processId, String key, String value) {
+ buildCounter(INPUT_PARAMS_COUNTER_NAME, "Input parameters", processId, Tag.of("param_name", key), Tag.of("param_value", value)).increment();
}
- private void registerInputParam(String processId, String key, String value) {
- buildCounter("sonataflow_input_parameters_counter", "Input parameters", processId, Tag.of("param_name", key), Tag.of("param_value", value)).increment();
+ private String concat(String prefix, String key, char prefixChar) {
+ return prefix == null ? key : prefix + prefixChar + key;
+ }
+
+ private String concat(String prefix, String key) {
+ return prefix == null ? key : prefix + key;
}
}
diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-monitoring/src/test/java/org/kie/kogito/serverless/workflow/monitoring/SonataFlowMetricProcessEventListenerTest.java b/kogito-serverless-workflow/kogito-serverless-workflow-monitoring/src/test/java/org/kie/kogito/serverless/workflow/monitoring/SonataFlowMetricProcessEventListenerTest.java
new file mode 100644
index 00000000000..0763350136b
--- /dev/null
+++ b/kogito-serverless-workflow/kogito-serverless-workflow-monitoring/src/test/java/org/kie/kogito/serverless/workflow/monitoring/SonataFlowMetricProcessEventListenerTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.serverless.workflow.monitoring;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.kie.kogito.KogitoGAV;
+import org.kie.kogito.jackson.utils.ObjectMapperFactory;
+import org.kie.kogito.serverless.workflow.monitoring.SonataFlowMetricProcessEventListener.ArrayStoreMode;
+import org.mockito.MockedStatic;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.MeterRegistry;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class SonataFlowMetricProcessEventListenerTest {
+
+ private static final String PROCESS_ID = "testMetric";
+
+ private Counter counter;
+ private MeterRegistry meterRegistry;
+ private KogitoGAV kogitoGAV;
+ private Counter.Builder builder;
+ private MockedStatic factory;
+
+ @BeforeEach
+ void setup() {
+ counter = mock(Counter.class);
+ meterRegistry = mock(MeterRegistry.class);
+ builder = mock(Counter.Builder.class);
+ factory = mockStatic(Counter.class);
+ factory.when(() -> Counter.builder(SonataFlowMetricProcessEventListener.INPUT_PARAMS_COUNTER_NAME)).thenReturn(builder);
+ when(builder.register(meterRegistry)).thenReturn(counter);
+ when(builder.description(anyString())).thenReturn(builder);
+ when(builder.tag(anyString(), anyString())).thenReturn(builder);
+ kogitoGAV = new KogitoGAV("org.kogito", "test-artifact", "999-SNAPTHOT");
+ }
+
+ @AfterEach
+ void clean() {
+ factory.close();
+ }
+
+ @Test
+ void testSimpleCall() {
+ SonataFlowMetricProcessEventListener listener = new SonataFlowMetricProcessEventListener(kogitoGAV, meterRegistry, ArrayStoreMode.JSON_STRING);
+ listener.registerObject(PROCESS_ID, null, ObjectMapperFactory.get().createObjectNode().put("number", 1));
+ listener.registerObject(PROCESS_ID, null, ObjectMapperFactory.get().createObjectNode().put("number", 2));
+ verify(builder, times(2)).tag("process_id", PROCESS_ID);
+ verify(builder, times(2)).tag("param_name", "number");
+ verify(builder).tag("param_value", "1");
+ verify(builder).tag("param_value", "2");
+ verify(counter, times(2)).increment();
+ }
+
+ @Test
+ void testComplexCall() {
+ SonataFlowMetricProcessEventListener listener = new SonataFlowMetricProcessEventListener(kogitoGAV, meterRegistry, ArrayStoreMode.JSON_STRING);
+ listener.registerObject(PROCESS_ID, null,
+ ObjectMapperFactory.get().createObjectNode().set("team", ObjectMapperFactory.get().createObjectNode().put("name", "Real Betis Balompie").put("age", 117)));
+ verify(builder, times(2)).tag("process_id", PROCESS_ID);
+ verify(builder).tag("param_name", "team.name");
+ verify(builder).tag("param_value", "Real Betis Balompie");
+ verify(builder).tag("param_name", "team.age");
+ verify(builder).tag("param_value", "117");
+ verify(counter, times(2)).increment();
+ }
+
+ @Test
+ void testArrayMultiParam() {
+ SonataFlowMetricProcessEventListener listener = new SonataFlowMetricProcessEventListener(kogitoGAV, meterRegistry, ArrayStoreMode.MULTI_PARAM);
+ listener.registerObject(PROCESS_ID, null,
+ ObjectMapperFactory.get().createObjectNode().set("teams",
+ ObjectMapperFactory.get().createArrayNode().add(ObjectMapperFactory.get().createObjectNode().put("name", "Real Betis Balompie"))
+ .add(ObjectMapperFactory.get().createObjectNode().put("name", "Real Sociedad"))));
+ verify(builder, times(2)).tag("process_id", PROCESS_ID);
+ verify(builder).tag("param_name", "teams[0].name");
+ verify(builder).tag("param_value", "Real Betis Balompie");
+ verify(builder).tag("param_name", "teams[1].name");
+ verify(builder).tag("param_value", "Real Sociedad");
+ verify(counter, times(2)).increment();
+ }
+
+ @Test
+ void testArrayJsonString() {
+ SonataFlowMetricProcessEventListener listener = new SonataFlowMetricProcessEventListener(kogitoGAV, meterRegistry, ArrayStoreMode.JSON_STRING);
+ ArrayNode arrayNode = ObjectMapperFactory.get().createArrayNode().add(ObjectMapperFactory.get().createObjectNode().put("name", "Real Betis Balompie"))
+ .add(ObjectMapperFactory.get().createObjectNode().put("name", "Real Sociedad"));
+ listener.registerObject(PROCESS_ID, null,
+ ObjectMapperFactory.get().createObjectNode().set("teams", arrayNode));
+ verify(builder).tag("process_id", PROCESS_ID);
+ verify(builder).tag("param_name", "teams");
+ verify(builder).tag("param_value", arrayNode.toString());
+ verify(counter).increment();
+ }
+
+ @Test
+ void testArrayString() {
+ SonataFlowMetricProcessEventListener listener = new SonataFlowMetricProcessEventListener(kogitoGAV, meterRegistry, ArrayStoreMode.STRING);
+ ArrayNode arrayNode = ObjectMapperFactory.get().createArrayNode().add(ObjectMapperFactory.get().createObjectNode().put("name", "Real Betis Balompie"))
+ .add(ObjectMapperFactory.get().createObjectNode().put("name", "Real Sociedad"));
+ listener.registerObject(PROCESS_ID, null,
+ ObjectMapperFactory.get().createObjectNode().set("teams", arrayNode));
+ verify(builder).tag("process_id", PROCESS_ID);
+ verify(builder).tag("param_name", "teams");
+ verify(builder).tag("param_value", "[{name=Real Betis Balompie}, {name=Real Sociedad}]");
+ verify(counter).increment();
+ }
+
+}
diff --git a/quarkus/addons/monitoring/sonataflow/src/main/java/org/kie/sonataflow/monitoring/SonataFlowMetricEventListenerFactory.java b/quarkus/addons/monitoring/sonataflow/src/main/java/org/kie/sonataflow/monitoring/SonataFlowMetricEventListenerFactory.java
index de918bf3319..354eaae28c2 100644
--- a/quarkus/addons/monitoring/sonataflow/src/main/java/org/kie/sonataflow/monitoring/SonataFlowMetricEventListenerFactory.java
+++ b/quarkus/addons/monitoring/sonataflow/src/main/java/org/kie/sonataflow/monitoring/SonataFlowMetricEventListenerFactory.java
@@ -1,8 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.kie.sonataflow.monitoring;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.kie.kogito.KogitoGAV;
import org.kie.kogito.config.ConfigBean;
import org.kie.kogito.internal.process.event.KogitoProcessEventListener;
+import org.kie.kogito.serverless.workflow.monitoring.SonataFlowMetricProcessEventListener.ArrayStoreMode;
import org.kie.kogito.serverless.workflow.monitoring.SonataFlowMetricProcessEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -21,10 +41,13 @@ public class SonataFlowMetricEventListenerFactory {
@Inject
ConfigBean configBean;
+ @ConfigProperty(name = "kie.monitoring.sonataflow.arrays.store", defaultValue = "JSON_STRING")
+ ArrayStoreMode arrayStoreMode;
+
@Produces
public KogitoProcessEventListener produceProcessListener() {
LOGGER.info("Producing sonataflow listener for process monitoring.");
return new SonataFlowMetricProcessEventListener(
- configBean.getGav().orElse(KogitoGAV.EMPTY_GAV), Metrics.globalRegistry);
+ configBean.getGav().orElse(KogitoGAV.EMPTY_GAV), Metrics.globalRegistry, arrayStoreMode);
}
}