Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1984] Add consensus flowExecutionId to FlowSpec to use for compilation #3857

Merged
merged 7 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public FlowConfig getFlowConfig(FlowId flowId) throws FlowConfigLoggedException

try {
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
FlowSpec spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
FlowSpec spec = flowCatalog.getSpecs(flowUri);
return FlowSpec.Utils.toFlowConfig(spec);
} catch (URISyntaxException e) {
throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowId.getFlowName(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.linkedin.data.template.StringMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.net.URI;
import java.net.URISyntaxException;
Expand All @@ -35,6 +36,7 @@
import java.util.List;
import java.util.Properties;
import java.util.Set;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -56,6 +58,7 @@
*
*/
@Alpha
@AllArgsConstructor
@Data
@EqualsAndHashCode(exclude={"compilationErrors"})
@SuppressFBWarnings(value="SE_BAD_FIELD",
Expand All @@ -75,13 +78,23 @@ public class FlowSpec implements Configurable, Spec {
/** Human-readable description of the flow spec */
final String description;

/** Flow config as a typesafe config object */
final Config config;
/* Note that since getConfig() and getConfigAsProperties() are independent accessors, `volatile` doesn't ensure a
* consistent view between them. If one wants to access both, they should briefly synchronize on the FlowSpec object
* while obtaining them:
* FlowSpec fs = ...
* synchronized (fs) {
Copy link
Contributor

@arjun4084346 arjun4084346 Jan 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggesting users to syncronized over the entire FlowSpec does not look good.
Also, it looks a unnecessary usage of calling getConfig and getConfigAsProperties and compare both.
Both the fields can momentarily have different value, but that's not a problem as long as only syncronized addProperty method has write access to them.

Copy link
Contributor

@phet phet Jan 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do agree it would be overkill in the general case, wherein the user is only getting one or the other of the two. there's no need for a single accessor to synchronize on the instance, given volatile.

what I believe urmi's suggesting is to lock the object if one wants to access both. this ensures both sides agree. perhaps such clarification could be added to the comment:

In a multi-threaded scenario one should use the following access mechanism:

?

it sounds overly broad as phrased above, given "a MT scenario" applies to all our code everywhere.

the other important thing to mention is that the synchronized block should only be small enough to obtain the two, so it doesn't keep the object locked.

the other alternative would be to add a combo method to this FlowSpec derived class:

/** use for consistency between the two... */
public synchronized Pair<Config, Properties> getConfigAndProperties() {
  // ultimate correctness would be to return a defensive copy of `Properties`
}

would this perhaps be clearer, to provide a ready-made capability, rather than a long comment w/ usage advice?

Copy link
Contributor

@phet phet Jan 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in fact, the new potential to mutate via addProperty means that getProperties() should make a defensive copy to be absolutely correct.

as a reflection, there really is a lot to worry about once classes become mutable! although I see why it may be justified here, this complexity/headache is the reason I personally strive so hard to build immutable and state-less abstractions.

(in the model, things can still change, but when they do, a new instance of a successor instance is created and the original remains safely unchanged.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the comment. I can add the combo method but it won't require access only through the combo method so I'd have to leave the comment in anyway.

* fs.getConfig()
* fs.getConfigAsProperties()
* }
*/

/** Flow config as a typesafe config object which can be replaced */
private volatile Config config;

/** Flow config as a properties collection for backwards compatibility */
// Note that this property is not strictly necessary as it can be generated from the typesafe
// config. We use it as a cache until typesafe config is more widely adopted in Gobblin.
final Properties configAsProperties;
private volatile Properties configAsProperties;

/** URI of {@link org.apache.gobblin.runtime.api.JobTemplate} to use. */
final Optional<Set<URI>> templateURIs;
Expand Down Expand Up @@ -125,6 +138,24 @@ public static FlowSpec.Builder builder(URI catalogURI, Properties flowProps) {
}
}

/**
* Add property to Config (also propagated to the Properties field). These two fields should only be modified through
* this method to prevent inconsistency between them.
* Note that when the property is being added, config and configAsProperties can have different values, but they will
* be consistent by the time method returns.
* @param key
* @param value
*/
public synchronized void addProperty(String key, String value) {
this.config = config.withValue(key, ConfigValueFactory.fromAnyRef(value));
/* Make sure configAsProperties has been initialized. If it's just initialized, setting the property will be a
redundant operation. However, if it already existed we need to update/add the key-value pair.
*/
this.getConfigAsProperties();
this.configAsProperties.setProperty(key, value);

}

public void addCompilationError(String src, String dst, String errorMessage, int numberOfHops) {
this.compilationErrors.add(new CompilationError(getConfig(), src, dst, errorMessage, numberOfHops));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,11 +296,11 @@ public boolean exists(URI uri) {
}

@Override
public Spec getSpecs(URI uri) throws SpecNotFoundException {
public FlowSpec getSpecs(URI uri) throws SpecNotFoundException {
try {
return specStore.getSpec(uri);
return (FlowSpec) specStore.getSpec(uri);
} catch (IOException e) {
throw new RuntimeException("Cannot retrieve Spec from Spec store for URI: " + uri, e);
throw new RuntimeException("Cannot retrieve FlowSpec from FlowSpec store for URI: " + uri, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.runtime.api;

import com.typesafe.config.Config;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Properties;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.service.FlowId;
import org.testng.Assert;
import org.testng.annotations.Test;


public class FlowSpecTest {

/**
* Tests that the addProperty() function to ensure the new flowSpec returned has the original properties and updated
* ones
* @throws URISyntaxException
*/
@Test
public void testAddProperty() throws URISyntaxException {
String flowGroup = "myGroup";
String flowName = "myName";
String flowExecutionId = "1234";
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);

// Create properties to be used as config
Properties properties = new Properties();
properties.setProperty(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup);
properties.setProperty(ConfigurationKeys.FLOW_NAME_KEY, flowName);
properties.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, "true");

FlowSpec flowSpec = FlowSpec.builder(flowUri).withConfigAsProperties(properties).build();
flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);

Properties updatedProperties = flowSpec.getConfigAsProperties();
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowExecutionId);
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_GROUP_KEY), flowGroup);
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_NAME_KEY), flowName);
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY), "true");

Config updatedConfig = flowSpec.getConfig();
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowExecutionId);
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY), flowGroup);
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_NAME_KEY), flowName);
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY), "true");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
} else {
TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
Optional<Dag<JobExecutionPlan>> compiledDagOptional =
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig, spec, flowGroup,
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup,
flowName);

if (!compiledDagOptional.isPresent()) {
Expand All @@ -264,7 +264,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
}
Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
if (compiledDag.isEmpty()) {
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter, spec, flowMetadata);
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata);
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
SharedFlowMetricsSingleton.CompiledState.FAILED);
Expand All @@ -288,10 +288,9 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
}

public void submitFlowToDagManager(FlowSpec flowSpec, DagActionStore.DagAction flowAction) throws IOException, InterruptedException {
public void submitFlowToDagManager(FlowSpec flowSpec) throws IOException, InterruptedException {
Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec,
Optional.of(flowAction.getFlowExecutionId()));
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec);
if (optionalJobExecutionPlanDag.isPresent()) {
submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ private void unscheduleSpec(URI specURI, String specVersion) throws JobException
this.lastUpdatedTimeForFlowSpec.remove(specURI.toString());
unscheduleJob(specURI.toString());
try {
FlowSpec spec = (FlowSpec) this.flowCatalog.get().getSpecs(specURI);
FlowSpec spec = this.flowCatalog.get().getSpecs(specURI);
Properties properties = spec.getConfigAsProperties();
_log.info(jobSchedulerTracePrefixBuilder(properties) + "Unscheduled Spec");
} catch (SpecNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
Expand Down Expand Up @@ -65,13 +64,10 @@ public final class FlowCompilationValidationHelper {
* flowspec can be compiled. If the pre-conditions hold, then a JobExecutionPlan is constructed and returned to the
* caller.
* @param flowSpec
* @param optionalFlowExecutionId for scheduled (non-ad-hoc) flows, to pass the ID "laundered" via the DB;
* see: {@link org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter javadoc section titled
* `Database event_timestamp laundering`}
* @return jobExecutionPlan dag if one can be constructed for the given flowSpec
*/
public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec flowSpec,
Optional<String> optionalFlowExecutionId) throws IOException, InterruptedException {
public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec flowSpec)
throws IOException, InterruptedException {
Config flowConfig = flowSpec.getConfig();
String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
Expand All @@ -93,43 +89,43 @@ public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec flowS
return Optional.absent();
}

addFlowExecutionIdIfAbsent(flowMetadata, optionalFlowExecutionId, jobExecutionPlanDagOptional.get());
addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDagOptional.get());
flowCompilationTimer.stop(flowMetadata);
return jobExecutionPlanDagOptional;
}

/**
* Checks if flowSpec disallows concurrent executions, and if so then checks if another instance of the flow is
* already running and emits a FLOW FAILED event. Otherwise, this check passes.
* @return Optional<Dag<JobExecutionPlan>> if caller allowed to execute flow and compile spec, else absent Optional
* @return Optional<Dag<JobExecutionPlan>> if caller allowed to execute flow and compile flowSpec, else Optional.absent()
* @throws IOException
*/
public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Config flowConfig, Spec spec,
public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Config flowConfig, FlowSpec flowSpec,
String flowGroup, String flowName) throws IOException {
boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, isFlowConcurrencyEnabled);

Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec);

if (isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, allowConcurrentExecution)) {
return Optional.fromNullable(jobExecutionPlanDag);
} else {
log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
+ "concurrent executions are disabled for this flow.", flowGroup, flowName);
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(flowSpec,
SharedFlowMetricsSingleton.CompiledState.SKIPPED);
Instrumented.markMeter(sharedFlowMetricsSingleton.getSkippedFlowsMeter());
if (!isScheduledFlow((FlowSpec) spec)) {
if (!flowSpec.isScheduled()) {
// For ad-hoc flow, we might already increase quota, we need to decrease here
for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
quotaManager.releaseQuota(dagNode);
}
}

// Send FLOW_FAILED event
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because another instance is running and concurrent "
+ "executions are disabled. Set flow.allowConcurrentExecution to true in the flow spec to change this behaviour.");
+ "executions are disabled. Set flow.allowConcurrentExecution to true in the flowSpec to change this behaviour.");
new TimingEvent(eventSubmitter, TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
return Optional.absent();
}
Expand All @@ -150,53 +146,34 @@ private boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, St

/**
* Abstraction used to populate the message of and emit a FlowCompileFailed event for the Orchestrator.
* @param spec
* @param flowSpec
* @param flowMetadata
*/
public static void populateFlowCompilationFailedEventMessage(EventSubmitter eventSubmitter, Spec spec,
Map<String, String> flowMetadata) {
public static void populateFlowCompilationFailedEventMessage(EventSubmitter eventSubmitter,
FlowSpec flowSpec, Map<String, String> flowMetadata) {
// For scheduled flows, we do not insert the flowExecutionId into the FlowSpec. As a result, if the flow
// compilation fails (i.e. we are unable to find a path), the metadata will not have flowExecutionId.
// In this case, the current time is used as the flow executionId.
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
Long.toString(System.currentTimeMillis()));

String message = "Flow was not compiled successfully.";
if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
message = message + " Compilation errors encountered: " + ((FlowSpec) spec).getCompilationErrors();
if (!flowSpec.getCompilationErrors().isEmpty()) {
message = message + " Compilation errors encountered: " + flowSpec.getCompilationErrors();
}
flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);

new TimingEvent(eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILE_FAILED).stop(flowMetadata);
}

/**
* If it is a scheduled flow (which does not have flowExecutionId in the FlowSpec) and the flow compilation is
* successful, retrieve flowExecutionId from the JobSpec.
* If it is a scheduled flow run without multi-active scheduler configuration (where the FlowSpec does not have a
* flowExecutionId) and the flow compilation is successful, retrieve flowExecutionId from the JobSpec.
*/
public static void addFlowExecutionIdIfAbsent(Map<String,String> flowMetadata,
Dag<JobExecutionPlan> jobExecutionPlanDag) {
addFlowExecutionIdIfAbsent(flowMetadata, Optional.absent(), jobExecutionPlanDag);
}

/**
* If it is a scheduled flow (which does not have flowExecutionId in the FlowSpec) and the flow compilation is
* successful, add a flowExecutionId using the optional parameter if it exists otherwise retrieve it from the JobSpec.
*/
public static void addFlowExecutionIdIfAbsent(Map<String,String> flowMetadata,
Optional<String> optionalFlowExecutionId, Dag<JobExecutionPlan> jobExecutionPlanDag) {
if (optionalFlowExecutionId.isPresent()) {
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, optionalFlowExecutionId.get());
}
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(
ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
}

/**
* Return true if the spec contains a schedule, false otherwise.
*/
public static boolean isScheduledFlow(FlowSpec spec) {
return spec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
Expand Down Expand Up @@ -275,9 +276,13 @@ protected void submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction,
FlowSpec spec = null;
try {
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
// Pass flowExecutionId to DagManager to be used for scheduled flows that do not already contain a flowExecutionId
this.orchestrator.submitFlowToDagManager(spec, dagAction);
spec = flowCatalog.getSpecs(flowUri);
/* Update the spec to contain the flowExecutionId from the dagAction for scheduled flows that do not already
contain a flowExecutionId. Adhoc flowSpecs are already consistent with the dagAction so there's no effective
change. It's crucial to adopt the consensus flowExecutionId here to prevent creating a new one during compilation.
*/
spec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, dagAction.getFlowExecutionId());
this.orchestrator.submitFlowToDagManager(spec);
} catch (URISyntaxException e) {
log.warn("Could not create URI object for flowId {}. Exception {}", flowId, e.getMessage());
launchSubmissionMetricProxy.markFailure();
Expand Down
Loading
Loading