From 25bd3340f1279c403db0c48cdcaaa1c548efe90c Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Wed, 1 Nov 2023 14:19:39 -0700 Subject: [PATCH 1/7] Add framework and unit tests for DagActionStoreChangeMonitor --- .../DagActionStoreChangeMonitorTest.java | 154 ++++++++++++++++++ 1 file changed, 154 insertions(+) create mode 100644 gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java new file mode 100644 index 00000000000..742e997a544 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java @@ -0,0 +1,154 @@ +package org.apache.gobblin.runtime; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; +import org.apache.gobblin.kafka.client.Kafka09ConsumerClient; +import org.apache.gobblin.runtime.api.DagActionStore; +import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; +import org.apache.gobblin.service.modules.orchestration.DagManager; +import org.apache.gobblin.service.modules.orchestration.Orchestrator; +import org.apache.gobblin.service.monitoring.DagActionStoreChangeEvent; +import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor; +import org.apache.gobblin.service.monitoring.DagActionValue; +import org.apache.gobblin.service.monitoring.GenericStoreChangeEvent; +import org.apache.gobblin.service.monitoring.OperationType; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.*; + + +/** + * Tests the main functionality of {@link DagActionStoreChangeMonitor} to process {@link DagActionStoreChangeEvent} type + * events stored in a {@link org.apache.gobblin.kafka.client.KafkaConsumerRecord}. The + * processMessage(DecodeableKafkaRecord message) function should be able to gracefully process a variety of message + * types, even with undesired formats, without throwing exceptions. 
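+ * Each test builds a {@link DagActionStoreChangeEvent}, wraps it in a {@link Kafka09ConsumerClient.Kafka09ConsumerRecord},
+ * and hands it to a monitor whose collaborators ({@link DagActionStore}, {@link DagManager}, {@link FlowCatalog},
+ * {@link Orchestrator}) are Mockito mocks.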
+ */ +@Slf4j +public class DagActionStoreChangeMonitorTest { + public static final String TOPIC = DagActionStoreChangeEvent.class.getSimpleName(); + private final int PARTITION = 1; + private final int OFFSET = 1; + private final String FLOW_GROUP = "flowGroup"; + private final String FLOW_NAME = "flowName"; + private final String FLOW_EXECUTION_ID = "123"; + private MockDagActionStoreChangeMonitor mockDagActionStoreChangeMonitor; + private int txidCounter = 0; + + class MockDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor { + + public MockDagActionStoreChangeMonitor(String topic, Config config, int numThreads, + boolean isMultiActiveSchedulerEnabled) { + super(topic, config, mock(DagActionStore.class), mock(DagManager.class), numThreads, mock(FlowCatalog.class), + mock(Orchestrator.class), isMultiActiveSchedulerEnabled); + } + + @Override + protected void processMessage(DecodeableKafkaRecord record) { + super.processMessage(record); + } + + @Override + protected void startUp() { + super.startUp(); + } + } + + MockDagActionStoreChangeMonitor createMockDagActionStoreChangeMonitor() { + Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000")) + .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer")) + .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef("/tmp/fakeStateStore")) + .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121")); + return new MockDagActionStoreChangeMonitor("dummyTopic", config, 5, true); + } + + @BeforeClass + public void setup() { + mockDagActionStoreChangeMonitor = createMockDagActionStoreChangeMonitor(); + mockDagActionStoreChangeMonitor.startUp(); + } + + /** + * Ensure no NPE results from passing a HEARTBEAT type message with a null {@link DagActionValue} + */ + @Test + public void testProcessMessageWithHeartbeat() { + Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = + wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", "", null); + mockDagActionStoreChangeMonitor.processMessage(consumerRecord); + } + + /** + * Tests process message with an INSERT type message + */ + @Test + public void testProcessMessageWithInsert() { + Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = + wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH); + mockDagActionStoreChangeMonitor.processMessage(consumerRecord); + } + + + /** + * Tests process message with an UPDATE type message + */ + @Test + public void testProcessMessageWithUpdate() { + Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = + wrapDagActionStoreChangeEvent(OperationType.UPDATE, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH); + mockDagActionStoreChangeMonitor.processMessage(consumerRecord); + } + + /** + * Tests process message with a DELETE type message + */ + @Test + public void testProcessMessageWithDelete() { + Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = + wrapDagActionStoreChangeEvent(OperationType.DELETE, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH); + mockDagActionStoreChangeMonitor.processMessage(consumerRecord); + } + + /** + * Util to create a general DagActionStoreChange type event + */ + private DagActionStoreChangeEvent createDagActionStoreChangeEvent(OperationType operationType, + String 
flowGroup, String flowName, String flowExecutionId, DagActionValue dagAction) { + String key = getKeyForFlow(flowGroup, flowName, flowExecutionId); + GenericStoreChangeEvent genericStoreChangeEvent = + new GenericStoreChangeEvent(key, String.valueOf(txidCounter), System.currentTimeMillis(), operationType); + txidCounter++; + return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup, flowName, flowExecutionId, dagAction); + } + + /** + * Form a key for events using the flow identifiers + * @return a key formed by adding an '_' delimiter between the flow identifiers + */ + private String getKeyForFlow(String flowGroup, String flowName, String flowExecutionId) { + return flowGroup + "_" + flowName + "_" + flowExecutionId; + } + + /** + * Util to create wrapper around DagActionStoreChangeEvent + */ + private Kafka09ConsumerClient.Kafka09ConsumerRecord wrapDagActionStoreChangeEvent(OperationType operationType, String flowGroup, String flowName, + String flowExecutionId, DagActionValue dagAction) { + DagActionStoreChangeEvent eventToProcess = null; + try { + eventToProcess = + createDagActionStoreChangeEvent(operationType, flowGroup, flowName, flowExecutionId, dagAction); + } catch (Exception e) { + log.error("Exception while creating event ", e); + } + // TODO: handle partition and offset values better + ConsumerRecord consumerRecord = new ConsumerRecord<>(TOPIC, PARTITION, OFFSET, + getKeyForFlow(flowGroup, flowName, flowExecutionId), eventToProcess); + return new Kafka09ConsumerClient.Kafka09ConsumerRecord(consumerRecord); + } +} \ No newline at end of file From 2ee35516db7ef9f668134c9880f560defe0434cc Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Thu, 2 Nov 2023 16:05:46 -0700 Subject: [PATCH 2/7] Add more test cases and validation --- .../DagActionStoreChangeMonitorTest.java | 76 +++++++++++++++++-- .../DagActionStoreChangeMonitor.java | 20 +++-- 2 files changed, 79 insertions(+), 17 deletions(-) diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java index 742e997a544..b765f51693d 100644 --- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java @@ -40,6 +40,10 @@ public class DagActionStoreChangeMonitorTest { private MockDagActionStoreChangeMonitor mockDagActionStoreChangeMonitor; private int txidCounter = 0; + /** + * Note: The class methods accessed in the tests below are overriden to allow access to these package-protected + * methods for testing + */ class MockDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor { public MockDagActionStoreChangeMonitor(String topic, Config config, int numThreads, @@ -51,12 +55,18 @@ public MockDagActionStoreChangeMonitor(String topic, Config config, int numThrea @Override protected void processMessage(DecodeableKafkaRecord record) { super.processMessage(record); + } @Override protected void startUp() { super.startUp(); } + + @Override + protected void submitFlowToDagManagerHelper(String flowGroup, String flowName) { + super.submitFlowToDagManagerHelper(flowGroup, flowName); + } } MockDagActionStoreChangeMonitor createMockDagActionStoreChangeMonitor() { @@ -74,44 +84,98 @@ public void setup() { } /** - * Ensure no NPE results from passing a HEARTBEAT type message with a null 
{@link DagActionValue} + * Ensure no NPE results from passing a HEARTBEAT type message with a null {@link DagActionValue} and the message is + * filtered out since it's a heartbeat type so no methods are called. */ @Test - public void testProcessMessageWithHeartbeat() { + public void testProcessMessageWithHeartbeatAndNullDagAction() { Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", "", null); mockDagActionStoreChangeMonitor.processMessage(consumerRecord); + verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); + verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong()); + verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString()); + } + + /** + * Ensure a HEARTBEAT type message with non-empty flow information is filtered out since it's a heartbeat type so no + * methods are called. + */ + @Test + public void testProcessMessageWithHeartbeatAndFlowInfo() { + Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = + wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.RESUME); + mockDagActionStoreChangeMonitor.processMessage(consumerRecord); + verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); + verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong()); + verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString()); } /** - * Tests process message with an INSERT type message + * Tests process message with an INSERT type message of a `launch` action */ @Test - public void testProcessMessageWithInsert() { + public void testProcessMessageWithInsertLaunchType() { Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH); mockDagActionStoreChangeMonitor.processMessage(consumerRecord); + verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); + verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong()); + verify(mockDagActionStoreChangeMonitor, times(1)).submitFlowToDagManagerHelper(anyString(), anyString()); } + /** + * Tests process message with an INSERT type message of a `resume` action. It re-uses the same flow information however + * since it is a different tid used every time it will be considered unique and submit a kill request. 
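+   * A fresh transaction id is generated for every event (see txidCounter), so the monitor's dagActionsSeenCache does
+   * not treat the repeated flow identifiers as a duplicate.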
+ */ + @Test + public void testProcessMessageWithInsertResumeType() { + Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = + wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.RESUME); + mockDagActionStoreChangeMonitor.processMessage(consumerRecord); + verify(mockDagActionStoreChangeMonitor.getDagManager(), times(1)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); + verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong()); + verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString()); + } + + /** + * Tests process message with an INSERT type message of a `kill` action. Similar to `testProcessMessageWithInsertResumeType`. + */ + @Test + public void testProcessMessageWithInsertKillType() { + Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = + wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.KILL); + mockDagActionStoreChangeMonitor.processMessage(consumerRecord); + verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); + verify(mockDagActionStoreChangeMonitor.getDagManager(), times(1)).handleKillFlowRequest(anyString(), anyString(), anyLong()); + verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString()); + } /** - * Tests process message with an UPDATE type message + * Tests process message with an UPDATE type message of the 'launch' action above. Although processMessage does not + * expect this message type it should handle it gracefully */ @Test public void testProcessMessageWithUpdate() { Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = wrapDagActionStoreChangeEvent(OperationType.UPDATE, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH); mockDagActionStoreChangeMonitor.processMessage(consumerRecord); + verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); + verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong()); + verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString()); } /** - * Tests process message with a DELETE type message + * Tests process message with a DELETE type message which should be ignored regardless of the flow information. 
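+   * Verifies that no resume, kill, or flow submission handling is triggered.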
*/ @Test public void testProcessMessageWithDelete() { Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = wrapDagActionStoreChangeEvent(OperationType.DELETE, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH); mockDagActionStoreChangeMonitor.processMessage(consumerRecord); + verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); + verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong()); + verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString()); } /** diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java index 1190a1e460b..52e3724ea59 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java @@ -17,20 +17,19 @@ package org.apache.gobblin.service.monitoring; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - +import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.typesafe.config.Config; import com.typesafe.config.ConfigValueFactory; - +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; - import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; import org.apache.gobblin.metrics.ContextAwareGauge; import org.apache.gobblin.metrics.ContextAwareMeter; @@ -43,8 +42,6 @@ import org.apache.gobblin.service.FlowId; import org.apache.gobblin.service.modules.orchestration.DagManager; import org.apache.gobblin.service.modules.orchestration.Orchestrator; - - /** * A DagActionStore change monitor that uses {@link DagActionStoreChangeEvent} schema to process Kafka messages received * from its corresponding consumer client. 
This monitor responds to requests to resume or delete a flow and acts as a @@ -79,7 +76,8 @@ public String load(String key) throws Exception { dagActionsSeenCache = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).build(cacheLoader); protected DagActionStore dagActionStore; - + @Getter + @VisibleForTesting protected DagManager dagManager; protected Orchestrator orchestrator; protected boolean isMultiActiveSchedulerEnabled; From f30dcfbb39aab2a1b6c54cead1f1e3d8c4a6c82a Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Thu, 2 Nov 2023 16:46:25 -0700 Subject: [PATCH 3/7] Add header for new file --- .../DagActionStoreChangeMonitorTest.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java index b765f51693d..cfec155801d 100644 --- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.gobblin.runtime; import com.typesafe.config.Config; From 64ea98f99de41ddac430468b433c7ab68bdd9bd6 Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Fri, 3 Nov 2023 11:46:28 -0700 Subject: [PATCH 4/7] Move FlowSpec static function to Utils class --- .../apache/gobblin/runtime/api/FlowSpec.java | 18 +++++++++--------- .../gobblin/runtime/api/FlowSpecTest.java | 4 ++-- .../DagActionStoreChangeMonitor.java | 4 ++-- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java index 2df547596f6..7b887f472f9 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java @@ -518,15 +518,15 @@ public static int maxFlowSpecUriLength() { return URI_SCHEME.length() + ":".length() // URI separator + URI_PATH_SEPARATOR.length() + ServiceConfigKeys.MAX_FLOW_NAME_LENGTH + URI_PATH_SEPARATOR.length() + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH; } - } - /** - * Create a new FlowSpec object with the added property defined by path and value parameters - * @param path key for new property - * @param value - */ - public static FlowSpec createFlowSpecWithProperty(FlowSpec flowSpec, String path, String value) { - Config updatedConfig = flowSpec.getConfig().withValue(path, ConfigValueFactory.fromAnyRef(value)); - return new Builder(flowSpec.getUri()).withConfig(updatedConfig).build(); + /** + * Create a new FlowSpec object with the added property defined by path and value parameters + * @param path key for new property + * @param value + */ + public static FlowSpec createFlowSpecWithProperty(FlowSpec flowSpec, String path, String value) { + Config updatedConfig = flowSpec.getConfig().withValue(path, ConfigValueFactory.fromAnyRef(value)); + return new Builder(flowSpec.getUri()).withConfig(updatedConfig).build(); + } } } diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java index 793abd222b6..9f7c7da8340 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java @@ -26,7 +26,7 @@ import org.testng.Assert; import org.testng.annotations.Test; -import static org.apache.gobblin.runtime.api.FlowSpec.*; +import static org.apache.gobblin.runtime.api.FlowSpec.Utils; public class FlowSpecTest { @@ -51,7 +51,7 @@ public void testAddProperty() throws URISyntaxException { properties.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, "true"); FlowSpec originalFlowSpec = FlowSpec.builder(flowUri).withConfigAsProperties(properties).build(); - FlowSpec updatedFlowSpec = createFlowSpecWithProperty(originalFlowSpec, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId); + FlowSpec updatedFlowSpec = FlowSpec.Utils.createFlowSpecWithProperty(originalFlowSpec, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId); Properties updatedProperties = updatedFlowSpec.getConfigAsProperties(); Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowExecutionId); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java 
index 0f0485de5ed..c82228864c4 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java @@ -44,7 +44,7 @@ import org.apache.gobblin.service.modules.orchestration.DagManager; import org.apache.gobblin.service.modules.orchestration.Orchestrator; -import static org.apache.gobblin.runtime.api.FlowSpec; +import static org.apache.gobblin.runtime.api.FlowSpec.Utils; /** @@ -203,7 +203,7 @@ protected void submitFlowToDagManagerHelper(String flowGroup, String flowName, S URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId); spec = (FlowSpec) flowCatalog.getSpecs(flowUri); // Adds flowExecutionId to config to ensure they are consistent across hosts - FlowSpec updatedSpec = createFlowSpecWithProperty(spec, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId); + FlowSpec updatedSpec = FlowSpec.Utils.createFlowSpecWithProperty(spec, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId); this.orchestrator.submitFlowToDagManager(updatedSpec); } catch (URISyntaxException e) { log.warn("Could not create URI object for flowId {}. Exception {}", flowId, e.getMessage()); From a249afb028c1b83e272a9a5e0c29106a3e6173e6 Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Fri, 3 Nov 2023 11:49:12 -0700 Subject: [PATCH 5/7] Remove unused import --- .../test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java | 2 -- .../gobblin/service/monitoring/DagActionStoreChangeMonitor.java | 2 -- 2 files changed, 4 deletions(-) diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java index 9f7c7da8340..6ef5ca1ccc4 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java @@ -26,8 +26,6 @@ import org.testng.Assert; import org.testng.annotations.Test; -import static org.apache.gobblin.runtime.api.FlowSpec.Utils; - public class FlowSpecTest { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java index c82228864c4..3b72651d0da 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java @@ -44,8 +44,6 @@ import org.apache.gobblin.service.modules.orchestration.DagManager; import org.apache.gobblin.service.modules.orchestration.Orchestrator; -import static org.apache.gobblin.runtime.api.FlowSpec.Utils; - /** * A DagActionStore change monitor that uses {@link DagActionStoreChangeEvent} schema to process Kafka messages received From 07b6facefe20458643cd923f90868da827f0dd11 Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Mon, 6 Nov 2023 13:52:53 -0800 Subject: [PATCH 6/7] Fix compile error --- .../DagActionStoreChangeMonitorTest.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java index cfec155801d..e78bde32915 100644 --- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java @@ -81,8 +81,8 @@ protected void startUp() { } @Override - protected void submitFlowToDagManagerHelper(String flowGroup, String flowName) { - super.submitFlowToDagManagerHelper(flowGroup, flowName); + protected void submitFlowToDagManagerHelper(String flowGroup, String flowName, String flowExecutionId) { + super.submitFlowToDagManagerHelper(flowGroup, flowName, flowExecutionId); } } @@ -111,7 +111,7 @@ public void testProcessMessageWithHeartbeatAndNullDagAction() { mockDagActionStoreChangeMonitor.processMessage(consumerRecord); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong()); - verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString()); + verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString(), anyString()); } /** @@ -125,7 +125,7 @@ public void testProcessMessageWithHeartbeatAndFlowInfo() { mockDagActionStoreChangeMonitor.processMessage(consumerRecord); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong()); - verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString()); + verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString(), anyString()); } /** @@ -138,7 +138,7 @@ public void testProcessMessageWithInsertLaunchType() { mockDagActionStoreChangeMonitor.processMessage(consumerRecord); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong()); - verify(mockDagActionStoreChangeMonitor, times(1)).submitFlowToDagManagerHelper(anyString(), anyString()); + verify(mockDagActionStoreChangeMonitor, times(1)).submitFlowToDagManagerHelper(anyString(), anyString(), anyString()); } /** @@ -152,7 +152,7 @@ public void testProcessMessageWithInsertResumeType() { mockDagActionStoreChangeMonitor.processMessage(consumerRecord); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(1)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong()); - verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString()); + verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString(), anyString()); } /** @@ -165,7 +165,7 @@ public void testProcessMessageWithInsertKillType() { mockDagActionStoreChangeMonitor.processMessage(consumerRecord); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(1)).handleKillFlowRequest(anyString(), anyString(), anyLong()); - 
verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString()); + verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString(), anyString()); } /** @@ -179,7 +179,7 @@ public void testProcessMessageWithUpdate() { mockDagActionStoreChangeMonitor.processMessage(consumerRecord); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong()); - verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString()); + verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString(), anyString()); } /** @@ -192,7 +192,7 @@ public void testProcessMessageWithDelete() { mockDagActionStoreChangeMonitor.processMessage(consumerRecord); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong()); - verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString()); + verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString(), anyString()); } /** From e6b87d00a0d97d2b5c2da89478a76f1b712e6a56 Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Tue, 7 Nov 2023 12:00:13 -0800 Subject: [PATCH 7/7] Fix unit tests --- .../DagActionStoreChangeMonitorTest.java | 84 ++++++++++--------- .../DagActionStoreChangeMonitor.java | 2 + 2 files changed, 45 insertions(+), 41 deletions(-) diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java index e78bde32915..4025cf0dbc1 100644 --- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java @@ -20,11 +20,13 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; +import java.net.URI; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; import org.apache.gobblin.kafka.client.Kafka09ConsumerClient; import org.apache.gobblin.runtime.api.DagActionStore; +import org.apache.gobblin.runtime.api.SpecNotFoundException; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; import org.apache.gobblin.service.modules.orchestration.DagManager; import org.apache.gobblin.service.modules.orchestration.Orchestrator; @@ -34,7 +36,6 @@ import org.apache.gobblin.service.monitoring.GenericStoreChangeEvent; import org.apache.gobblin.service.monitoring.OperationType; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.mockito.Mockito.*; @@ -58,8 +59,8 @@ public class DagActionStoreChangeMonitorTest { private int txidCounter = 0; /** - * Note: The class methods accessed in the tests below are overriden to allow access to these package-protected - * methods for 
testing + * Note: The class methods are wrapped in a test specific method because the original methods are package protected + * and cannot be accessed by this class. */ class MockDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor { @@ -69,21 +70,14 @@ public MockDagActionStoreChangeMonitor(String topic, Config config, int numThrea mock(Orchestrator.class), isMultiActiveSchedulerEnabled); } - @Override - protected void processMessage(DecodeableKafkaRecord record) { + protected void processMessageForTest(DecodeableKafkaRecord record) { super.processMessage(record); } - @Override - protected void startUp() { + protected void startUpForTest() { super.startUp(); } - - @Override - protected void submitFlowToDagManagerHelper(String flowGroup, String flowName, String flowExecutionId) { - super.submitFlowToDagManagerHelper(flowGroup, flowName, flowExecutionId); - } } MockDagActionStoreChangeMonitor createMockDagActionStoreChangeMonitor() { @@ -94,10 +88,10 @@ MockDagActionStoreChangeMonitor createMockDagActionStoreChangeMonitor() { return new MockDagActionStoreChangeMonitor("dummyTopic", config, 5, true); } - @BeforeClass + // Called at start of every test so the count of each method being called is reset to 0 public void setup() { mockDagActionStoreChangeMonitor = createMockDagActionStoreChangeMonitor(); - mockDagActionStoreChangeMonitor.startUp(); + mockDagActionStoreChangeMonitor.startUpForTest(); } /** @@ -105,94 +99,102 @@ public void setup() { * filtered out since it's a heartbeat type so no methods are called. */ @Test - public void testProcessMessageWithHeartbeatAndNullDagAction() { + public void testProcessMessageWithHeartbeatAndNullDagAction() throws SpecNotFoundException { + setup(); Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", "", null); - mockDagActionStoreChangeMonitor.processMessage(consumerRecord); + mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong()); - verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString(), anyString()); + // Note: Indirectly verifies submitFlowToDagManagerHelper is called which is not a mocked object so cannot be asserted + verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), times(0)).getSpecs(any(URI.class)); } /** * Ensure a HEARTBEAT type message with non-empty flow information is filtered out since it's a heartbeat type so no * methods are called. 
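+   * FlowCatalog#getSpecs is verified instead of submitFlowToDagManagerHelper, which is not a mock and therefore
+   * cannot be asserted on directly.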
*/ - @Test - public void testProcessMessageWithHeartbeatAndFlowInfo() { + @Test (dependsOnMethods = "testProcessMessageWithHeartbeatAndNullDagAction") + public void testProcessMessageWithHeartbeatAndFlowInfo() throws SpecNotFoundException { + setup(); Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.RESUME); - mockDagActionStoreChangeMonitor.processMessage(consumerRecord); + mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong()); - verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString(), anyString()); + verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), times(0)).getSpecs(any(URI.class)); } /** * Tests process message with an INSERT type message of a `launch` action */ - @Test - public void testProcessMessageWithInsertLaunchType() { + @Test (dependsOnMethods = "testProcessMessageWithHeartbeatAndFlowInfo") + public void testProcessMessageWithInsertLaunchType() throws SpecNotFoundException { + setup(); Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH); - mockDagActionStoreChangeMonitor.processMessage(consumerRecord); + mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong()); - verify(mockDagActionStoreChangeMonitor, times(1)).submitFlowToDagManagerHelper(anyString(), anyString(), anyString()); + verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), times(1)).getSpecs(any(URI.class)); } /** * Tests process message with an INSERT type message of a `resume` action. It re-uses the same flow information however * since it is a different tid used every time it will be considered unique and submit a kill request. */ - @Test - public void testProcessMessageWithInsertResumeType() { + @Test (dependsOnMethods = "testProcessMessageWithInsertLaunchType") + public void testProcessMessageWithInsertResumeType() throws SpecNotFoundException { + setup(); Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.RESUME); - mockDagActionStoreChangeMonitor.processMessage(consumerRecord); + mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(1)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong()); - verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString(), anyString()); + verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), times(0)).getSpecs(any(URI.class)); } /** * Tests process message with an INSERT type message of a `kill` action. Similar to `testProcessMessageWithInsertResumeType`. 
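+   * Expects exactly one DagManager#handleKillFlowRequest call and no resume or flow submission handling.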
*/ - @Test - public void testProcessMessageWithInsertKillType() { + @Test (dependsOnMethods = "testProcessMessageWithInsertResumeType") + public void testProcessMessageWithInsertKillType() throws SpecNotFoundException { + setup(); Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.KILL); - mockDagActionStoreChangeMonitor.processMessage(consumerRecord); + mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(1)).handleKillFlowRequest(anyString(), anyString(), anyLong()); - verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString(), anyString()); + verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), times(0)).getSpecs(any(URI.class)); } /** * Tests process message with an UPDATE type message of the 'launch' action above. Although processMessage does not * expect this message type it should handle it gracefully */ - @Test - public void testProcessMessageWithUpdate() { + @Test (dependsOnMethods = "testProcessMessageWithInsertKillType") + public void testProcessMessageWithUpdate() throws SpecNotFoundException { + setup(); Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = wrapDagActionStoreChangeEvent(OperationType.UPDATE, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH); - mockDagActionStoreChangeMonitor.processMessage(consumerRecord); + mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong()); - verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString(), anyString()); + verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), times(0)).getSpecs(any(URI.class)); } /** * Tests process message with a DELETE type message which should be ignored regardless of the flow information. 
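+   * No DagManager or FlowCatalog interactions are expected.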
*/ - @Test - public void testProcessMessageWithDelete() { + @Test (dependsOnMethods = "testProcessMessageWithUpdate") + public void testProcessMessageWithDelete() throws SpecNotFoundException { + setup(); Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = wrapDagActionStoreChangeEvent(OperationType.DELETE, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH); - mockDagActionStoreChangeMonitor.processMessage(consumerRecord); + mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong()); - verify(mockDagActionStoreChangeMonitor, times(0)).submitFlowToDagManagerHelper(anyString(), anyString(), anyString()); + verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), times(0)).getSpecs(any(URI.class)); } /** diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java index 3b72651d0da..bbe1e47ab20 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java @@ -84,6 +84,8 @@ public String load(String key) throws Exception { protected DagManager dagManager; protected Orchestrator orchestrator; protected boolean isMultiActiveSchedulerEnabled; + @Getter + @VisibleForTesting protected FlowCatalog flowCatalog; // Note that the topic is an empty string (rather than null to avoid NPE) because this monitor relies on the consumer