[GOBBLIN-1968] Temporal commit step integration #3829

Merged: 8 commits, Dec 15, 2023
@@ -543,7 +543,7 @@ public static Optional<Class<? extends DataPublisher>> getJobDataPublisherClass(
* Data should be committed by the job if either {@link ConfigurationKeys#JOB_COMMIT_POLICY_KEY} is set to "full",
* or {@link ConfigurationKeys#PUBLISH_DATA_AT_JOB_LEVEL} is set to true.
*/
- private static boolean shouldCommitDataInJob(State state) {
+ public static boolean shouldCommitDataInJob(State state) {
boolean jobCommitPolicyIsFull =
JobCommitPolicy.getCommitPolicy(state.getProperties()) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
boolean publishDataAtJobLevel = state.getPropAsBoolean(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL,
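Widening shouldCommitDataInJob to public is what lets the Temporal commit step reuse the same job-level commit check. Below is a minimal sketch (not part of the change set) of the decision that method encodes; the check is inlined because the enclosing class is not named in this excerpt, and the Gobblin import paths are assumed.

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.source.extractor.JobCommitPolicy;

public class CommitPolicyCheckSketch {
  public static void main(String[] args) {
    State state = new State();
    // "full" corresponds to JobCommitPolicy.COMMIT_ON_FULL_SUCCESS per the javadoc above.
    state.setProp(ConfigurationKeys.JOB_COMMIT_POLICY_KEY, "full");

    boolean jobCommitPolicyIsFull =
        JobCommitPolicy.getCommitPolicy(state.getProperties()) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
    // The default of false here is illustrative; the real method passes the configured default.
    boolean publishDataAtJobLevel = state.getPropAsBoolean(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL, false);

    // Mirrors shouldCommitDataInJob(State): commit at the job level if either condition holds.
    System.out.println("commit data in job: " + (jobCommitPolicyIsFull || publishDataAtJobLevel));
  }
}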
SafeDatasetCommit.java:
@@ -63,7 +63,7 @@
*/
@RequiredArgsConstructor
@Slf4j
- final class SafeDatasetCommit implements Callable<Void> {
+ public final class SafeDatasetCommit implements Callable<Void> {

private static final Object GLOBAL_LOCK = new Object();

@@ -319,8 +319,8 @@ private void finalizeDatasetStateBeforeCommit(JobState.DatasetState datasetState
datasetState.setState(JobState.RunningState.FAILED);
datasetState.incrementJobFailures();
Optional<String> taskStateException = taskState.getTaskFailureException();
log.warn("At least one task did not get committed successfully. Setting dataset state to FAILED. "
+ (taskStateException.isPresent() ? taskStateException.get() : "Exception not set."));
log.warn("Failed task state for {} At least one task did not get committed successfully. Setting dataset state to FAILED. {}" ,
taskState.getWorkunit().getOutputFilePath(), taskStateException.isPresent() ? taskStateException.get() : "Exception not set.");
return;
}
}
TaskStateCollectorService.java:
@@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+ import java.util.Optional;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.Callable;
@@ -29,10 +30,7 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;

- import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Queues;
@@ -68,8 +66,6 @@
@Slf4j
public class TaskStateCollectorService extends AbstractScheduledService {

- private static final Logger LOGGER = LoggerFactory.getLogger(TaskStateCollectorService.class);

private final JobState jobState;

private final EventBus eventBus;
@@ -145,7 +141,7 @@ public TaskStateCollectorService(Properties jobProps, JobState jobState, EventBu
throw new RuntimeException("Could not construct TaskCollectorHandler " + handlerTypeName, rfe);
}
} else {
- optionalTaskCollectorHandler = Optional.absent();
+ optionalTaskCollectorHandler = Optional.empty();
}

isJobProceedOnCollectorServiceFailure =
@@ -166,13 +162,13 @@ protected Scheduler scheduler() {

@Override
protected void startUp() throws Exception {
LOGGER.info("Starting the " + TaskStateCollectorService.class.getSimpleName());
log.info("Starting the " + TaskStateCollectorService.class.getSimpleName());
super.startUp();
}

@Override
protected void shutDown() throws Exception {
LOGGER.info("Stopping the " + TaskStateCollectorService.class.getSimpleName());
log.info("Stopping the " + TaskStateCollectorService.class.getSimpleName());
try {
runOneIteration();
} finally {
@@ -193,42 +189,14 @@ protected void shutDown() throws Exception {
* @throws IOException if it fails to collect the output {@link TaskState}s
*/
private void collectOutputTaskStates() throws IOException {
- List<String> taskStateNames = taskStateStore.getTableNames(outputTaskStateDir.getName(), new Predicate<String>() {
- @Override
- public boolean apply(String input) {
- return input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
- && !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
- }});
-
- if (taskStateNames == null || taskStateNames.size() == 0) {
- LOGGER.debug("No output task state files found in " + this.outputTaskStateDir);
+ final Optional<Queue<TaskState>> taskStateQueue = deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir.getName(), this.stateSerDeRunnerThreads);
+ if (!taskStateQueue.isPresent()) {
return;
}

- final Queue<TaskState> taskStateQueue = Queues.newConcurrentLinkedQueue();
- try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, null)) {
- for (final String taskStateName : taskStateNames) {
- LOGGER.debug("Found output task state file " + taskStateName);
- // Deserialize the TaskState and delete the file
- stateSerDeRunner.submitCallable(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- TaskState taskState = taskStateStore.getAll(outputTaskStateDir.getName(), taskStateName).get(0);
- taskStateQueue.add(taskState);
- taskStateStore.delete(outputTaskStateDir.getName(), taskStateName);
- return null;
- }
- }, "Deserialize state for " + taskStateName);
- }
- } catch (IOException ioe) {
- LOGGER.warn("Could not read all task state files.");
- }
-
- LOGGER.info(String.format("Collected task state of %d completed tasks", taskStateQueue.size()));

// Add the TaskStates of completed tasks to the JobState so when the control
// returns to the launcher, it sees the TaskStates of all completed tasks.
- for (TaskState taskState : taskStateQueue) {
+ for (TaskState taskState : taskStateQueue.get()) {
consumeTaskIssues(taskState);
taskState.setJobState(this.jobState);
this.jobState.addTaskState(taskState);
@@ -241,22 +209,68 @@ public Void call() throws Exception {
// Finish any additional steps defined in handler on driver level.
// Currently implemented handler for Hive registration only.
if (optionalTaskCollectorHandler.isPresent()) {
LOGGER.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.size() + " tasks");
log.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.get().size() + " tasks");

try {
- optionalTaskCollectorHandler.get().handle(taskStateQueue);
+ optionalTaskCollectorHandler.get().handle(taskStateQueue.get());
} catch (Throwable t) {
if (isJobProceedOnCollectorServiceFailure) {
log.error("Failed to commit dataset while job proceeds", t);
- SafeDatasetCommit.setTaskFailureException(taskStateQueue, t);
+ SafeDatasetCommit.setTaskFailureException(taskStateQueue.get(), t);
} else {
throw new RuntimeException("Hive Registration as the TaskStateCollectorServiceHandler failed.", t);
}
}
}

// Notify the listeners for the completion of the tasks
- this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
+ this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue.get())));
}

+ /**
+ * Reads in a {@link StateStore} and deserializes all task states found in the provided table name
+ * Task State files are populated by the {@link GobblinMultiTaskAttempt} to record the output of remote concurrent tasks (e.g. MR mappers)
+ * @param taskStateStore
+ * @param taskStateTableName
+ * @param numDeserializerThreads
+ * @return Queue of TaskStates, optional if no task states are found in the provided state store
+ * @throws IOException
+ */
+ public static Optional<Queue<TaskState>> deserializeTaskStatesFromFolder(StateStore<TaskState> taskStateStore, String taskStateTableName,
+ int numDeserializerThreads) throws IOException {
+ List<String> taskStateNames = taskStateStore.getTableNames(taskStateTableName, new Predicate<String>() {
+ @Override
+ public boolean apply(String input) {
+ return input != null
+ && input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
+ && !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
+ }});
+
+ if (taskStateNames == null || taskStateNames.isEmpty()) {
+ log.warn("No output task state files found in " + taskStateTableName);
+ return Optional.empty();
+ }
+
+ final Queue<TaskState> taskStateQueue = Queues.newConcurrentLinkedQueue();
+ try (ParallelRunner stateSerDeRunner = new ParallelRunner(numDeserializerThreads, null)) {
+ for (final String taskStateName : taskStateNames) {
+ log.debug("Found output task state file " + taskStateName);
+ // Deserialize the TaskState and delete the file
+ stateSerDeRunner.submitCallable(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ TaskState taskState = taskStateStore.getAll(taskStateTableName, taskStateName).get(0);
+ taskStateQueue.add(taskState);
+ taskStateStore.delete(taskStateTableName, taskStateName);
+ return null;
+ }
+ }, "Deserialize state for " + taskStateName);
+ }
+ } catch (IOException ioe) {
+ log.error("Could not read all task state files due to", ioe);
+ }
+ log.info(String.format("Collected task state of %d completed tasks in %s", taskStateQueue.size(), taskStateTableName));
+ return Optional.of(taskStateQueue);
+ }

/**
@@ -267,15 +281,15 @@ public Void call() throws Exception {
private void reportJobProgress(TaskState taskState) {
String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_SIZE);
if (stringSize == null) {
LOGGER.warn("Expected to report job progress but work unit byte size property null");
log.warn("Expected to report job progress but work unit byte size property null");
return;
}

Long taskByteSize = Long.parseLong(stringSize);

// If progress reporting is enabled, value should be present
if (!this.jobState.contains(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE)) {
LOGGER.warn("Expected to report job progress but total bytes to copy property null");
log.warn("Expected to report job progress but total bytes to copy property null");
return;
}
this.totalSizeToCopy = this.jobState.getPropAsLong(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE);
@@ -287,7 +301,7 @@ private void reportJobProgress(TaskState taskState) {
this.workUnitsCompletedSoFar += 1;

if (this.totalNumWorkUnits == 0) {
LOGGER.warn("Expected to report job progress but work units are not countable");
log.warn("Expected to report job progress but work units are not countable");
return;
}
newPercentageCopied = this.workUnitsCompletedSoFar / this.totalNumWorkUnits;
@@ -307,7 +321,7 @@ private void reportJobProgress(TaskState taskState) {
Map<String, String> progress = new HashMap<>();
progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport));

LOGGER.info("Sending copy progress event with percentage " + percentageToReport + "%");
log.info("Sending copy progress event with percentage " + percentageToReport + "%");
new TimingEvent(this.eventSubmitter, TimingEvent.JOB_COMPLETION_PERCENTAGE).stop(progress);
}
}
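Extracting deserializeTaskStatesFromFolder into a public static helper is what lets a caller outside this service, such as the commit activity introduced below, load task states on its own. A minimal usage sketch (not part of the change set), assuming the caller already holds a StateStore<TaskState> for the job's task-state output directory and that the package names match the identifiers above:

import java.io.IOException;
import java.util.Optional;
import java.util.Queue;

import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.TaskStateCollectorService;

public class TaskStateReadSketch {
  /** Counts the task states written under the given state-store table name. */
  public static int countCollectedTaskStates(StateStore<TaskState> taskStateStore, String taskStateTableName)
      throws IOException {
    // 5 deserializer threads is an arbitrary illustrative value.
    Optional<Queue<TaskState>> taskStateQueue =
        TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, taskStateTableName, 5);
    return taskStateQueue.map(Queue::size).orElse(0);
  }
}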
CommitActivity.java (new file):
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.activity;

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;


/** Activity for reading the output of work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl} by
* reading in a {@link WUProcessingSpec} to determine the location of the output task states */
@ActivityInterface
public interface CommitActivity {
/**
* Commit the output of the work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl}
* @param workSpec
* @return number of workunits committed
*/
@ActivityMethod
int commit(WUProcessingSpec workSpec);
}
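For context on how this interface is consumed: a Temporal workflow calls it through an activity stub rather than directly, so the commit runs on whichever worker has registered a CommitActivity implementation. The fragment below is an illustrative sketch, not code from this PR; the workflow class and the timeout are hypothetical, and only CommitActivity, WUProcessingSpec, and the Temporal SDK calls are taken as given.

import java.time.Duration;

import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;

import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;

public class CommitStepWorkflowSketch {

  // Temporal proxies the activity interface; calls on the stub are dispatched to a worker
  // that registered an implementation of CommitActivity.
  private final CommitActivity commitActivity = Workflow.newActivityStub(
      CommitActivity.class,
      ActivityOptions.newBuilder()
          .setStartToCloseTimeout(Duration.ofHours(1))  // illustrative timeout
          .build());

  public int commitWorkUnits(WUProcessingSpec workSpec) {
    // Blocks the workflow until the activity completes and returns the number of committed workunits.
    return commitActivity.commit(workSpec);
  }
}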