Skip to content

Commit

Permalink
Simplify a few elements of MR-related job exec before reusing code in…
Browse files Browse the repository at this point in the history
… Temporal-based execution
  • Loading branch information
phet committed Sep 21, 2023
1 parent a289613 commit d39ce8f
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 265 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ public class ConfigurationKeys {
public static final int DEFAULT_MR_JOB_MAX_MAPPERS = 100;
public static final boolean DEFAULT_MR_JOB_MAPPER_FAILURE_IS_FATAL = false;
public static final boolean DEFAULT_MR_PERSIST_WORK_UNITS_THEN_CANCEL = false;
public static final String DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION = "false";
public static final boolean DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION = false;

/**
* Configuration properties used by the distributed job launcher.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,6 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {

private static final Logger LOGGER = LoggerFactory.getLogger(GobblinHelixJobLauncher.class);

private static final String WORK_UNIT_FILE_EXTENSION = ".wu";

private final HelixManager helixManager;
private final TaskDriver helixTaskDriver;
private final String helixWorkFlowName;
Expand Down Expand Up @@ -539,7 +537,7 @@ private void deleteWorkUnitFromStateStore(String workUnitId, ParallelRunner stat
Path workUnitFile = new Path(workUnitFilePath);
final String fileName = workUnitFile.getName();
final String storeName = workUnitFile.getParent().getName();
if (fileName.endsWith(MULTI_WORK_UNIT_FILE_EXTENSION)) {
if (fileName.endsWith(JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION)) {
stateStore = stateStores.getMwuStateStore();
} else {
stateStore = stateStores.getWuStateStore();
Expand All @@ -562,10 +560,10 @@ private String persistWorkUnit(final Path workUnitFileDir, final WorkUnit workUn
String workUnitFileName = workUnit.getId();

if (workUnit instanceof MultiWorkUnit) {
workUnitFileName += MULTI_WORK_UNIT_FILE_EXTENSION;
workUnitFileName += JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
stateStore = stateStores.getMwuStateStore();
} else {
workUnitFileName += WORK_UNIT_FILE_EXTENSION;
workUnitFileName += JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
stateStore = stateStores.getWuStateStore();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.util.StateStores;
Expand Down Expand Up @@ -184,7 +183,7 @@ protected List<WorkUnit> getWorkUnits()
WorkUnit workUnit;

try {
if (_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION)) {
if (_workUnitFilePath.getName().endsWith(JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION)) {
workUnit = _stateStores.getMwuStateStore().getAll(storeName, fileName).get(0);
} else {
workUnit = _stateStores.getWuStateStore().getAll(storeName, fileName).get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,6 @@ public abstract class AbstractJobLauncher implements JobLauncher {

public static final String JOB_STATE_FILE_NAME = "job.state";

public static final String WORK_UNIT_FILE_EXTENSION = ".wu";
public static final String MULTI_WORK_UNIT_FILE_EXTENSION = ".mwu";

public static final String GOBBLIN_JOB_TEMPLATE_KEY = "gobblin.template.uri";

public static final String NUM_WORKUNITS = "numWorkUnits";
Expand Down
123 changes: 14 additions & 109 deletions gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import java.util.Map;
import java.util.Properties;

import lombok.Getter;
import lombok.Setter;

import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.runtime.job.JobProgress;

Expand Down Expand Up @@ -127,12 +130,22 @@ public boolean isRunningOrDone() {
}
}

@Getter @Setter
private String jobName;
@Getter @Setter
private String jobId;
/** job start time in milliseconds */
@Getter @Setter
private long startTime = 0;
/** job end time in milliseconds */
@Getter @Setter
private long endTime = 0;
/** job duration in milliseconds */
@Getter @Setter
private long duration = 0;
private RunningState state = RunningState.PENDING;
/** the number of tasks this job consists of */
@Getter @Setter
private int taskCount = 0;
private final Map<String, TaskState> taskStates = Maps.newLinkedHashMap();
// Skipped task states shouldn't be exposed to publisher, but they need to be in JobState and DatasetState so that they can be written to StateStore.
Expand All @@ -149,7 +162,7 @@ public JobState(String jobName, String jobId) {
this.setId(jobId);
}

public JobState(State properties,String jobName, String jobId) {
public JobState(State properties, String jobName, String jobId) {
super(properties);
this.jobName = jobName;
this.jobId = jobId;
Expand Down Expand Up @@ -188,69 +201,6 @@ public static String getJobDescriptionFromProps(Properties props) {
return props.getProperty(ConfigurationKeys.JOB_DESCRIPTION_KEY);
}

/**
* Get job name.
*
* @return job name
*/
public String getJobName() {
return this.jobName;
}

/**
* Set job name.
*
* @param jobName job name
*/
public void setJobName(String jobName) {
this.jobName = jobName;
}

/**
* Get job ID.
*
* @return job ID
*/
public String getJobId() {
return this.jobId;
}

/**
* Set job ID.
*
* @param jobId job ID
*/
public void setJobId(String jobId) {
this.jobId = jobId;
}

/**
* Get job start time.
*
* @return job start time
*/
public long getStartTime() {
return this.startTime;
}

/**
* Set job start time.
*
* @param startTime job start time
*/
public void setStartTime(long startTime) {
this.startTime = startTime;
}

/**
* Get job end time.
*
* @return job end time
*/
public long getEndTime() {
return this.endTime;
}

/**
* Get the currently elapsed time for this job.
* @return
Expand All @@ -265,33 +215,6 @@ public long getElapsedTime() {
return 0;
}

/**
* Set job end time.
*
* @param endTime job end time
*/
public void setEndTime(long endTime) {
this.endTime = endTime;
}

/**
* Get job duration in milliseconds.
*
* @return job duration in milliseconds
*/
public long getDuration() {
return this.duration;
}

/**
* Set job duration in milliseconds.
*
* @param duration job duration in milliseconds
*/
public void setDuration(long duration) {
this.duration = duration;
}

/**
* Get job running state of type {@link RunningState}.
*
Expand All @@ -310,24 +233,6 @@ public synchronized void setState(RunningState state) {
this.state = state;
}

/**
* Get the number of tasks this job consists of.
*
* @return number of tasks this job consists of
*/
public int getTaskCount() {
return this.taskCount;
}

/**
* Set the number of tasks this job consists of.
*
* @param taskCount number of tasks this job consists of
*/
public void setTaskCount(int taskCount) {
this.taskCount = taskCount;
}

/**
* If not already present, set the {@link ConfigurationKeys#JOB_FAILURE_EXCEPTION_KEY} to a {@link String}
* representation of the given {@link Throwable}.
Expand Down
Loading

0 comments on commit d39ce8f

Please sign in to comment.