[GOBBLIN-1968] Temporal commit step integration #3829

Merged 8 commits on Dec 15, 2023

Changes from 3 commits

@@ -63,7 +63,7 @@
*/
@RequiredArgsConstructor
@Slf4j
final class SafeDatasetCommit implements Callable<Void> {
public final class SafeDatasetCommit implements Callable<Void> {

private static final Object GLOBAL_LOCK = new Object();

@@ -316,6 +316,7 @@ private void finalizeDatasetStateBeforeCommit(JobState.DatasetState datasetState
if (taskState.getWorkingState() != WorkUnitState.WorkingState.SUCCESSFUL
&& this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
// The dataset state is set to FAILED if any task failed and COMMIT_ON_FULL_SUCCESS is used
log.info("Failed task state for " + taskState.getWorkunit().getOutputFilePath());
datasetState.setState(JobState.RunningState.FAILED);
datasetState.incrementJobFailures();
Optional<String> taskStateException = taskState.getTaskFailureException();
@@ -29,8 +29,6 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.google.common.base.Predicate;
@@ -68,8 +66,6 @@
@Slf4j
public class TaskStateCollectorService extends AbstractScheduledService {

private static final Logger LOGGER = LoggerFactory.getLogger(TaskStateCollectorService.class);

private final JobState jobState;

private final EventBus eventBus;
@@ -166,13 +162,13 @@ protected Scheduler scheduler() {

@Override
protected void startUp() throws Exception {
LOGGER.info("Starting the " + TaskStateCollectorService.class.getSimpleName());
log.info("Starting the " + TaskStateCollectorService.class.getSimpleName());
super.startUp();
}

@Override
protected void shutDown() throws Exception {
LOGGER.info("Stopping the " + TaskStateCollectorService.class.getSimpleName());
log.info("Stopping the " + TaskStateCollectorService.class.getSimpleName());
try {
runOneIteration();
} finally {
@@ -193,39 +189,11 @@ protected void shutDown() throws Exception {
* @throws IOException if it fails to collect the output {@link TaskState}s
*/
private void collectOutputTaskStates() throws IOException {
List<String> taskStateNames = taskStateStore.getTableNames(outputTaskStateDir.getName(), new Predicate<String>() {
@Override
public boolean apply(String input) {
return input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
&& !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
}});

if (taskStateNames == null || taskStateNames.size() == 0) {
LOGGER.debug("No output task state files found in " + this.outputTaskStateDir);
final Queue<TaskState> taskStateQueue = deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir, this.stateSerDeRunnerThreads);
if (taskStateQueue == null) {
return;
}

final Queue<TaskState> taskStateQueue = Queues.newConcurrentLinkedQueue();
try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, null)) {
for (final String taskStateName : taskStateNames) {
LOGGER.debug("Found output task state file " + taskStateName);
// Deserialize the TaskState and delete the file
stateSerDeRunner.submitCallable(new Callable<Void>() {
@Override
public Void call() throws Exception {
TaskState taskState = taskStateStore.getAll(outputTaskStateDir.getName(), taskStateName).get(0);
taskStateQueue.add(taskState);
taskStateStore.delete(outputTaskStateDir.getName(), taskStateName);
return null;
}
}, "Deserialize state for " + taskStateName);
}
} catch (IOException ioe) {
LOGGER.warn("Could not read all task state files.");
}

LOGGER.info(String.format("Collected task state of %d completed tasks", taskStateQueue.size()));

// Add the TaskStates of completed tasks to the JobState so when the control
// returns to the launcher, it sees the TaskStates of all completed tasks.
for (TaskState taskState : taskStateQueue) {
@@ -241,7 +209,7 @@ public Void call() throws Exception {
// Finish any additional steps defined in handler on driver level.
// Currently implemented handler for Hive registration only.
if (optionalTaskCollectorHandler.isPresent()) {
LOGGER.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.size() + " tasks");
log.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.size() + " tasks");

try {
optionalTaskCollectorHandler.get().handle(taskStateQueue);
Expand All @@ -259,6 +227,42 @@ public Void call() throws Exception {
this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
}

public static Queue<TaskState> deserializeTaskStatesFromFolder(StateStore<TaskState> taskStateStore, Path outputTaskStateDir,
int numDeserializerThreads) throws IOException {
List<String> taskStateNames = taskStateStore.getTableNames(outputTaskStateDir.getName(), new Predicate<String>() {
@Override
public boolean apply(String input) {
return input != null
&& input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
&& !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
}});

if (taskStateNames == null || taskStateNames.isEmpty()) {
log.info("No output task state files found in " + outputTaskStateDir);
return null;
}

final Queue<TaskState> taskStateQueue = Queues.newConcurrentLinkedQueue();
try (ParallelRunner stateSerDeRunner = new ParallelRunner(numDeserializerThreads, null)) {
for (final String taskStateName : taskStateNames) {
log.info("Found output task state file " + taskStateName);
// Deserialize the TaskState and delete the file
stateSerDeRunner.submitCallable(new Callable<Void>() {
@Override
public Void call() throws Exception {
TaskState taskState = taskStateStore.getAll(outputTaskStateDir.getName(), taskStateName).get(0);
taskStateQueue.add(taskState);
return null;
}
}, "Deserialize state for " + taskStateName);
}
} catch (IOException ioe) {
log.warn("Could not read all task state files.");
Contributor: let's not just swallow, but at least log particulars of the error
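
A minimal sketch of the reviewer's suggestion: pass the caught exception to the logger so its particulars are recorded (including the directory in the message is an added assumption):

} catch (IOException ioe) {
  log.warn("Could not read all task state files under " + outputTaskStateDir, ioe);
}
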
}
log.info(String.format("Collected task state of %d completed tasks", taskStateQueue.size()));
Contributor: suggest including outputTaskStateDir in the message
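
A sketch of the suggested wording, adding the directory to the log message:

log.info(String.format("Collected task state of %d completed tasks from %s", taskStateQueue.size(), outputTaskStateDir));
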
return taskStateQueue;
}

/**
* Uses the size of work units to determine a job's progress and reports the progress as a percentage via
* GobblinTrackingEvents
@@ -267,15 +271,15 @@ public Void call() throws Exception {
private void reportJobProgress(TaskState taskState) {
String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_SIZE);
if (stringSize == null) {
LOGGER.warn("Expected to report job progress but work unit byte size property null");
log.warn("Expected to report job progress but work unit byte size property null");
return;
}

Long taskByteSize = Long.parseLong(stringSize);

// If progress reporting is enabled, value should be present
if (!this.jobState.contains(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE)) {
LOGGER.warn("Expected to report job progress but total bytes to copy property null");
log.warn("Expected to report job progress but total bytes to copy property null");
return;
}
this.totalSizeToCopy = this.jobState.getPropAsLong(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE);
Expand All @@ -287,7 +291,7 @@ private void reportJobProgress(TaskState taskState) {
this.workUnitsCompletedSoFar += 1;

if (this.totalNumWorkUnits == 0) {
LOGGER.warn("Expected to report job progress but work units are not countable");
log.warn("Expected to report job progress but work units are not countable");
return;
}
newPercentageCopied = this.workUnitsCompletedSoFar / this.totalNumWorkUnits;
@@ -307,7 +311,7 @@ private void reportJobProgress(TaskState taskState) {
Map<String, String> progress = new HashMap<>();
progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport));

LOGGER.info("Sending copy progress event with percentage " + percentageToReport + "%");
log.info("Sending copy progress event with percentage " + percentageToReport + "%");
new TimingEvent(this.eventSubmitter, TimingEvent.JOB_COMPLETION_PERCENTAGE).stop(progress);
}
}
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.activity;

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;


/** Activity for committing the output of processed {@link org.apache.gobblin.source.workunit.WorkUnit}s, provided by claim-check */
@ActivityInterface
public interface CommitActivity {
@ActivityMethod
// CAUTION: a void return type won't work, as apparently it may not be used as the return type for `io.temporal.workflow.Functions.Func1`!
int commit(WUProcessingSpec workSpec);
}
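
For context on the CAUTION above, a hedged sketch of how a workflow might invoke this activity through Temporal's Async.function, which expects a Functions.Func1 and therefore rules out a void return; the class name, timeout, and options here are assumptions, not part of this PR:

import java.time.Duration;

import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Async;
import io.temporal.workflow.Promise;
import io.temporal.workflow.Workflow;

import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;

public class CommitStepUsageSketch {
  // Activity stub as it might be created inside a workflow implementation (options assumed)
  private final CommitActivity commitActivity = Workflow.newActivityStub(
      CommitActivity.class,
      ActivityOptions.newBuilder()
          .setStartToCloseTimeout(Duration.ofHours(1))
          .build());

  public int runCommit(WUProcessingSpec workSpec) {
    // Async.function needs a Func1<WUProcessingSpec, Integer>; a void-returning method would not fit
    Promise<Integer> result = Async.function(commitActivity::commit, workSpec);
    return result.get();
  }
}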
@@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.activity.impl;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.typesafe.config.ConfigFactory;
import io.temporal.failure.ApplicationFailure;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.commit.DeliverySemantics;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.FsStateStore;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.SafeDatasetCommit;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.TaskStateCollectorService;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.SerializationUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;


@Slf4j
public class CommitActivityImpl implements CommitActivity {

@Override
public int commit(WUProcessingSpec workSpec) {
int numDeserializationThreads = 1;
try {
FileSystem fs = Help.loadFileSystem(workSpec);
JobState jobState = Help.loadJobState(workSpec, fs);
SharedResourcesBroker<GobblinScopeTypes> instanceBroker = createDefaultInstanceBroker(jobState.getProperties());
JobContext globalGobblinContext = new JobContext(jobState.getProperties(), log, instanceBroker, null);
// TODO: Task state dir is a stub with the assumption it is always colocated with the workunits dir (as in the case of MR which generates workunits)
Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
Path jobOutputPath = new Path(new Path(jobIdParent, "output"), jobIdParent.getName());
Contributor: could we use JobStateUtils.getTaskStateStorePath, or is that different?

Contributor (author): We could, but that function is tied to the MR configuration; I wasn't sure whether we were going to change that convention, so I only read it from the WUProcessingSpec.

Contributor (@phet, Nov 30, 2023): we'll stick w/ that familiar convention for now. once we're fully, 100%-capable on temporal we'll consider our options to change conventions, but during this journey we'll maintain the status quo

...so I do recommend getTaskStateStorePath
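
A sketch of the recommendation phet lands on; the exact JobStateUtils.getTaskStateStorePath signature is an assumption based on the names in scope:

Path jobOutputPath = JobStateUtils.getTaskStateStorePath(jobState, fs); // assumed signature
log.info("Output path at: " + jobOutputPath + " with fs at " + fs.getUri());
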
log.info("Output path at: " + jobOutputPath + " with fs at " + fs.getUri());
StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
Collection<TaskState> taskStateQueue =
ImmutableList.copyOf(
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath, numDeserializationThreads));
commitTaskStates(jobState, taskStateQueue, globalGobblinContext);

} catch (Exception e) {
//TODO: IMPROVE GRANULARITY OF RETRIES
throw ApplicationFailure.newNonRetryableFailureWithCause(
"Failed to commit dataset state for some dataset(s) of job <jobStub>",
IOException.class.toString(),
new IOException(e),
null
);
}

return 0;
}

protected FileSystem loadFileSystemForUri(URI fsUri, State stateConfig) throws IOException {
return HadoopUtils.getFileSystem(fsUri, stateConfig);
}

void commitTaskStates(State jobState, Collection<TaskState> taskStates, JobContext jobContext) throws IOException {
Map<String, JobState.DatasetState> datasetStatesByUrns = createDatasetStatesByUrns(taskStates);
final boolean shouldCommitDataInJob = shouldCommitDataInJob(jobState);
final DeliverySemantics deliverySemantics = DeliverySemantics.AT_LEAST_ONCE;
final int numCommitThreads = 1;
Contributor: same as earlier comment about instance-level and TODO
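
A sketch of that earlier suggestion applied here; the constant name is hypothetical:

private static final int DEFAULT_NUM_COMMIT_THREADS = 1; // TODO: hoist to config rather than hardcode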


if (!shouldCommitDataInJob) {
log.info("Job will not commit data since data are committed by tasks.");
}

try {
if (!datasetStatesByUrns.isEmpty()) {
log.info("Persisting dataset urns.");
}

List<Either<Void, ExecutionException>> result = new IteratorExecutor<>(Iterables
.transform(datasetStatesByUrns.entrySet(),
new Function<Map.Entry<String, JobState.DatasetState>, Callable<Void>>() {
@Nullable
@Override
public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry) {
return new SafeDatasetCommit(shouldCommitDataInJob, false, deliverySemantics, entry.getKey(),
entry.getValue(), false, jobContext);
}
}).iterator(), numCommitThreads,
ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("Commit-thread-%d")))
.executeAndGetResults();

IteratorExecutor.logFailures(result, null, 10);

if (!IteratorExecutor.verifyAllSuccessful(result)) {
// TODO: propagate cause of failure
throw new IOException("Failed to commit dataset state for some dataset(s) of job <jobStub>");
Contributor: suggest including more specific info (e.g. the task state store path)

...basically something to correlate it w/ the output prior from logFailures

Contributor (author): It's logged just above, in IteratorExecutor.logFailures, which cuts off after a set number of error logs to prevent overcrowding the logs.
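
A sketch of the suggested message; taskStateStorePath is a hypothetical local carrying the store path, to correlate with the logFailures output:

throw new IOException("Failed to commit dataset state for some dataset(s) of job <jobStub>; task state store at " + taskStateStorePath);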

}
} catch (InterruptedException exc) {
throw new IOException(exc);
}
}

public Map<String, JobState.DatasetState> createDatasetStatesByUrns(Collection<TaskState> taskStates) {
Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();

//TODO: handle skipped tasks?
Contributor: I'm not clear on which ones might be skipped, so please consider elaborating in the comment

Contributor (author): Gobblin has a semantic for skipped tasks: workunits can be skipped, but their state is still saved. I'm not sure why or when it would occur; it's not very clear and lacks documentation. The config is workunit.skip.
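
A hedged sketch of what handling the TODO might look like, assuming skipped workunits surface as WorkUnitState.WorkingState.SKIPPED (the state behind workunit.skip); this guard would sit at the top of the loop below:

if (taskState.getWorkingState() == WorkUnitState.WorkingState.SKIPPED) {
  log.info("Omitting skipped task state: " + taskState.getTaskId());
  continue;
}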

for (TaskState taskState : taskStates) {
String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
}

return datasetStatesByUrns;
}

private String createDatasetUrn(Map<String, JobState.DatasetState> datasetStatesByUrns, TaskState taskState) {
String datasetUrn = taskState.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN);
if (!datasetStatesByUrns.containsKey(datasetUrn)) {
JobState.DatasetState datasetState = new JobState.DatasetState();
datasetState.setDatasetUrn(datasetUrn);
datasetStatesByUrns.put(datasetUrn, datasetState);
}
return datasetUrn;
}

private static boolean shouldCommitDataInJob(State state) {
boolean jobCommitPolicyIsFull =
JobCommitPolicy.getCommitPolicy(state.getProperties()) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
boolean publishDataAtJobLevel = state.getPropAsBoolean(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL,
ConfigurationKeys.DEFAULT_PUBLISH_DATA_AT_JOB_LEVEL);
boolean jobDataPublisherSpecified =
!Strings.isNullOrEmpty(state.getProp(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE));
return jobCommitPolicyIsFull || publishDataAtJobLevel || jobDataPublisherSpecified;
}

private static SharedResourcesBroker<GobblinScopeTypes> createDefaultInstanceBroker(Properties jobProps) {
Contributor: suggest to:

  • define in JobStateUtils
  • give it a name that contrasts with getSharedResourcesBroker
  • document when one vs. the other is called for

Contributor (author): I realized the other getSharedResourcesBroker should in fact be okay here, so I will use that class method (sketched below, after this helper).

Contributor: great! this method isn't used then, is it? if not, let's remove

log.warn("Creating a job specific {}. Objects will only be shared at the job level.",
SharedResourcesBroker.class.getSimpleName());
return SharedResourcesBrokerFactory.createDefaultTopLevelBroker(ConfigFactory.parseProperties(jobProps),
GobblinScopeTypes.GLOBAL.defaultScopeInstance());
}
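
A sketch of the resolution the author describes, replacing the helper above with the shared utility; the JobStateUtils.getSharedResourcesBroker signature is assumed:

SharedResourcesBroker<GobblinScopeTypes> instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState); // assumed signature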

}