diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java index 6e7b9b082f6..fa0b2f46ca9 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java @@ -95,7 +95,7 @@ public FlowConfig getFlowConfig(FlowId flowId) throws FlowConfigLoggedException try { URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId); - FlowSpec spec = (FlowSpec) flowCatalog.getSpecs(flowUri); + FlowSpec spec = flowCatalog.getSpecs(flowUri); return FlowSpec.Utils.toFlowConfig(spec); } catch (URISyntaxException e) { throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowId.getFlowName(), e); diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java index 4b77294e67e..a74b9fd0e34 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java @@ -26,6 +26,7 @@ import com.linkedin.data.template.StringMap; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.net.URI; import java.net.URISyntaxException; @@ -35,6 +36,7 @@ import java.util.List; import java.util.Properties; import java.util.Set; +import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; @@ -56,6 +58,7 @@ * */ @Alpha +@AllArgsConstructor @Data @EqualsAndHashCode(exclude={"compilationErrors"}) @SuppressFBWarnings(value="SE_BAD_FIELD", @@ -75,13 +78,23 @@ public class FlowSpec implements Configurable, Spec { /** Human-readable description of the flow spec */ final String description; - /** Flow config as a typesafe config object */ - final Config config; + /* Note that since getConfig() and getConfigAsProperties() are independent accessors, `volatile` doesn't ensure a + * consistent view between them. If one wants to access both, they should briefly synchronize on the FlowSpec object + * while obtaining them: + * FlowSpec fs = ... + * synchronized (fs) { + * fs.getConfig() + * fs.getConfigAsProperties() + * } + */ + + /** Flow config as a typesafe config object which can be replaced */ + private volatile Config config; /** Flow config as a properties collection for backwards compatibility */ // Note that this property is not strictly necessary as it can be generated from the typesafe // config. We use it as a cache until typesafe config is more widely adopted in Gobblin. - final Properties configAsProperties; + private volatile Properties configAsProperties; /** URI of {@link org.apache.gobblin.runtime.api.JobTemplate} to use. */ final Optional> templateURIs; @@ -125,6 +138,24 @@ public static FlowSpec.Builder builder(URI catalogURI, Properties flowProps) { } } + /** + * Add property to Config (also propagated to the Properties field). These two fields should only be modified through + * this method to prevent inconsistency between them. + * Note that when the property is being added, config and configAsProperties can have different values, but they will + * be consistent by the time method returns. + * @param key + * @param value + */ + public synchronized void addProperty(String key, String value) { + this.config = config.withValue(key, ConfigValueFactory.fromAnyRef(value)); + /* Make sure configAsProperties has been initialized. If it's just initialized, setting the property will be a + redundant operation. However, if it already existed we need to update/add the key-value pair. + */ + this.getConfigAsProperties(); + this.configAsProperties.setProperty(key, value); + + } + public void addCompilationError(String src, String dst, String errorMessage, int numberOfHops) { this.compilationErrors.add(new CompilationError(getConfig(), src, dst, errorMessage, numberOfHops)); } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java index 0fe6f2583e1..67c49ff3667 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java @@ -296,11 +296,11 @@ public boolean exists(URI uri) { } @Override - public Spec getSpecs(URI uri) throws SpecNotFoundException { + public FlowSpec getSpecs(URI uri) throws SpecNotFoundException { try { - return specStore.getSpec(uri); + return (FlowSpec) specStore.getSpec(uri); } catch (IOException e) { - throw new RuntimeException("Cannot retrieve Spec from Spec store for URI: " + uri, e); + throw new RuntimeException("Cannot retrieve FlowSpec from FlowSpec store for URI: " + uri, e); } } diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java new file mode 100644 index 00000000000..539f42d53f5 --- /dev/null +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.api; + +import com.typesafe.config.Config; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Properties; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.service.FlowId; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class FlowSpecTest { + + /** + * Tests that the addProperty() function to ensure the new flowSpec returned has the original properties and updated + * ones + * @throws URISyntaxException + */ + @Test + public void testAddProperty() throws URISyntaxException { + String flowGroup = "myGroup"; + String flowName = "myName"; + String flowExecutionId = "1234"; + FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName); + URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId); + + // Create properties to be used as config + Properties properties = new Properties(); + properties.setProperty(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup); + properties.setProperty(ConfigurationKeys.FLOW_NAME_KEY, flowName); + properties.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, "true"); + + FlowSpec flowSpec = FlowSpec.builder(flowUri).withConfigAsProperties(properties).build(); + flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId); + + Properties updatedProperties = flowSpec.getConfigAsProperties(); + Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowExecutionId); + Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_GROUP_KEY), flowGroup); + Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_NAME_KEY), flowName); + Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY), "true"); + + Config updatedConfig = flowSpec.getConfig(); + Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowExecutionId); + Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY), flowGroup); + Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_NAME_KEY), flowName); + Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY), "true"); + } +} \ No newline at end of file diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index 85c8248fb75..ad99564581d 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -255,7 +255,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil } else { TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED); Optional> compiledDagOptional = - this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig, spec, flowGroup, + this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup, flowName); if (!compiledDagOptional.isPresent()) { @@ -264,7 +264,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil } Dag compiledDag = compiledDagOptional.get(); if (compiledDag.isEmpty()) { - FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter, spec, flowMetadata); + FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata); Instrumented.markMeter(this.flowOrchestrationFailedMeter); sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec, SharedFlowMetricsSingleton.CompiledState.FAILED); @@ -288,10 +288,9 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS); } - public void submitFlowToDagManager(FlowSpec flowSpec, DagActionStore.DagAction flowAction) throws IOException, InterruptedException { + public void submitFlowToDagManager(FlowSpec flowSpec) throws IOException, InterruptedException { Optional> optionalJobExecutionPlanDag = - this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec, - Optional.of(flowAction.getFlowExecutionId())); + this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec); if (optionalJobExecutionPlanDag.isPresent()) { submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get()); } else { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index cd1bd421f7d..1253858ea2c 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -642,7 +642,7 @@ private void unscheduleSpec(URI specURI, String specVersion) throws JobException this.lastUpdatedTimeForFlowSpec.remove(specURI.toString()); unscheduleJob(specURI.toString()); try { - FlowSpec spec = (FlowSpec) this.flowCatalog.get().getSpecs(specURI); + FlowSpec spec = this.flowCatalog.get().getSpecs(specURI); Properties properties = spec.getConfigAsProperties(); _log.info(jobSchedulerTracePrefixBuilder(properties) + "Unscheduled Spec"); } catch (SpecNotFoundException e) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java index 674ff0024ca..cda69ebd9cf 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java @@ -28,7 +28,6 @@ import org.apache.gobblin.metrics.event.EventSubmitter; import org.apache.gobblin.metrics.event.TimingEvent; import org.apache.gobblin.runtime.api.FlowSpec; -import org.apache.gobblin.runtime.api.Spec; import org.apache.gobblin.service.modules.flow.SpecCompiler; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.orchestration.TimingEventUtils; @@ -65,13 +64,10 @@ public final class FlowCompilationValidationHelper { * flowspec can be compiled. If the pre-conditions hold, then a JobExecutionPlan is constructed and returned to the * caller. * @param flowSpec - * @param optionalFlowExecutionId for scheduled (non-ad-hoc) flows, to pass the ID "laundered" via the DB; - * see: {@link org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter javadoc section titled - * `Database event_timestamp laundering`} * @return jobExecutionPlan dag if one can be constructed for the given flowSpec */ - public Optional> createExecutionPlanIfValid(FlowSpec flowSpec, - Optional optionalFlowExecutionId) throws IOException, InterruptedException { + public Optional> createExecutionPlanIfValid(FlowSpec flowSpec) + throws IOException, InterruptedException { Config flowConfig = flowSpec.getConfig(); String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY); String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY); @@ -93,7 +89,7 @@ public Optional> createExecutionPlanIfValid(FlowSpec flowS return Optional.absent(); } - addFlowExecutionIdIfAbsent(flowMetadata, optionalFlowExecutionId, jobExecutionPlanDagOptional.get()); + addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDagOptional.get()); flowCompilationTimer.stop(flowMetadata); return jobExecutionPlanDagOptional; } @@ -101,25 +97,25 @@ public Optional> createExecutionPlanIfValid(FlowSpec flowS /** * Checks if flowSpec disallows concurrent executions, and if so then checks if another instance of the flow is * already running and emits a FLOW FAILED event. Otherwise, this check passes. - * @return Optional> if caller allowed to execute flow and compile spec, else absent Optional + * @return Optional> if caller allowed to execute flow and compile flowSpec, else Optional.absent() * @throws IOException */ - public Optional> validateAndHandleConcurrentExecution(Config flowConfig, Spec spec, + public Optional> validateAndHandleConcurrentExecution(Config flowConfig, FlowSpec flowSpec, String flowGroup, String flowName) throws IOException { boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig, ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, isFlowConcurrencyEnabled); - Dag jobExecutionPlanDag = specCompiler.compileFlow(spec); + Dag jobExecutionPlanDag = specCompiler.compileFlow(flowSpec); if (isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, allowConcurrentExecution)) { return Optional.fromNullable(jobExecutionPlanDag); } else { log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since " + "concurrent executions are disabled for this flow.", flowGroup, flowName); - sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec, + sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(flowSpec, SharedFlowMetricsSingleton.CompiledState.SKIPPED); Instrumented.markMeter(sharedFlowMetricsSingleton.getSkippedFlowsMeter()); - if (!isScheduledFlow((FlowSpec) spec)) { + if (!flowSpec.isScheduled()) { // For ad-hoc flow, we might already increase quota, we need to decrease here for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) { quotaManager.releaseQuota(dagNode); @@ -127,9 +123,9 @@ public Optional> validateAndHandleConcurrentExecution(Conf } // Send FLOW_FAILED event - Map flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec); + Map flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec); flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because another instance is running and concurrent " - + "executions are disabled. Set flow.allowConcurrentExecution to true in the flow spec to change this behaviour."); + + "executions are disabled. Set flow.allowConcurrentExecution to true in the flowSpec to change this behaviour."); new TimingEvent(eventSubmitter, TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata); return Optional.absent(); } @@ -150,11 +146,11 @@ private boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, St /** * Abstraction used to populate the message of and emit a FlowCompileFailed event for the Orchestrator. - * @param spec + * @param flowSpec * @param flowMetadata */ - public static void populateFlowCompilationFailedEventMessage(EventSubmitter eventSubmitter, Spec spec, - Map flowMetadata) { + public static void populateFlowCompilationFailedEventMessage(EventSubmitter eventSubmitter, + FlowSpec flowSpec, Map flowMetadata) { // For scheduled flows, we do not insert the flowExecutionId into the FlowSpec. As a result, if the flow // compilation fails (i.e. we are unable to find a path), the metadata will not have flowExecutionId. // In this case, the current time is used as the flow executionId. @@ -162,8 +158,8 @@ public static void populateFlowCompilationFailedEventMessage(EventSubmitter even Long.toString(System.currentTimeMillis())); String message = "Flow was not compiled successfully."; - if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) { - message = message + " Compilation errors encountered: " + ((FlowSpec) spec).getCompilationErrors(); + if (!flowSpec.getCompilationErrors().isEmpty()) { + message = message + " Compilation errors encountered: " + flowSpec.getCompilationErrors(); } flowMetadata.put(TimingEvent.METADATA_MESSAGE, message); @@ -171,32 +167,13 @@ public static void populateFlowCompilationFailedEventMessage(EventSubmitter even } /** - * If it is a scheduled flow (which does not have flowExecutionId in the FlowSpec) and the flow compilation is - * successful, retrieve flowExecutionId from the JobSpec. + * If it is a scheduled flow run without multi-active scheduler configuration (where the FlowSpec does not have a + * flowExecutionId) and the flow compilation is successful, retrieve flowExecutionId from the JobSpec. */ public static void addFlowExecutionIdIfAbsent(Map flowMetadata, Dag jobExecutionPlanDag) { - addFlowExecutionIdIfAbsent(flowMetadata, Optional.absent(), jobExecutionPlanDag); - } - - /** - * If it is a scheduled flow (which does not have flowExecutionId in the FlowSpec) and the flow compilation is - * successful, add a flowExecutionId using the optional parameter if it exists otherwise retrieve it from the JobSpec. - */ - public static void addFlowExecutionIdIfAbsent(Map flowMetadata, - Optional optionalFlowExecutionId, Dag jobExecutionPlanDag) { - if (optionalFlowExecutionId.isPresent()) { - flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, optionalFlowExecutionId.get()); - } flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty( ConfigurationKeys.FLOW_EXECUTION_ID_KEY)); } - - /** - * Return true if the spec contains a schedule, false otherwise. - */ - public static boolean isScheduledFlow(FlowSpec spec) { - return spec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY); - } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java index 647bb808929..a6f11e08f17 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java @@ -34,6 +34,7 @@ import lombok.Data; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; import org.apache.gobblin.metrics.ContextAwareGauge; import org.apache.gobblin.metrics.ContextAwareMeter; @@ -275,9 +276,13 @@ protected void submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction, FlowSpec spec = null; try { URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId); - spec = (FlowSpec) flowCatalog.getSpecs(flowUri); - // Pass flowExecutionId to DagManager to be used for scheduled flows that do not already contain a flowExecutionId - this.orchestrator.submitFlowToDagManager(spec, dagAction); + spec = flowCatalog.getSpecs(flowUri); + /* Update the spec to contain the flowExecutionId from the dagAction for scheduled flows that do not already + contain a flowExecutionId. Adhoc flowSpecs are already consistent with the dagAction so there's no effective + change. It's crucial to adopt the consensus flowExecutionId here to prevent creating a new one during compilation. + */ + spec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, dagAction.getFlowExecutionId()); + this.orchestrator.submitFlowToDagManager(spec); } catch (URISyntaxException e) { log.warn("Could not create URI object for flowId {}. Exception {}", flowId, e.getMessage()); launchSubmissionMetricProxy.markFailure(); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java index 5f778f3f8f0..37ab711f01b 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java @@ -17,7 +17,6 @@ package org.apache.gobblin.service.modules.utils; -import com.google.common.base.Optional; import java.net.URISyntaxException; import java.util.HashMap; import org.apache.gobblin.metrics.event.TimingEvent; @@ -35,7 +34,6 @@ public class FlowCompilationValidationHelperTest { private String dagId = "testDag"; private Long jobSpecFlowExecutionId = 1234L; - private String newFlowExecutionId = "5678"; private String existingFlowExecutionId = "9999"; private Dag jobExecutionPlanDag; @@ -46,14 +44,13 @@ public void setup() throws URISyntaxException { } /* - Tests that addFlowExecutionIdIfAbsent adds flowExecutionId to a flowMetadata object when it is absent, prioritizing - the optional flowExecutionId over the one from the job spec + Tests that addFlowExecutionIdIfAbsent adds the jobSpec flowExecutionId to a flowMetadata object when it is absent */ @Test public void testAddFlowExecutionIdWhenAbsent() { HashMap flowMetadata = new HashMap<>(); - FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, Optional.of(newFlowExecutionId), jobExecutionPlanDag); - Assert.assertEquals(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD), newFlowExecutionId); + FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag); + Assert.assertEquals(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD), String.valueOf(jobSpecFlowExecutionId)); } /* @@ -63,7 +60,7 @@ public void testAddFlowExecutionIdWhenAbsent() { public void testSkipAddingFlowExecutionIdWhenPresent() { HashMap flowMetadata = new HashMap<>(); flowMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, existingFlowExecutionId); - FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, Optional.of(newFlowExecutionId), jobExecutionPlanDag); + FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata,jobExecutionPlanDag); Assert.assertEquals(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD), existingFlowExecutionId); } }