[GOBBLIN-1968] Temporal commit step integration #3829

Merged: 8 commits, merged Dec 15, 2023

Changes from 2 commits
@@ -543,7 +543,7 @@ public static Optional<Class<? extends DataPublisher>> getJobDataPublisherClass(
* Data should be committed by the job if either {@link ConfigurationKeys#JOB_COMMIT_POLICY_KEY} is set to "full",
* or {@link ConfigurationKeys#PUBLISH_DATA_AT_JOB_LEVEL} is set to true.
*/
private static boolean shouldCommitDataInJob(State state) {
public static boolean shouldCommitDataInJob(State state) {
boolean jobCommitPolicyIsFull =
JobCommitPolicy.getCommitPolicy(state.getProperties()) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
boolean publishDataAtJobLevel = state.getPropAsBoolean(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL,
@@ -316,12 +316,11 @@ private void finalizeDatasetStateBeforeCommit(JobState.DatasetState datasetState
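The visibility change from private to public lets the Temporal commit activity (CommitActivityImpl, later in this diff) reuse the policy check instead of duplicating it. A minimal sketch of the check's behavior, assuming only standard ConfigurationKeys constants:

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.runtime.JobContext;

State state = new State();
state.setProp(ConfigurationKeys.JOB_COMMIT_POLICY_KEY, "full");
// COMMIT_ON_FULL_SUCCESS alone is sufficient to force a job-level commit
boolean commitInJob = JobContext.shouldCommitDataInJob(state);  // true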
if (taskState.getWorkingState() != WorkUnitState.WorkingState.SUCCESSFUL
&& this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
// The dataset state is set to FAILED if any task failed and COMMIT_ON_FULL_SUCCESS is used
log.info("Failed task state for " + taskState.getWorkunit().getOutputFilePath());
datasetState.setState(JobState.RunningState.FAILED);
datasetState.incrementJobFailures();
Optional<String> taskStateException = taskState.getTaskFailureException();
log.warn("At least one task did not get committed successfully. Setting dataset state to FAILED. "
+ (taskStateException.isPresent() ? taskStateException.get() : "Exception not set."));
log.warn("Failed task state for {} At least one task did not get committed successfully. Setting dataset state to FAILED. {}" ,
taskState.getWorkunit().getOutputFilePath(), taskStateException.isPresent() ? taskStateException.get() : "Exception not set.");
return;
}
}
@@ -227,6 +227,15 @@ private void collectOutputTaskStates() throws IOException {
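The separate info and warn calls collapse into one parameterized warn, so the failure is reported as a single log record and SLF4J substitutes the placeholders only when the message is actually emitted. An illustrative sketch (logger and variables hypothetical):

// before: eager string concatenation, two records for one failure
log.info("Failed task state for " + outputFilePath);
log.warn("At least one task did not get committed successfully. " + exceptionMessage);

// after: one record, formatted lazily via '{}' placeholders
log.warn("Failed task state for {}. At least one task did not get committed successfully. {}",
    outputFilePath, exceptionMessage);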
this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
}

/**
 * Reads in a {@link FsStateStore} folder used to store task state outputs, and returns a queue of {@link TaskState}s.
 * Task state files are populated by the {@link GobblinMultiTaskAttempt} to record the output of remote concurrent tasks (e.g. MR mappers).
 * @param taskStateStore the state store from which to read serialized {@link TaskState}s
 * @param outputTaskStateDir the directory containing the output task state files
 * @param numDeserializerThreads number of threads used for parallel deserialization
 * @return a concurrent queue of deserialized {@link TaskState}s, or {@code null} if no task state files were found
 * @throws IOException
 */
public static Queue<TaskState> deserializeTaskStatesFromFolder(StateStore<TaskState> taskStateStore, Path outputTaskStateDir,
int numDeserializerThreads) throws IOException {
List<String> taskStateNames = taskStateStore.getTableNames(outputTaskStateDir.getName(), new Predicate<String>() {
@@ -238,28 +247,29 @@ public boolean apply(String input) {
}});

if (taskStateNames == null || taskStateNames.isEmpty()) {
log.info("No output task state files found in " + outputTaskStateDir);
log.warn("No output task state files found in " + outputTaskStateDir);
return null;
}

final Queue<TaskState> taskStateQueue = Queues.newConcurrentLinkedQueue();
try (ParallelRunner stateSerDeRunner = new ParallelRunner(numDeserializerThreads, null)) {
for (final String taskStateName : taskStateNames) {
log.info("Found output task state file " + taskStateName);
log.debug("Found output task state file " + taskStateName);
// Deserialize the TaskState and delete the file
stateSerDeRunner.submitCallable(new Callable<Void>() {
@Override
public Void call() throws Exception {
TaskState taskState = taskStateStore.getAll(outputTaskStateDir.getName(), taskStateName).get(0);
taskStateQueue.add(taskState);
taskStateStore.delete(outputTaskStateDir.getName(), taskStateName);
return null;
}
}, "Deserialize state for " + taskStateName);
}
} catch (IOException ioe) {
log.warn("Could not read all task state files.");
log.error("Could not read all task state files due to", ioe);
}
log.info(String.format("Collected task state of %d completed tasks", taskStateQueue.size()));
log.info(String.format("Collected task state of %d completed tasks in %s", taskStateQueue.size(), outputTaskStateDir));
return taskStateQueue;
}

@@ -20,13 +20,17 @@
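A usage sketch of the newly extracted static helper, mirroring the call site in CommitActivityImpl later in this diff; the FsStateStore construction and directory layout are illustrative assumptions:

import java.util.Queue;
import org.apache.gobblin.metastore.FsStateStore;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.runtime.TaskState;
import org.apache.hadoop.fs.Path;

StateStore<TaskState> taskStateStore =
    new FsStateStore<>(fs, taskStateRootDir, TaskState.class);  // assumed store setup
Queue<TaskState> taskStates = TaskStateCollectorService.deserializeTaskStatesFromFolder(
    taskStateStore, new Path(taskStateRootDir, "output"), /* numDeserializerThreads */ 10);
if (taskStates == null) {
  // no task state files under the folder; nothing to commit
}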
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;


/** Activity for processing/executing a {@link org.apache.gobblin.source.workunit.WorkUnit}, provided by claim-check */
/** Activity for reading the output of work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl} by
* reading in a {@link WUProcessingSpec} to determine the location of the output task states */
@ActivityInterface
public interface CommitActivity {
/**
* Commit the output of the work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl}
* @param workSpec the processing spec locating the output task state directory
* @return number of workunits committed
*/
@ActivityMethod
// CAUTION: void return type won't work, as apparently it may not be the return type for `io.temporal.workflow.Functions.Func1`!
int commit(WUProcessingSpec workSpec);
}
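As with any Temporal @ActivityInterface, an implementation must be registered on a worker polling the workflow's task queue before commit(...) can be dispatched. A minimal wiring sketch, assuming a recent Temporal Java SDK and a hypothetical task-queue name:

import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;

WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
WorkflowClient client = WorkflowClient.newInstance(service);
WorkerFactory factory = WorkerFactory.newInstance(client);
Worker worker = factory.newWorker("gobblin-temporal-task-queue");  // hypothetical queue name
worker.registerActivitiesImplementations(new CommitActivityImpl());
factory.start();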
@@ -19,14 +19,12 @@

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.typesafe.config.ConfigFactory;
import io.temporal.failure.ApplicationFailure;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -41,23 +39,18 @@
import org.apache.gobblin.commit.DeliverySemantics;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.FsStateStore;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.SafeDatasetCommit;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.TaskStateCollectorService;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.SerializationUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -66,13 +59,16 @@
@Slf4j
public class CommitActivityImpl implements CommitActivity {

static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
static int DEFAULT_NUM_COMMIT_THREADS = 1;
@Override
public int commit(WUProcessingSpec workSpec) {
int numDeserializationThreads = 1;
// TODO: Make this configurable
int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
try {
FileSystem fs = Help.loadFileSystem(workSpec);
JobState jobState = Help.loadJobState(workSpec, fs);
SharedResourcesBroker<GobblinScopeTypes> instanceBroker = createDefaultInstanceBroker(jobState.getProperties());
SharedResourcesBroker<GobblinScopeTypes> instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState);
JobContext globalGobblinContext = new JobContext(jobState.getProperties(), log, instanceBroker, null);
// TODO: Task state dir is a stub with the assumption it is always colocated with the workunits dir (as in the case of MR which generates workunits)
Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
@@ -83,7 +79,7 @@ public int commit(WUProcessingSpec workSpec) {
ImmutableList.copyOf(
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath, numDeserializationThreads));
commitTaskStates(jobState, taskStateQueue, globalGobblinContext);

return taskStateQueue.size();
} catch (Exception e) {
//TODO: IMPROVE GRANULARITY OF RETRIES
throw ApplicationFailure.newNonRetryableFailureWithCause(
@@ -93,27 +89,28 @@
null
);
}

return 0;
}
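Because the failure is constructed as non-retryable, Temporal will not re-attempt the activity; the calling workflow observes it wrapped in an ActivityFailure. A hedged sketch of what handling could look like on the workflow side:

import io.temporal.failure.ActivityFailure;
import io.temporal.failure.ApplicationFailure;

try {
  int committed = activityStub.commit(workSpec);
} catch (ActivityFailure af) {
  // the non-retryable ApplicationFailure thrown above surfaces as the cause
  if (af.getCause() instanceof ApplicationFailure) {
    ApplicationFailure appFailure = (ApplicationFailure) af.getCause();
    log.error("Commit failed and will not be retried: type={}", appFailure.getType());
  }
  throw af;
}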

protected FileSystem loadFileSystemForUri(URI fsUri, State stateConfig) throws IOException {
return HadoopUtils.getFileSystem(fsUri, stateConfig);
}

/**
 * Commit task states to the dataset state store.
 * @param jobState the overall job state
 * @param taskStates the task states to commit
 * @param jobContext the job context, used to resolve commit semantics and dataset state stores
 * @throws IOException
 */
void commitTaskStates(State jobState, Collection<TaskState> taskStates, JobContext jobContext) throws IOException {
Map<String, JobState.DatasetState> datasetStatesByUrns = createDatasetStatesByUrns(taskStates);
final boolean shouldCommitDataInJob = shouldCommitDataInJob(jobState);
final boolean shouldCommitDataInJob = JobContext.shouldCommitDataInJob(jobState);
final DeliverySemantics deliverySemantics = DeliverySemantics.AT_LEAST_ONCE;
final int numCommitThreads = 1;

//TODO: Make this configurable
final int numCommitThreads = DEFAULT_NUM_COMMIT_THREADS;
if (!shouldCommitDataInJob) {
log.info("Job will not commit data since data are committed by tasks.");
}

try {
if (!datasetStatesByUrns.isEmpty()) {
log.info("Persisting dataset urns.");
log.info("Persisting {} dataset urns.", datasetStatesByUrns.size());
Contributor: let's add info about the dataset otherwise this cannot be connected to a particular job

}

List<Either<Void, ExecutionException>> result = new IteratorExecutor<>(Iterables
@@ -133,14 +130,20 @@ public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry

if (!IteratorExecutor.verifyAllSuccessful(result)) {
// TODO: propagate cause of failure
throw new IOException("Failed to commit dataset state for some dataset(s) of job <jobStub>");
String jobName = jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, "<job_name_stub>");
throw new IOException("Failed to commit dataset state for some dataset(s) of job " + jobName);
}
} catch (InterruptedException exc) {
throw new IOException(exc);
}
}

public Map<String, JobState.DatasetState> createDatasetStatesByUrns(Collection<TaskState> taskStates) {
/**
 * Organize task states by dataset urns.
 * @param taskStates the task states to group
 * @return a map from dataset urn to the {@link JobState.DatasetState} aggregating that dataset's task states
 */
public static Map<String, JobState.DatasetState> createDatasetStatesByUrns(Collection<TaskState> taskStates) {
Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();

//TODO: handle skipped tasks?
Contributor: I'm not clear on which ones might be skipped, so please consider elaborating in the comment

Contributor Author: Gobblin has a semantic for skipped tasks: workunits can be skipped but their state is saved. It's not clear why or when this occurs, and it lacks documentation here; workunit.skip is the config.

@@ -153,7 +156,7 @@ public Map<String, JobState.DatasetState> createDatasetStatesByUrns(Collection<TaskState> taskStates) {
return datasetStatesByUrns;
}
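A small sketch of the grouping behavior, with hypothetical TaskState setup:

TaskState first = new TaskState();
first.setProp(ConfigurationKeys.DATASET_URN_KEY, "datasetA");
TaskState second = new TaskState();  // no urn set: grouped under ConfigurationKeys.DEFAULT_DATASET_URN
Map<String, JobState.DatasetState> byUrn =
    CommitActivityImpl.createDatasetStatesByUrns(ImmutableList.of(first, second));
// two entries: "datasetA" and the default urn, each aggregating its member task states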

private String createDatasetUrn(Map<String, JobState.DatasetState> datasetStatesByUrns, TaskState taskState) {
private static String createDatasetUrn(Map<String, JobState.DatasetState> datasetStatesByUrns, TaskState taskState) {
String datasetUrn = taskState.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN);
if (!datasetStatesByUrns.containsKey(datasetUrn)) {
JobState.DatasetState datasetState = new JobState.DatasetState();
Expand All @@ -163,16 +166,6 @@ private String createDatasetUrn(Map<String, JobState.DatasetState> datasetStates
return datasetUrn;
}

private static boolean shouldCommitDataInJob(State state) {
boolean jobCommitPolicyIsFull =
JobCommitPolicy.getCommitPolicy(state.getProperties()) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
boolean publishDataAtJobLevel = state.getPropAsBoolean(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL,
ConfigurationKeys.DEFAULT_PUBLISH_DATA_AT_JOB_LEVEL);
boolean jobDataPublisherSpecified =
!Strings.isNullOrEmpty(state.getProp(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE));
return jobCommitPolicyIsFull || publishDataAtJobLevel || jobDataPublisherSpecified;
}

private static SharedResourcesBroker<GobblinScopeTypes> createDefaultInstanceBroker(Properties jobProps) {
Contributor: suggest to:

  • define in JobStateUtils
  • give a name to contrast it to getSharedResourcesBroker
  • document when one vs. the other is called for

Contributor Author: I realized the other getSharedResourcesBroker should be okay here in fact, so will utilize that class method

Contributor: great! this method isn't used then, is it? if not, let's remove

log.warn("Creating a job specific {}. Objects will only be shared at the job level.",
SharedResourcesBroker.class.getSimpleName());
@@ -17,19 +17,20 @@
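Per the resolution above, the call site reuses the shared helper rather than this local factory, as the substituted lines earlier in this diff show:

SharedResourcesBroker<GobblinScopeTypes> instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState);
JobContext globalGobblinContext = new JobContext(jobState.getProperties(), log, instanceBroker, null);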

package org.apache.gobblin.temporal.ddm.workflow;

import io.temporal.workflow.Promise;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;


/**
* Workflow for committing the output of work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl}
*/
@WorkflowInterface
public interface CommitStepWorkflow {

/**
* This is the method that is executed when the Workflow Execution is started. The Workflow
* Execution completes when this method finishes execution.
* Commit the output of the work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl}
* Returns the number of workunits committed
*/
@WorkflowMethod
int commit(WUProcessingSpec workSpec);
Expand Down
@@ -1,17 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.workflow.impl;

import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.workflow.Async;
import io.temporal.workflow.Promise;
import io.temporal.workflow.Workflow;

import java.time.Duration;

import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;


@@ -34,7 +48,6 @@ public class CommitStepWorkflowImpl implements CommitStepWorkflow {

@Override
public int commit(WUProcessingSpec workSpec) {
Promise<Integer> result = Async.function(activityStub::commit, workSpec);
return result.get();
return activityStub.commit(workSpec);
}
}
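Dropping the Async/Promise wrapper is behavior-preserving, since the workflow immediately blocked on result.get() anyway. For context, the activityStub field (outside this hunk) would be created roughly as in this sketch; the timeout and retry values are assumptions:

private final CommitActivity activityStub = Workflow.newActivityStub(
    CommitActivity.class,
    ActivityOptions.newBuilder()
        .setStartToCloseTimeout(Duration.ofHours(1))  // assumed timeout
        .setRetryOptions(RetryOptions.newBuilder()
            .setMaximumAttempts(1)  // assumed: the activity already throws non-retryable failures
            .build())
        .build());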
@@ -25,7 +25,6 @@
import java.time.Duration;

import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl;

@@ -41,6 +41,7 @@
@Slf4j
public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
public static final String CHILD_WORKFLOW_ID_BASE = "NestingExecWorkUnits";
public static final String COMMIT_STEP_WORKFLOW_ID_BASE = "CommitStepWorkflow";

@Override
public int process(WUProcessingSpec workSpec) {
@@ -72,8 +73,10 @@ protected NestingExecWorkflow<WorkUnitClaimCheck> createProcessingWorkflow(FileS
}

protected CommitStepWorkflow createCommitStepWorkflow() {
ChildWorkflowOptions childOpts =
ChildWorkflowOptions.newBuilder().setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON).build();
ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
.setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
.setWorkflowId(Help.qualifyNamePerExec(COMMIT_STEP_WORKFLOW_ID_BASE, WorkerConfig.of(this).orElse(ConfigFactory.empty())))
.build();

return Workflow.newChildWorkflowStub(CommitStepWorkflow.class, childOpts);
}
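With an explicit workflow id qualified per execution, the commit step becomes easy to locate in the Temporal UI. The calling side of this helper is then presumably just:

CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
int committed = commitWorkflow.commit(workSpec);  // runs CommitActivity inside the child workflow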