Skip to content

Commit

Permalink
Fix for null pointers in TaskList returned by MapReduceFSFetcherHadoop2 (#203)
Browse files Browse the repository at this point in the history

(1) Use an ArrayList instead of a fixed-size array, so tasks skipped as failed leave no null entries in the returned TaskList
(2) Add a unit test covering this case
  • Loading branch information
stiga-huang authored and akshayrai committed Feb 9, 2017
1 parent e93d431 commit dd7a458
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,27 +292,28 @@ private long[] getTaskExecTime(JobHistoryParser.TaskAttemptInfo attempInfo) {
return time;
}

private MapReduceTaskData[] getTaskData(String jobId, List<JobHistoryParser.TaskInfo> infoList) {
protected MapReduceTaskData[] getTaskData(String jobId, List<JobHistoryParser.TaskInfo> infoList) {
int sampleSize = sampleAndGetSize(jobId, infoList);

MapReduceTaskData[] taskList = new MapReduceTaskData[sampleSize];
List<MapReduceTaskData> taskList = new ArrayList<MapReduceTaskData>();
for (int i = 0; i < sampleSize; i++) {
JobHistoryParser.TaskInfo tInfo = infoList.get(i);
if (!"SUCCEEDED".equals(tInfo.getTaskStatus())) {
System.out.println("This is a failed task: " + tInfo.getTaskId().toString());
logger.info(String.format("Skipped a failed task of %s: %s", jobId, tInfo.getTaskId().toString()));
continue;
}

String taskId = tInfo.getTaskId().toString();
TaskAttemptID attemptId = tInfo.getSuccessfulAttemptId();
taskList[i] = new MapReduceTaskData(taskId, attemptId.toString());
MapReduceTaskData taskData = new MapReduceTaskData(taskId, attemptId.toString());

MapReduceCounterData taskCounterData = getCounterData(tInfo.getCounters());
long[] taskExecTime = getTaskExecTime(tInfo.getAllTaskAttempts().get(attemptId));

taskList[i].setTimeAndCounter(taskExecTime, taskCounterData);
taskData.setTimeAndCounter(taskExecTime, taskCounterData);
taskList.add(taskData);
}
return taskList;
return taskList.toArray(new MapReduceTaskData[taskList.size()]);
}

private class DataFiles {
Expand Down
1 change: 1 addition & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ object Dependencies {
"mysql" % "mysql-connector-java" % mysqlConnectorVersion,
"org.apache.hadoop" % "hadoop-auth" % hadoopVersion % "compileonly",
"org.apache.hadoop" % "hadoop-mapreduce-client-core" % hadoopVersion % "compileonly",
"org.apache.hadoop" % "hadoop-mapreduce-client-core" % hadoopVersion % Test,
"org.apache.hadoop" % "hadoop-common" % hadoopVersion % "compileonly",
"org.apache.hadoop" % "hadoop-common" % hadoopVersion % Test,
"org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion % "compileonly",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@

import com.linkedin.drelephant.analysis.AnalyticJob;
import com.linkedin.drelephant.configurations.fetcher.FetcherConfiguration;
import com.linkedin.drelephant.mapreduce.data.MapReduceTaskData;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
Expand All @@ -33,6 +39,8 @@
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.HashMap;

public class MapReduceFSFetcherHadoop2Test {

Expand Down Expand Up @@ -142,4 +150,64 @@ public void testGetHistoryDir() {
Assert.assertNull("Failed to initialize FileSystem", e);
}
}

@Test
public void testGetTaskData() {
FetcherConfiguration fetcherConf = new FetcherConfiguration(document9.getDocumentElement());

try {
MapReduceFSFetcherHadoop2 fetcher = new MapReduceFSFetcherHadoop2(
fetcherConf.getFetchersConfigurationData().get(0));
String jobId = "job_14000_001";
List<JobHistoryParser.TaskInfo> infoList = new ArrayList<JobHistoryParser.TaskInfo>();
infoList.add(new MockTaskInfo(1, true));
infoList.add(new MockTaskInfo(2, false));

MapReduceTaskData[] taskList = fetcher.getTaskData(jobId, infoList);
Assert.assertNotNull("taskList should not be null.", taskList);
for (MapReduceTaskData task : taskList) {
Assert.assertNotNull("Null pointer in taskList.", task);
}
Assert.assertEquals("Should have only one succeeded task.", 1, taskList.length);
} catch (IOException e) {
Assert.assertNull("Failed to initialize FileSystem.", e);
}
}

class MockTaskInfo extends JobHistoryParser.TaskInfo {
TaskID taskId;
TaskType taskType;
boolean succeeded;
Counters counters;
long startTime, finishTime;
TaskAttemptID failedDueToAttemptId;
TaskAttemptID successfulAttemptId;
Map<TaskAttemptID, JobHistoryParser.TaskAttemptInfo> attemptsMap;

public MockTaskInfo(int id, boolean succeeded) {
this.taskId = new TaskID("job1", 1, TaskType.MAP, id);
this.taskType = TaskType.MAP;
this.succeeded = succeeded;
this.counters = new Counters();
this.finishTime = System.currentTimeMillis();
this.startTime = finishTime - 10000;
this.failedDueToAttemptId = new TaskAttemptID(taskId, 0);
this.successfulAttemptId = new TaskAttemptID(taskId, 1);
this.attemptsMap = new HashMap<TaskAttemptID, JobHistoryParser.TaskAttemptInfo>();
this.attemptsMap.put(failedDueToAttemptId, new JobHistoryParser.TaskAttemptInfo());
this.attemptsMap.put(successfulAttemptId, new JobHistoryParser.TaskAttemptInfo());
}

public TaskID getTaskId() { return taskId; }
public long getStartTime() { return startTime; }
public long getFinishTime() { return finishTime; }
public Counters getCounters() { return counters; }
public TaskType getTaskType() { return taskType; }
public String getTaskStatus() { return succeeded ? "SUCCEEDED" : "FAILED"; }
public TaskAttemptID getSuccessfulAttemptId() { return successfulAttemptId; }
public TaskAttemptID getFailedDueToAttemptId() { return failedDueToAttemptId; }
public Map<TaskAttemptID, JobHistoryParser.TaskAttemptInfo> getAllTaskAttempts() {
return attemptsMap;
}
}
}

0 comments on commit dd7a458

Please sign in to comment.