diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index 55c9ebd76cd..9f0abab2404 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -63,7 +63,7 @@
  */
 @RequiredArgsConstructor
 @Slf4j
-final class SafeDatasetCommit implements Callable<Void> {
+public final class SafeDatasetCommit implements Callable<Void> {
 
   private static final Object GLOBAL_LOCK = new Object();
 
@@ -316,12 +316,12 @@ private void finalizeDatasetStateBeforeCommit(JobState.DatasetState datasetState
       if (taskState.getWorkingState() != WorkUnitState.WorkingState.SUCCESSFUL
           && this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
         // The dataset state is set to FAILED if any task failed and COMMIT_ON_FULL_SUCCESS is used
+        log.info("Failed task state for " + taskState.getWorkunit().getOutputFilePath());
         datasetState.setState(JobState.RunningState.FAILED);
         datasetState.incrementJobFailures();
         Optional<String> taskStateException = taskState.getTaskFailureException();
         log.warn("At least one task did not get committed successfully. Setting dataset state to FAILED. "
             + (taskStateException.isPresent() ? taskStateException.get() : "Exception not set."));
-        return;
       }
     }
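Note the behavioral shift in `finalizeDatasetStateBeforeCommit`: with the early `return` gone, the enclosing loop now examines every task state, so each failed task is logged and counted via `incrementJobFailures()` instead of the method bailing out at the first failure. A minimal sketch of the control-flow difference, using stand-in types rather than Gobblin's:

    import java.util.List;

    /** Sketch only: stand-in for the failure scan in finalizeDatasetStateBeforeCommit. */
    class FailureScanSketch {
      static int countFailures(List<Boolean> taskSucceeded, boolean stopAtFirst) {
        int failures = 0;
        for (boolean ok : taskSucceeded) {
          if (!ok) {
            failures++;          // new behavior: every failed task is counted and logged
            if (stopAtFirst) {
              return failures;   // old behavior: early return after the first failure
            }
          }
        }
        return failures;
      }
    }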
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
index 43164638573..87a82a34b39 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.runtime;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,8 +30,6 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
@@ -68,8 +67,6 @@
 @Slf4j
 public class TaskStateCollectorService extends AbstractScheduledService {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(TaskStateCollectorService.class);
-
   private final JobState jobState;
 
   private final EventBus eventBus;
@@ -166,13 +163,13 @@ protected Scheduler scheduler() {
 
   @Override
   protected void startUp() throws Exception {
-    LOGGER.info("Starting the " + TaskStateCollectorService.class.getSimpleName());
+    log.info("Starting the " + TaskStateCollectorService.class.getSimpleName());
     super.startUp();
   }
 
   @Override
   protected void shutDown() throws Exception {
-    LOGGER.info("Stopping the " + TaskStateCollectorService.class.getSimpleName());
+    log.info("Stopping the " + TaskStateCollectorService.class.getSimpleName());
     try {
       runOneIteration();
     } finally {
@@ -193,39 +190,11 @@ protected void shutDown() throws Exception {
    * @throws IOException if it fails to collect the output {@link TaskState}s
    */
   private void collectOutputTaskStates() throws IOException {
-    List<String> taskStateNames = taskStateStore.getTableNames(outputTaskStateDir.getName(), new Predicate<String>() {
-      @Override
-      public boolean apply(String input) {
-        return input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
-            && !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
-      }});
-    if (taskStateNames == null || taskStateNames.size() == 0) {
-      LOGGER.debug("No output task state files found in " + this.outputTaskStateDir);
+    final Queue<TaskState> taskStateQueue = deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir, this.stateSerDeRunnerThreads);
+    if (taskStateQueue == null) {
       return;
     }
-
-    final Queue<TaskState> taskStateQueue = Queues.newConcurrentLinkedQueue();
-    try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, null)) {
-      for (final String taskStateName : taskStateNames) {
-        LOGGER.debug("Found output task state file " + taskStateName);
-        // Deserialize the TaskState and delete the file
-        stateSerDeRunner.submitCallable(new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
-            TaskState taskState = taskStateStore.getAll(outputTaskStateDir.getName(), taskStateName).get(0);
-            taskStateQueue.add(taskState);
-            taskStateStore.delete(outputTaskStateDir.getName(), taskStateName);
-            return null;
-          }
-        }, "Deserialize state for " + taskStateName);
-      }
-    } catch (IOException ioe) {
-      LOGGER.warn("Could not read all task state files.");
-    }
-
-    LOGGER.info(String.format("Collected task state of %d completed tasks", taskStateQueue.size()));
 
     // Add the TaskStates of completed tasks to the JobState so when the control
     // returns to the launcher, it sees the TaskStates of all completed tasks.
     for (TaskState taskState : taskStateQueue) {
@@ -241,7 +210,7 @@ public Void call() throws Exception {
     // Finish any additional steps defined in handler on driver level.
     // Currently implemented handler for Hive registration only.
     if (optionalTaskCollectorHandler.isPresent()) {
-      LOGGER.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.size() + " tasks");
+      log.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.size() + " tasks");
 
       try {
         optionalTaskCollectorHandler.get().handle(taskStateQueue);
@@ -259,6 +228,42 @@ public Void call() throws Exception {
     this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
   }
 
+  public static Queue<TaskState> deserializeTaskStatesFromFolder(StateStore<TaskState> taskStateStore, Path outputTaskStateDir,
+      int numDeserializerThreads) throws IOException {
+    List<String> taskStateNames = taskStateStore.getTableNames(outputTaskStateDir.getName(), new Predicate<String>() {
+      @Override
+      public boolean apply(String input) {
+        return input != null
+            && input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
+            && !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
+      }});
+
+    if (taskStateNames == null || taskStateNames.isEmpty()) {
+      log.info("No output task state files found in " + outputTaskStateDir);
+      return null;
+    }
+
+    final Queue<TaskState> taskStateQueue = Queues.newConcurrentLinkedQueue();
+    try (ParallelRunner stateSerDeRunner = new ParallelRunner(numDeserializerThreads, null)) {
+      for (final String taskStateName : taskStateNames) {
+        log.info("Found output task state file " + taskStateName);
+        // Deserialize the TaskState (unlike the inline version this replaces, the file is NOT deleted here)
+        stateSerDeRunner.submitCallable(new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            TaskState taskState = taskStateStore.getAll(outputTaskStateDir.getName(), taskStateName).get(0);
+            taskStateQueue.add(taskState);
+            return null;
+          }
+        }, "Deserialize state for " + taskStateName);
+      }
+    } catch (IOException ioe) {
+      log.warn("Could not read all task state files.");
+    }
+    log.info(String.format("Collected task state of %d completed tasks", taskStateQueue.size()));
+    return taskStateQueue;
+  }
+
   /**
    * Uses the size of work units to determine a job's progress and reports the progress as a percentage via
   * GobblinTrackingEvents
@@ -267,7 +272,7 @@ public Void call() throws Exception {
   private void reportJobProgress(TaskState taskState) {
     String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_SIZE);
     if (stringSize == null) {
-      LOGGER.warn("Expected to report job progress but work unit byte size property null");
+      log.warn("Expected to report job progress but work unit byte size property null");
       return;
     }
 
@@ -275,7 +280,7 @@ private void reportJobProgress(TaskState taskState) {
 
     // If progress reporting is enabled, value should be present
     if (!this.jobState.contains(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE)) {
-      LOGGER.warn("Expected to report job progress but total bytes to copy property null");
+      log.warn("Expected to report job progress but total bytes to copy property null");
       return;
     }
     this.totalSizeToCopy = this.jobState.getPropAsLong(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE);
@@ -287,7 +292,7 @@ private void reportJobProgress(TaskState taskState) {
       this.workUnitsCompletedSoFar += 1;
       if (this.totalNumWorkUnits == 0) {
-        LOGGER.warn("Expected to report job progress but work units are not countable");
+        log.warn("Expected to report job progress but work units are not countable");
         return;
       }
       newPercentageCopied = this.workUnitsCompletedSoFar / this.totalNumWorkUnits;
@@ -307,7 +312,7 @@ private void reportJobProgress(TaskState taskState) {
     Map<String, String> progress = new HashMap<>();
     progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport));
 
-    LOGGER.info("Sending copy progress event with percentage " + percentageToReport + "%");
+    log.info("Sending copy progress event with percentage " + percentageToReport + "%");
     new TimingEvent(this.eventSubmitter, TimingEvent.JOB_COMPLETION_PERCENTAGE).stop(progress);
   }
 }
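Making `deserializeTaskStatesFromFolder` a public static method is the linchpin of this change: it lets the new Temporal commit activity (below) reuse the collector's deserialization logic outside a running `TaskStateCollectorService`. A hedged sketch of a call site; the path and thread count are illustrative only:

    import java.util.Queue;
    import org.apache.hadoop.fs.Path;
    import org.apache.gobblin.runtime.TaskState;
    import org.apache.gobblin.runtime.TaskStateCollectorService;

    // Sketch: assumes a StateStore<TaskState> named taskStateStore is already constructed.
    Queue<TaskState> collected = TaskStateCollectorService.deserializeTaskStatesFromFolder(
        taskStateStore, new Path("/tmp/job_output"), /* numDeserializerThreads */ 1);
    if (collected == null) {
      return;  // the helper signals "no task state files" with null, not an empty queue
    }

One behavioral difference from the inline code it replaces: the helper no longer deletes each task state file after reading it, so callers needing the old cleanup semantics must delete the files themselves.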
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java
new file mode 100644
index 00000000000..d690fafcc15
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+
+
+/** Activity for committing the task states produced by processing {@link org.apache.gobblin.source.workunit.WorkUnit}s, provided by claim-check */
+@ActivityInterface
+public interface CommitActivity {
+  @ActivityMethod
+  // CAUTION: a void return type won't work, as apparently it may not be used as the return type for `io.temporal.workflow.Functions.Func1`!
+  int commit(WUProcessingSpec workSpec);
+}
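To unpack the CAUTION comment: the workflow side invokes this activity through `io.temporal.workflow.Async.function(...)`, whose overloads are typed against `Functions.Func1<A, R>` and therefore require a result type, so a `void` activity method cannot be passed as a method reference there. A hedged illustration (`activityStub` and `workSpec` as in `CommitStepWorkflowImpl` later in this diff):

    // With the int-returning signature, the method reference fits Func1<WUProcessingSpec, Integer>:
    Promise<Integer> result = Async.function(activityStub::commit, workSpec);

    // A hypothetical `void commit(WUProcessingSpec)` would instead require Async.procedure(...),
    // which accepts Functions.Proc1<A> and yields a Promise<Void>.

Switching call sites to `Async.procedure` would presumably also work; the placeholder `int` result is the simpler workaround.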
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
new file mode 100644
index 00000000000..dd9a605db1b
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+  @Override
+  public int commit(WUProcessingSpec workSpec) {
+    int numDeserializationThreads = 1;
+    try {
+      FileSystem fs = Help.loadFileSystem(workSpec);
+      JobState jobState = Help.loadJobState(workSpec, fs);
+      SharedResourcesBroker<GobblinScopeTypes> instanceBroker = createDefaultInstanceBroker(jobState.getProperties());
+      JobContext globalGobblinContext = new JobContext(jobState.getProperties(), log, instanceBroker, null);
+      // TODO: Task state dir is a stub with the assumption it is always colocated with the workunits dir (as in the case of MR which generates workunits)
+      Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+      Path jobOutputPath = new Path(new Path(jobIdParent, "output"), jobIdParent.getName());
+      log.info("Output path at: " + jobOutputPath + " with fs at " + fs.getUri());
+      StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
+      Collection<TaskState> taskStateQueue =
+          ImmutableList.copyOf(
+              TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath, numDeserializationThreads));
+      commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+
+    } catch (Exception e) {
+      //TODO: IMPROVE GRANULARITY OF RETRIES
+      throw ApplicationFailure.newNonRetryableFailureWithCause(
+          "Failed to commit dataset state for some dataset(s) of job ",
+          IOException.class.toString(),
+          new IOException(e),
+          null
+      );
+    }
+
+    return 0;
+  }
+
+  protected FileSystem loadFileSystemForUri(URI fsUri, State stateConfig) throws IOException {
+    return HadoopUtils.getFileSystem(fsUri, stateConfig);
+  }
+
+  void commitTaskStates(State jobState, Collection<TaskState> taskStates, JobContext jobContext) throws IOException {
+    Map<String, JobState.DatasetState> datasetStatesByUrns = createDatasetStatesByUrns(taskStates);
+    final boolean shouldCommitDataInJob = shouldCommitDataInJob(jobState);
+    final DeliverySemantics deliverySemantics = DeliverySemantics.AT_LEAST_ONCE;
+    final int numCommitThreads = 1;
+
+    if (!shouldCommitDataInJob) {
+      log.info("Job will not commit data since data are committed by tasks.");
+    }
+
+    try {
+      if (!datasetStatesByUrns.isEmpty()) {
+        log.info("Persisting dataset urns.");
+      }
+
+      List<Either<Void, ExecutionException>> result = new IteratorExecutor<>(Iterables
+          .transform(datasetStatesByUrns.entrySet(),
+              new Function<Map.Entry<String, JobState.DatasetState>, Callable<Void>>() {
+                @Nullable
+                @Override
+                public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry) {
+                  return new SafeDatasetCommit(shouldCommitDataInJob, false, deliverySemantics, entry.getKey(),
+                      entry.getValue(), false, jobContext);
+                }
+              }).iterator(), numCommitThreads,
+          ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("Commit-thread-%d")))
+          .executeAndGetResults();
+
+      IteratorExecutor.logFailures(result, null, 10);
+
+      if (!IteratorExecutor.verifyAllSuccessful(result)) {
+        // TODO: propagate cause of failure
+        throw new IOException("Failed to commit dataset state for some dataset(s) of job ");
+      }
+    } catch (InterruptedException exc) {
+      throw new IOException(exc);
+    }
+  }
+
+  public Map<String, JobState.DatasetState> createDatasetStatesByUrns(Collection<TaskState> taskStates) {
+    Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
+
+    //TODO: handle skipped tasks?
+    for (TaskState taskState : taskStates) {
+      String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
+      datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
+      datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
+    }
+
+    return datasetStatesByUrns;
+  }
+
+  private String createDatasetUrn(Map<String, JobState.DatasetState> datasetStatesByUrns, TaskState taskState) {
+    String datasetUrn = taskState.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN);
+    if (!datasetStatesByUrns.containsKey(datasetUrn)) {
+      JobState.DatasetState datasetState = new JobState.DatasetState();
+      datasetState.setDatasetUrn(datasetUrn);
+      datasetStatesByUrns.put(datasetUrn, datasetState);
+    }
+    return datasetUrn;
+  }
+
+  private static boolean shouldCommitDataInJob(State state) {
+    boolean jobCommitPolicyIsFull =
+        JobCommitPolicy.getCommitPolicy(state.getProperties()) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
+    boolean publishDataAtJobLevel = state.getPropAsBoolean(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL,
+        ConfigurationKeys.DEFAULT_PUBLISH_DATA_AT_JOB_LEVEL);
+    boolean jobDataPublisherSpecified =
+        !Strings.isNullOrEmpty(state.getProp(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE));
+    return jobCommitPolicyIsFull || publishDataAtJobLevel || jobDataPublisherSpecified;
+  }
+
+  private static SharedResourcesBroker<GobblinScopeTypes> createDefaultInstanceBroker(Properties jobProps) {
+    log.warn("Creating a job specific {}. Objects will only be shared at the job level.",
+        SharedResourcesBroker.class.getSimpleName());
+    return SharedResourcesBrokerFactory.createDefaultTopLevelBroker(ConfigFactory.parseProperties(jobProps),
+        GobblinScopeTypes.GLOBAL.defaultScopeInstance());
+  }
+
+}
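Worth tracing in `createDatasetStatesByUrns`: task states are bucketed by `ConfigurationKeys.DATASET_URN_KEY` (falling back to `DEFAULT_DATASET_URN`), and each bucket becomes one `JobState.DatasetState` that a dedicated `SafeDatasetCommit` then commits. A small illustration with made-up URNs:

    // Suppose three TaskStates carry these dataset.urn values:
    //   ts1 -> "/data/tracking/eventA"
    //   ts2 -> "/data/tracking/eventA"
    //   ts3 -> "/data/tracking/eventB"
    Map<String, JobState.DatasetState> byUrn =
        new CommitActivityImpl().createDatasetStatesByUrns(ImmutableList.of(ts1, ts2, ts3));
    // => two entries: "/data/tracking/eventA" holding two task states and
    //    "/data/tracking/eventB" holding one; each entry is committed independently.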
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/commit/LineageEventProduceWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/commit/LineageEventProduceWorkflow.java
new file mode 100644
index 00000000000..4904b356d2f
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/commit/LineageEventProduceWorkflow.java
@@ -0,0 +1,6 @@
+package org.apache.gobblin.temporal.ddm.commit;
+
+public class LineageEventProduceWorkflow {
+
+
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index 95425a64371..88838ca6a88 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.temporal.ddm.launcher;
 
+import io.temporal.client.WorkflowOptions;
 import java.net.URI;
 import java.util.List;
 import java.util.Properties;
@@ -25,7 +26,6 @@
 import lombok.extern.slf4j.Slf4j;
 
 import com.typesafe.config.ConfigFactory;
-import io.temporal.client.WorkflowOptions;
 import org.apache.hadoop.fs.Path;
 
 import org.apache.gobblin.metrics.Tag;
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index 9af6995b509..d425bbc4b15 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -24,7 +24,9 @@
 import io.temporal.worker.WorkerOptions;
 
 import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
+import org.apache.gobblin.temporal.ddm.activity.impl.CommitActivityImpl;
 import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
+import org.apache.gobblin.temporal.ddm.workflow.impl.CommitStepWorkflowImpl;
 import org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl;
 import org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl;
 
@@ -40,12 +42,12 @@ public WorkFulfillmentWorker(Config config, WorkflowClient workflowClient) {
 
   @Override
   protected Class<?>[] getWorkflowImplClasses() {
-    return new Class[] { ProcessWorkUnitsWorkflowImpl.class, NestingExecOfProcessWorkUnitWorkflowImpl.class };
+    return new Class[] { ProcessWorkUnitsWorkflowImpl.class, NestingExecOfProcessWorkUnitWorkflowImpl.class, CommitStepWorkflowImpl.class };
   }
 
   @Override
   protected Object[] getActivityImplInstances() {
-    return new Object[] { new ProcessWorkUnitImpl() };
+    return new Object[] { new ProcessWorkUnitImpl(), new CommitActivityImpl() };
   }
 
   @Override
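Registering `CommitStepWorkflowImpl` and `CommitActivityImpl` with the same worker matters because the commit child workflow and its activity are dispatched on this worker's task queue; an unregistered type would fail at dispatch time. For reference, a hedged sketch of starting the parent workflow directly from a `WorkflowClient` (the task-queue name and `workSpec` setup are made up):

    import io.temporal.client.WorkflowClient;
    import io.temporal.client.WorkflowOptions;
    import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;

    // Sketch: assumes `client` is an already-connected WorkflowClient
    // and `workSpec` a populated WUProcessingSpec.
    ProcessWorkUnitsWorkflow workflow = client.newWorkflowStub(
        ProcessWorkUnitsWorkflow.class,
        WorkflowOptions.newBuilder()
            .setTaskQueue("GobblinTemporalTaskQueue")  // hypothetical queue name
            .build());
    int processed = workflow.process(workSpec);  // blocks until processing plus the commit step complete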
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
new file mode 100644
index 00000000000..4f509312cc3
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow;
+
+import io.temporal.workflow.Promise;
+import io.temporal.workflow.WorkflowInterface;
+import io.temporal.workflow.WorkflowMethod;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+
+
+@WorkflowInterface
+public interface CommitStepWorkflow {
+
+  /**
+   * This is the method that is executed when the Workflow Execution is started. The Workflow
+   * Execution completes when this method finishes execution.
+   */
+  @WorkflowMethod
+  int commit(WUProcessingSpec workSpec);
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
new file mode 100644
index 00000000000..6049546dee4
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
@@ -0,0 +1,40 @@
+package org.apache.gobblin.temporal.ddm.workflow.impl;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import io.temporal.workflow.Async;
+import io.temporal.workflow.Promise;
+import io.temporal.workflow.Workflow;
+
+import java.time.Duration;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
+
+
+@Slf4j
+public class CommitStepWorkflowImpl implements CommitStepWorkflow {
+
+  private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder()
+      .setInitialInterval(Duration.ofSeconds(3))
+      .setMaximumInterval(Duration.ofSeconds(100))
+      .setBackoffCoefficient(2)
+      .setMaximumAttempts(4)
+      .build();
+
+  private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder()
+      .setStartToCloseTimeout(Duration.ofSeconds(999))
+      .setRetryOptions(ACTIVITY_RETRY_OPTS)
+      .build();
+
+  private final CommitActivity activityStub = Workflow.newActivityStub(CommitActivity.class, ACTIVITY_OPTS);
+
+  @Override
+  public int commit(WUProcessingSpec workSpec) {
+    Promise<Integer> result = Async.function(activityStub::commit, workSpec);
+    return result.get();
+  }
+}
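Reading the two option blocks together: the commit activity gets at most 4 attempts, with pauses of roughly 3s, 6s, and 12s between them (each retry doubles the previous interval, so the 100s cap is never reached before `setMaximumAttempts(4)` stops retrying), all bounded by the 999s start-to-close timeout. A throwaway sketch of the implied schedule:

    // Back-of-envelope view of the RetryOptions above (not a Temporal API).
    long intervalMs = 3_000;                          // setInitialInterval(3s)
    for (int attempt = 1; attempt <= 4; attempt++) {  // setMaximumAttempts(4)
      System.out.printf("attempt %d%s%n", attempt,
          attempt < 4 ? ", retry after ~" + intervalMs + " ms" : " (final)");
      intervalMs = Math.min(intervalMs * 2, 100_000); // setBackoffCoefficient(2), capped by setMaximumInterval(100s)
    }

Note, though, that `CommitActivityImpl` wraps its failures in `ApplicationFailure.newNonRetryableFailureWithCause(...)`, which bypasses this retry policy, so in practice the retries cover only failures the activity does not itself wrap (e.g. timeouts or worker crashes).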
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
index 074bb460972..eaba17bf388 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
@@ -25,6 +25,7 @@
 import java.time.Duration;
 
 import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
 import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl;
 
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index eafc624096e..abe7fa3ee20 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -23,12 +23,14 @@
 import io.temporal.api.enums.v1.ParentClosePolicy;
 import io.temporal.workflow.ChildWorkflowOptions;
 import io.temporal.workflow.Workflow;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.temporal.cluster.WorkerConfig;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
+import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
 import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
 import org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
 import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
@@ -36,6 +38,7 @@
 import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
 
 
+@Slf4j
 public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
   public static final String CHILD_WORKFLOW_ID_BASE = "NestingExecWorkUnits";
 
@@ -43,10 +46,16 @@ public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
   public int process(WUProcessingSpec workSpec) {
     Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec);
-    return processingWorkflow.performWorkload(
+    int workunitsProcessed = processingWorkflow.performWorkload(
         WorkflowAddr.ROOT, workload, 0,
         workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()
     );
+    if (workunitsProcessed > 0) {
+      CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
+      int result = commitWorkflow.commit(workSpec);
+      return result;
+    }
+    return workunitsProcessed;
   }
 
   protected Workload<WorkUnitClaimCheck> createWorkload(WUProcessingSpec workSpec) {
@@ -61,4 +70,11 @@ protected NestingExecWorkflow<WorkUnitClaimCheck> createProcessingWorkflow(FileS
     // TODO: to incorporate multiple different concrete `NestingExecWorkflow` sub-workflows in the same super-workflow... shall we use queues?!?!?
     return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
   }
+
+  protected CommitStepWorkflow createCommitStepWorkflow() {
+    ChildWorkflowOptions childOpts =
+        ChildWorkflowOptions.newBuilder().setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON).build();
+
+    return Workflow.newChildWorkflowStub(CommitStepWorkflow.class, childOpts);
+  }
 }
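Since this change routes every run that processes at least one work unit through the new commit child workflow, exercising the wiring in a Temporal test environment is one way to sanity-check it. A hedged sketch, assuming the io.temporal:temporal-testing dependency and a hypothetical task-queue name (spec setup elided):

    import io.temporal.client.WorkflowOptions;
    import io.temporal.testing.TestWorkflowEnvironment;
    import io.temporal.worker.Worker;
    import org.apache.gobblin.temporal.ddm.activity.impl.CommitActivityImpl;
    import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
    import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
    import org.apache.gobblin.temporal.ddm.workflow.impl.CommitStepWorkflowImpl;
    import org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl;
    import org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl;

    // Sketch only, inside a test method: mirrors WorkFulfillmentWorker's registrations.
    TestWorkflowEnvironment testEnv = TestWorkflowEnvironment.newInstance();
    Worker worker = testEnv.newWorker("test-queue");  // hypothetical queue
    worker.registerWorkflowImplementationTypes(ProcessWorkUnitsWorkflowImpl.class,
        NestingExecOfProcessWorkUnitWorkflowImpl.class, CommitStepWorkflowImpl.class);
    worker.registerActivitiesImplementations(new ProcessWorkUnitImpl(), new CommitActivityImpl());
    testEnv.start();

    ProcessWorkUnitsWorkflow wf = testEnv.getWorkflowClient().newWorkflowStub(
        ProcessWorkUnitsWorkflow.class,
        WorkflowOptions.newBuilder().setTaskQueue("test-queue").build());
    // wf.process(workSpec) would now run processing and, if any work units completed, the commit step.
    testEnv.close();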