Skip to content

Commit

Permalink
Rework a few more elements of MR-related job exec for reuse in Tempor…
Browse files Browse the repository at this point in the history
…al-based execution
  • Loading branch information
phet committed Feb 21, 2024
1 parent 492fea2 commit bfdfb1b
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -553,21 +553,13 @@ public void apply(JobListener jobListener, JobContext jobContext)
TimingEvent workUnitsPreparationTimer =
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_PREPARATION);
workUnitStream = processWorkUnitStream(workUnitStream, jobState);

// If it is a streaming source, workunits cannot be counted
this.jobContext.getJobState().setProp(NUM_WORKUNITS,
workUnitStream.isSafeToMaterialize() ? workUnitStream.getMaterializedWorkUnitCollection().size() : 0);
this.gobblinJobMetricsReporter.reportWorkUnitCountMetrics(this.jobContext.getJobState().getPropAsInt(NUM_WORKUNITS), jobState);
// dump the work unit if tracking logs are enabled
if (jobState.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_ENABLE_TRACKING_LOGS)) {
workUnitStream = workUnitStream.transform(new Function<WorkUnit, WorkUnit>() {
@Nullable
@Override
public WorkUnit apply(@Nullable WorkUnit input) {
LOG.info("Work unit tracking log: {}", input);
return input;
}
});
}
// dump the work unit if tracking logs are enabled (*AFTER* any materialization done for counting)
workUnitStream = addWorkUnitTrackingPerConfig(workUnitStream, jobState, LOG);

workUnitsPreparationTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
EventName.WORK_UNITS_PREPARATION));
Expand Down Expand Up @@ -710,13 +702,13 @@ private void executeUnfinishedCommitSequences(String jobName)
}
}

protected WorkUnitStream executeHandlers (WorkUnitStream workUnitStream, DestinationDatasetHandlerService datasetHandlerService){
protected WorkUnitStream executeHandlers(WorkUnitStream workUnitStream, DestinationDatasetHandlerService datasetHandlerService) {
return datasetHandlerService.executeHandlers(workUnitStream);
}

protected WorkUnitStream processWorkUnitStream(WorkUnitStream workUnitStream, JobState jobState) {
// Add task ids
workUnitStream = prepareWorkUnits(workUnitStream);
workUnitStream = prepareWorkUnits(workUnitStream, jobState);
// Remove skipped workUnits from the list of work units to execute.
workUnitStream = workUnitStream.filter(new SkippedWorkUnitsFilter(jobState));
// Add surviving tasks to jobState
Expand Down Expand Up @@ -836,7 +828,7 @@ protected void runWorkUnitStream(WorkUnitStream workUnitStream) throws Exception
/**
* Materialize a {@link WorkUnitStream} into an in-memory list. Note that infinite work unit streams cannot be materialized.
*/
protected List<WorkUnit> materializeWorkUnitList(WorkUnitStream workUnitStream) {
public static List<WorkUnit> materializeWorkUnitList(WorkUnitStream workUnitStream) {
if (!workUnitStream.isFiniteStream()) {
throw new UnsupportedOperationException("Cannot materialize an infinite work unit stream.");
}
Expand Down Expand Up @@ -904,11 +896,38 @@ public void run() {
});
}

/** @return transformed `workUnits` after assigning task IDs, removing skipped ones, and registering those remaining with `jobState` */
public static WorkUnitStream prepareWorkUnits(WorkUnitStream workUnits, JobState jobState) {
return assignIdsToWorkUnits(workUnits, jobState) // assign task ids
.filter(new SkippedWorkUnitsFilter(jobState)) // remove skipped workUnits
.transform(new MultiWorkUnitForEach() { // add remaining to jobState
@Override
public void forWorkUnit(WorkUnit workUnit) {
jobState.incrementTaskCount();
jobState.addTaskState(new TaskState(new WorkUnitState(workUnit, jobState)));
}
});
}

/** @return `workUnitStream` transformed to add "tracking" (sic.: actually *trace* logging), if indicated by `jobState` config */
public static WorkUnitStream addWorkUnitTrackingPerConfig(WorkUnitStream workUnitStream, JobState jobState, Logger log) {
return !jobState.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_ENABLE_TRACKING_LOGS)
? workUnitStream // no-op, unless enabled
: workUnitStream.transform(new Function<WorkUnit, WorkUnit>() {
@Nullable
@Override
public WorkUnit apply(@Nullable WorkUnit input) {
log.info("Work unit tracking log: {}", input);
return input;
}
});
}

/**
* Prepare the flattened {@link WorkUnit}s for execution by populating the job and task IDs.
*/
private WorkUnitStream prepareWorkUnits(WorkUnitStream workUnits) {
return workUnits.transform(workUnitPreparator);
private static WorkUnitStream assignIdsToWorkUnits(WorkUnitStream workUnits, JobState jobState) {
return workUnits.transform(new WorkUnitPreparator(jobState.getJobId()));
}

private static abstract class MultiWorkUnitForEach implements Function<WorkUnit, WorkUnit> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ public JobState(String jobName, String jobId) {
this.setId(jobId);
}

public JobState(Properties properties) {
this(
JobState.getStateFromProps(properties),
JobState.getJobNameFromProps(properties),
JobState.getJobIdFromProps(properties));
}

public JobState(State properties, String jobName, String jobId) {
super(properties);
this.jobName = jobName;
Expand Down Expand Up @@ -191,6 +198,17 @@ public static String getJobIdFromProps(Properties props) {
: JobLauncherUtils.newJobId(JobState.getJobNameFromProps(props));
}

public static State getStateFromProps(Properties props) {
return getStateFromProps(props, getJobIdFromProps(props));
}

public static State getStateFromProps(Properties props, String jobIdPropValue) {
State state = new State();
state.addAll(props);
state.setProp(ConfigurationKeys.JOB_ID_KEY, jobIdPropValue); // in case not yet directly defined as such
return state;
}

public static String getJobGroupFromState(State state) {
return state.getProp(ConfigurationKeys.JOB_GROUP_KEY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,7 @@ public MRJobLauncher(Properties jobProps, Configuration conf, SharedResourcesBro
// adding dependent jars/files to the DistributedCache that also updates the conf)
this.job = Job.getInstance(this.conf, JOB_NAME_PREFIX + this.jobContext.getJobName());

this.parallelRunnerThreads = Integer.parseInt(jobProps.getProperty(ParallelRunner.PARALLEL_RUNNER_THREADS_KEY,
Integer.toString(ParallelRunner.DEFAULT_PARALLEL_RUNNER_THREADS)));
this.parallelRunnerThreads = ParallelRunner.readConfigNumParallelRunnerThreads(jobProps);

// StateStore interface uses the following key (rootDir, storeName, tableName)
// The state store base is the root directory and the last two elements of the path are used as the storeName and
Expand Down Expand Up @@ -682,23 +681,12 @@ private void prepareJobInput(List<WorkUnit> workUnits) throws IOException {
try {
ParallelRunner parallelRunner = closer.register(new ParallelRunner(this.parallelRunnerThreads, this.fs));

int multiTaskIdSequence = 0;
JobLauncherUtils.WorkUnitPathCalculator pathCalculator = new JobLauncherUtils.WorkUnitPathCalculator();
// Serialize each work unit into a file named after the task ID
for (WorkUnit workUnit : workUnits) {

String workUnitFileName;
if (workUnit.isMultiWorkUnit()) {
workUnitFileName = JobLauncherUtils.newMultiTaskId(this.jobContext.getJobId(), multiTaskIdSequence++)
+ JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
} else {
workUnitFileName = workUnit.getProp(ConfigurationKeys.TASK_ID_KEY) + JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
}
Path workUnitFile = new Path(this.jobInputPath, workUnitFileName);
LOG.debug("Writing work unit file " + workUnitFileName);

Path workUnitFile = pathCalculator.calcNextPath(workUnit, this.jobContext.getJobId(), this.jobInputPath);
LOG.debug("Writing work unit file " + workUnitFile.getName());
parallelRunner.serializeToFile(workUnit, workUnitFile);

// Append the work unit file path to the job input file
}
} catch (Throwable t) {
throw closer.rethrow(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

import javax.annotation.concurrent.NotThreadSafe;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand All @@ -39,6 +41,7 @@
import com.google.common.collect.Lists;
import com.google.common.io.Closer;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.ConfigurationKeys;
Expand All @@ -61,6 +64,25 @@ public class JobLauncherUtils {
// A cache for proxied FileSystems by owners
private static Cache<String, FileSystem> fileSystemCacheByOwners = CacheBuilder.newBuilder().build();

/** Calculate monotonically-increasing paths for multi-WU files */
@AllArgsConstructor
@NotThreadSafe
public static class WorkUnitPathCalculator {
private int nextMultiWorkUnitTaskId;

public WorkUnitPathCalculator() {
this(0);
}

// Serialize each work unit into a file named after the task ID
public Path calcNextPath(WorkUnit workUnit, String jobId, Path basePath) {
String workUnitFileName = workUnit.isMultiWorkUnit()
? JobLauncherUtils.newMultiTaskId(jobId, nextMultiWorkUnitTaskId++) + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION
: workUnit.getProp(ConfigurationKeys.TASK_ID_KEY) + JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
return new Path(basePath, workUnitFileName);
}
}

/**
* Create a new job ID.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.gobblin.util;

import java.util.Properties;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
Expand Down Expand Up @@ -86,6 +87,10 @@ public class ParallelRunner implements Closeable {
public static final String PARALLEL_RUNNER_THREADS_KEY = "parallel.runner.threads";
public static final int DEFAULT_PARALLEL_RUNNER_THREADS = 10;

public static int readConfigNumParallelRunnerThreads(Properties props) {
return Integer.parseInt(props.getProperty(PARALLEL_RUNNER_THREADS_KEY, Integer.toString(DEFAULT_PARALLEL_RUNNER_THREADS)));
}

private final ExecutorService executor;

/**
Expand Down

0 comments on commit bfdfb1b

Please sign in to comment.