Skip to content

Commit

Permalink
Add commit step to temporal work processing workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
Will-Lo committed Nov 15, 2023
1 parent dd17bed commit 1dd0fa1
Show file tree
Hide file tree
Showing 13 changed files with 428 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition,

if (hasCopyableFiles(datasetWorkUnitStates)) {
// Targets are always absolute, so we start moving from root (will skip any existing directories).
HadoopUtils.renameRecursively(this.fs, datasetWriterOutputPath, new Path("/"));
HadoopUtils.renameRecursively(this.fs, datasetWriterOutputPath, new Path("/"), true);
} else {
log.info(String.format("[%s] No copyable files in dataset. Proceeding to postpublish steps.", datasetAndPartition.identifier()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.gobblin.runtime;

import com.typesafe.config.Config;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
Expand All @@ -37,12 +38,17 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.commit.CommitSequence;
import org.apache.gobblin.commit.CommitSequenceStore;
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.commit.DeliverySemantics;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.FailureEventBuilder;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
Expand All @@ -61,9 +67,8 @@
* {@link DataPublisher#publish(Collection)}. This class is thread-safe if and only if the implementation of
* {@link DataPublisher} used is also thread-safe.
*/
@RequiredArgsConstructor
@Slf4j
final class SafeDatasetCommit implements Callable<Void> {
public final class SafeDatasetCommit implements Callable<Void> {

private static final Object GLOBAL_LOCK = new Object();

Expand All @@ -76,7 +81,28 @@ final class SafeDatasetCommit implements Callable<Void> {
private final String datasetUrn;
private final JobState.DatasetState datasetState;
private final boolean isMultithreaded;
private final JobContext jobContext;
private final SharedResourcesBroker<GobblinScopeTypes> jobBroker;
private final String jobId;
private final JobCommitPolicy jobCommitPolicy;
private final State jobState;
private final Optional<CommitSequenceStore> commitSequenceStore;
private final DatasetStateStore datasetStateStore;

/**
 * Creates a commit task for a single dataset.
 *
 * <p>The {@link JobContext} is only read here: the job broker, job id, commit policy, job state,
 * commit sequence store and dataset state store are pulled out of it and stored in their own
 * fields. This constructor does not assign the context itself to any field.
 *
 * @param shouldCommitDataInJob stored flag; {@code call()} checks it before entering the publish path
 * @param isJobCancelled stored flag; when set, {@code call()} logs that commit steps still run
 * @param deliverySemantics delivery semantics to store for this commit
 * @param datasetUrn URN of the dataset; used in log messages and when persisting the dataset state
 * @param datasetState state of the dataset that this task finalizes and persists
 * @param isMultithreaded stored flag (its use is outside the visible part of this class)
 * @param jobContext source of the job-level values listed above; dereferenced immediately,
 *        so it must not be {@code null}
 */
public SafeDatasetCommit(
    boolean shouldCommitDataInJob,
    boolean isJobCancelled,
    DeliverySemantics deliverySemantics,
    String datasetUrn,
    JobState.DatasetState datasetState,
    boolean isMultithreaded,
    JobContext jobContext) {
  // Values handed in directly by the caller.
  this.shouldCommitDataInJob = shouldCommitDataInJob;
  this.isJobCancelled = isJobCancelled;
  this.deliverySemantics = deliverySemantics;
  this.datasetUrn = datasetUrn;
  this.datasetState = datasetState;
  this.isMultithreaded = isMultithreaded;

  // Values read from the job context, each kept in its own field.
  this.jobBroker = jobContext.getJobBroker();
  this.jobId = jobContext.getJobId();
  this.jobCommitPolicy = jobContext.getJobCommitPolicy();
  this.jobState = jobContext.getJobState();
  this.commitSequenceStore = jobContext.getCommitSequenceStore();
  this.datasetStateStore = jobContext.getDatasetStateStore();
}

private MetricContext metricContext;

Expand All @@ -92,36 +118,35 @@ public Void call()
finalizeDatasetStateBeforeCommit(this.datasetState);
Class<? extends DataPublisher> dataPublisherClass;
try (Closer closer = Closer.create()) {
dataPublisherClass = JobContext.getJobDataPublisherClass(this.jobContext.getJobState())
dataPublisherClass = JobContext.getJobDataPublisherClass(this.jobState)
.or((Class<? extends DataPublisher>) Class.forName(ConfigurationKeys.DEFAULT_DATA_PUBLISHER_TYPE));
if (!canCommitDataset(datasetState)) {
log.warn(String
.format("Not committing dataset %s of job %s with commit policy %s and state %s", this.datasetUrn,
this.jobContext.getJobId(), this.jobContext.getJobCommitPolicy(), this.datasetState.getState()));
this.jobId, this.jobCommitPolicy, this.datasetState.getState()));
checkForUnpublishedWUHandling(this.datasetUrn, this.datasetState, dataPublisherClass, closer);
throw new RuntimeException(String
.format("Not committing dataset %s of job %s with commit policy %s and state %s", this.datasetUrn,
this.jobContext.getJobId(), this.jobContext.getJobCommitPolicy(), this.datasetState.getState()));
this.jobId, this.jobCommitPolicy, this.datasetState.getState()));
}
} catch (ReflectiveOperationException roe) {
log.error("Failed to instantiate data publisher for dataset {} of job {}.", this.datasetUrn,
this.jobContext.getJobId(), roe);
this.jobId, roe);
throw new RuntimeException(roe);
} finally {
maySubmitFailureEvent(datasetState);
}

if (this.isJobCancelled) {
log.info("Executing commit steps although job is cancelled due to job commit policy: " + this.jobContext
.getJobCommitPolicy());
log.info("Executing commit steps although job is cancelled due to job commit policy: " + this.jobCommitPolicy);
}

Optional<CommitSequence.Builder> commitSequenceBuilder = Optional.absent();
boolean canPersistStates = true;
try (Closer closer = Closer.create()) {
if (this.shouldCommitDataInJob) {
log.info(String.format("Committing dataset %s of job %s with commit policy %s and state %s", this.datasetUrn,
this.jobContext.getJobId(), this.jobContext.getJobCommitPolicy(), this.datasetState.getState()));
this.jobId, this.jobCommitPolicy, this.datasetState.getState()));

ListMultimap<TaskFactoryWrapper, TaskState> taskStatesByFactory = groupByTaskFactory(this.datasetState);

Expand All @@ -137,8 +162,8 @@ public Void call()
DataPublisher publisher;

if (taskFactory == null) {
publisher = DataPublisherFactory.get(dataPublisherClass.getName(), this.jobContext.getJobState(),
this.jobContext.getJobBroker());
publisher = DataPublisherFactory.get(dataPublisherClass.getName(), this.jobState,
this.jobBroker);

// non-threadsafe publishers are not shareable and are not retained in the broker, so register them with
// the closer
Expand Down Expand Up @@ -177,7 +202,7 @@ public Void call()
}
} catch (Throwable throwable) {
log.error(String.format("Failed to commit dataset state for dataset %s of job %s", this.datasetUrn,
this.jobContext.getJobId()), throwable);
this.jobId), throwable);
throw new RuntimeException(throwable);
} finally {
try {
Expand All @@ -193,7 +218,7 @@ public Void call()

} catch (IOException | RuntimeException ioe) {
log.error(String
.format("Failed to persist dataset state for dataset %s of job %s", datasetUrn, this.jobContext.getJobId()),
.format("Failed to persist dataset state for dataset %s of job %s", datasetUrn, this.jobId),
ioe);
throw new RuntimeException(ioe);
}
Expand Down Expand Up @@ -301,9 +326,9 @@ private synchronized void buildAndExecuteCommitSequence(CommitSequence.Builder b
throws IOException {
CommitSequence commitSequence =
builder.addStep(buildDatasetStateCommitStep(datasetUrn, datasetState).get()).build();
this.jobContext.getCommitSequenceStore().get().put(commitSequence.getJobName(), datasetUrn, commitSequence);
this.commitSequenceStore.get().put(commitSequence.getJobName(), datasetUrn, commitSequence);
commitSequence.execute();
this.jobContext.getCommitSequenceStore().get().delete(commitSequence.getJobName(), datasetUrn);
this.commitSequenceStore.get().delete(commitSequence.getJobName(), datasetUrn);
}

/**
Expand All @@ -314,14 +339,14 @@ private synchronized void buildAndExecuteCommitSequence(CommitSequence.Builder b
private void finalizeDatasetStateBeforeCommit(JobState.DatasetState datasetState) {
for (TaskState taskState : datasetState.getTaskStates()) {
if (taskState.getWorkingState() != WorkUnitState.WorkingState.SUCCESSFUL
&& this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
&& this.jobCommitPolicy == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
// The dataset state is set to FAILED if any task failed and COMMIT_ON_FULL_SUCCESS is used
log.info("Failed task state for " + taskState.getWorkunit().getOutputFilePath());
datasetState.setState(JobState.RunningState.FAILED);
datasetState.incrementJobFailures();
Optional<String> taskStateException = taskState.getTaskFailureException();
log.warn("At least one task did not get committed successfully. Setting dataset state to FAILED. "
+ (taskStateException.isPresent() ? taskStateException.get() : "Exception not set."));
return;
}
}

Expand All @@ -346,9 +371,9 @@ private void finalizeDatasetStateBeforeCommit(JobState.DatasetState datasetState
private boolean canCommitDataset(JobState.DatasetState datasetState) {
// Only commit a dataset if 1) COMMIT_ON_PARTIAL_SUCCESS or COMMIT_SUCCESSFUL_TASKS is used, or 2)
// COMMIT_ON_FULL_SUCCESS is used and the dataset state is SUCCESSFUL (all of the tasks of the dataset have succeeded).
return this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS
|| this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS || (
this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS
return this.jobCommitPolicy == JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS
|| this.jobCommitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS || (
this.jobCommitPolicy == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS
&& datasetState.getState() == JobState.RunningState.SUCCESSFUL);
}

Expand All @@ -360,7 +385,7 @@ private Optional<CommitSequence.Builder> generateCommitSequenceBuilder(JobState.
.forName(datasetState
.getProp(ConfigurationKeys.DATA_PUBLISHER_TYPE, ConfigurationKeys.DEFAULT_DATA_PUBLISHER_TYPE));
CommitSequencePublisher publisher = (CommitSequencePublisher) closer
.register(DataPublisher.getInstance(dataPublisherClass, this.jobContext.getJobState()));
.register(DataPublisher.getInstance(dataPublisherClass, this.jobState));
publisher.publish(taskStates);
return publisher.getCommitSequenceBuilder();
} catch (Throwable t) {
Expand All @@ -376,9 +401,9 @@ void checkForUnpublishedWUHandling(String datasetUrn, JobState.DatasetState data
if (UnpublishedHandling.class.isAssignableFrom(dataPublisherClass)) {
// pass in jobstate to retrieve properties
DataPublisher publisher =
closer.register(DataPublisher.getInstance(dataPublisherClass, this.jobContext.getJobState()));
closer.register(DataPublisher.getInstance(dataPublisherClass, this.jobState));
log.info(String.format("Calling publisher to handle unpublished work units for dataset %s of job %s.", datasetUrn,
this.jobContext.getJobId()));
this.jobId));
((UnpublishedHandling) publisher).handleUnpublishedWorkUnits(datasetState.getTaskStatesAsWorkUnitStates());
}
}
Expand All @@ -389,7 +414,7 @@ private void finalizeDatasetState(JobState.DatasetState datasetState, String dat
// Backoff the actual high watermark to the low watermark for each task that has not been committed
if (taskState.getWorkingState() != WorkUnitState.WorkingState.COMMITTED) {
taskState.backoffActualHighWatermark();
if (this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
if (this.jobCommitPolicy == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
// Determine the final dataset state based on the task states (post commit) and the job commit policy.
// 1. If COMMIT_ON_FULL_SUCCESS is used, the processing of the dataset is considered failed if any
// task for the dataset failed to be committed.
Expand All @@ -415,7 +440,7 @@ private void finalizeDatasetState(JobState.DatasetState datasetState, String dat
private void persistDatasetState(String datasetUrn, JobState.DatasetState datasetState)
throws IOException {
log.info("Persisting dataset state for dataset " + datasetUrn);
this.jobContext.getDatasetStateStore().persistDatasetState(datasetUrn, datasetState);
this.datasetStateStore.persistDatasetState(datasetUrn, datasetState);
}

/**
Expand Down
Loading

0 comments on commit 1dd0fa1

Please sign in to comment.