add s3, s3a, s3n bytes read and bytes written, and update heuristics to use them (linkedin#254)
georgewu2 authored and akshayrai committed Jun 21, 2017
1 parent b387418 commit 7d4936b
Showing 9 changed files with 59 additions and 12 deletions.
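
In short, the commit adds bytes-read and bytes-written counters for the s3, s3n, and s3a filesystem schemes, and changes the mapper-side heuristics (data skew, speed, time) to total each task's input bytes across all of those counters plus HDFS_BYTES_READ, instead of reading HDFS_BYTES_READ alone. A minimal standalone sketch of that aggregation pattern follows; it uses a plain Map<String, Long> as a stand-in for the project's MapReduceCounterData, and the names InputBytesSketch and sumInputBytes are illustrative, not part of the patch.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InputBytesSketch {

  // The read-side counters the patched heuristics now sum per task.
  private static final List<String> READ_COUNTERS = Arrays.asList(
      "HDFS_BYTES_READ", "S3_BYTES_READ", "S3A_BYTES_READ", "S3N_BYTES_READ");

  // Stand-in for task.getCounters().get(counterName); in this sketch a counter the
  // task never reported simply contributes zero to the total.
  static long sumInputBytes(Map<String, Long> taskCounters) {
    long total = 0;
    for (String counter : READ_COUNTERS) {
      total += taskCounters.getOrDefault(counter, 0L);
    }
    return total;
  }

  public static void main(String[] args) {
    Map<String, Long> counters = new HashMap<>();
    counters.put("HDFS_BYTES_READ", 64L * 1024 * 1024);
    counters.put("S3A_BYTES_READ", 128L * 1024 * 1024);
    // Prints 201326592 (192 MiB): HDFS and S3A input are counted together.
    System.out.println(sumInputBytes(counters));
  }
}

Assuming the project's counter lookup likewise returns zero for counters a task never reported, a pure-HDFS job sees the same totals after this change as before it.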
@@ -109,6 +109,12 @@ public static enum CounterName {
FILE_BYTES_WRITTEN(GroupName.FileSystemCounters, "FILE_BYTES_WRITTEN", "FILE_BYTES_WRITTEN"),
HDFS_BYTES_READ(GroupName.FileSystemCounters, "HDFS_BYTES_READ", "HDFS_BYTES_READ"),
HDFS_BYTES_WRITTEN(GroupName.FileSystemCounters, "HDFS_BYTES_WRITTEN", "HDFS_BYTES_WRITTEN"),
+ S3_BYTES_READ(GroupName.FileSystemCounters, "S3_BYTES_READ", "S3_BYTES_READ"),
+ S3_BYTES_WRITTEN(GroupName.FileSystemCounters, "S3_BYTES_WRITTEN", "S3_BYTES_WRITTEN"),
+ S3N_BYTES_READ(GroupName.FileSystemCounters, "S3N_BYTES_READ", "S3N_BYTES_READ"),
+ S3N_BYTES_WRITTEN(GroupName.FileSystemCounters, "S3N_BYTES_WRITTEN", "S3N_BYTES_WRITTEN"),
+ S3A_BYTES_READ(GroupName.FileSystemCounters, "S3A_BYTES_READ", "S3A_BYTES_READ"),
+ S3A_BYTES_WRITTEN(GroupName.FileSystemCounters, "S3A_BYTES_WRITTEN", "S3A_BYTES_WRITTEN"),

MAP_INPUT_RECORDS(GroupName.MapReduce, "MAP_INPUT_RECORDS", "Map input records"),
MAP_OUTPUT_RECORDS(GroupName.MapReduce, "MAP_OUTPUT_RECORDS", "Map output records"),
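
For orientation, every constant in this enum is declared with three arguments, which, judging from existing entries such as MAP_INPUT_RECORDS, appear to be the counter group, the internal counter name, and a human-readable display name (identical for the filesystem counters). The self-contained sketch below mirrors that shape for the new read-side constants only; the field and constructor declarations are assumptions made for illustration, not the project's actual code.

public class CounterNameSketch {

  enum GroupName { FileSystemCounters, MapReduce }

  // Same three-argument shape as the enum in the diff above: group, name, display name.
  enum CounterName {
    HDFS_BYTES_READ(GroupName.FileSystemCounters, "HDFS_BYTES_READ", "HDFS_BYTES_READ"),
    S3_BYTES_READ(GroupName.FileSystemCounters, "S3_BYTES_READ", "S3_BYTES_READ"),
    S3A_BYTES_READ(GroupName.FileSystemCounters, "S3A_BYTES_READ", "S3A_BYTES_READ"),
    S3N_BYTES_READ(GroupName.FileSystemCounters, "S3N_BYTES_READ", "S3N_BYTES_READ");

    final GroupName group;
    final String name;
    final String displayName;

    CounterName(GroupName group, String name, String displayName) {
      this.group = group;
      this.name = name;
      this.displayName = displayName;
    }
  }

  public static void main(String[] args) {
    // Each constant resolves back to its group and display name.
    for (CounterName c : CounterName.values()) {
      System.out.println(c.group + " / " + c.displayName);
    }
  }
}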
@@ -53,7 +53,7 @@ public abstract class GenericDataSkewHeuristic implements Heuristic<MapReduceApp
private double[] deviationLimits = {2, 4, 8, 16}; // Deviation in i/p bytes btw 2 groups
private double[] filesLimits = {1d/8, 1d/4, 1d/2, 1d}; // Fraction of HDFS Block Size

- private MapReduceCounterData.CounterName _counterName;
+ private List<MapReduceCounterData.CounterName> _counterNames;
private HeuristicConfigurationData _heuristicConfData;

private void loadParameters() {
@@ -85,9 +85,9 @@ private void loadParameters() {
}
}

- protected GenericDataSkewHeuristic(MapReduceCounterData.CounterName counterName,
-     HeuristicConfigurationData heuristicConfData) {
-   this._counterName = counterName;
+ protected GenericDataSkewHeuristic(List<MapReduceCounterData.CounterName> counterNames,
+     HeuristicConfigurationData heuristicConfData) {
+   this._counterNames = counterNames;
this._heuristicConfData = heuristicConfData;

loadParameters();
@@ -114,7 +114,11 @@ public HeuristicResult apply(MapReduceApplicationData data) {

for (int i = 0; i < tasks.length; i++) {
if (tasks[i].isCounterDataPresent()) {
- inputBytes.add(tasks[i].getCounters().get(_counterName));
+ long inputByte = 0;
+ for (MapReduceCounterData.CounterName counterName: _counterNames) {
+   inputByte += tasks[i].getCounters().get(counterName);
+ }
+ inputBytes.add(inputByte);
}
}

@@ -21,14 +21,21 @@
import com.linkedin.drelephant.mapreduce.data.MapReduceTaskData;
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData;

+ import java.util.Arrays;


/**
* This Heuristic analyses the skewness in the mapper input data
*/
public class MapperDataSkewHeuristic extends GenericDataSkewHeuristic {

public MapperDataSkewHeuristic(HeuristicConfigurationData heuristicConfData) {
- super(MapReduceCounterData.CounterName.HDFS_BYTES_READ, heuristicConfData);
+ super(Arrays.asList(
+     MapReduceCounterData.CounterName.HDFS_BYTES_READ,
+     MapReduceCounterData.CounterName.S3_BYTES_READ,
+     MapReduceCounterData.CounterName.S3A_BYTES_READ,
+     MapReduceCounterData.CounterName.S3N_BYTES_READ
+ ), heuristicConfData);
}

@Override
@@ -48,6 +48,13 @@ public class MapperSpeedHeuristic implements Heuristic<MapReduceApplicationData>
private double[] diskSpeedLimits = {1d/2, 1d/4, 1d/8, 1d/32}; // Fraction of HDFS block size
private double[] runtimeLimits = {5, 10, 15, 30}; // The Map task runtime in milli sec

+ private List<MapReduceCounterData.CounterName> _counterNames = Arrays.asList(
+     MapReduceCounterData.CounterName.HDFS_BYTES_READ,
+     MapReduceCounterData.CounterName.S3_BYTES_READ,
+     MapReduceCounterData.CounterName.S3A_BYTES_READ,
+     MapReduceCounterData.CounterName.S3N_BYTES_READ
+ );

private HeuristicConfigurationData _heuristicConfData;

private void loadParameters() {
@@ -101,7 +108,10 @@ public HeuristicResult apply(MapReduceApplicationData data) {
for (MapReduceTaskData task : tasks) {

if (task.isTimeAndCounterDataPresent()) {
- long inputBytes = task.getCounters().get(MapReduceCounterData.CounterName.HDFS_BYTES_READ);
+ long inputBytes = 0;
+ for (MapReduceCounterData.CounterName counterName: _counterNames) {
+   inputBytes += task.getCounters().get(counterName);
+ }
long runtimeMs = task.getTotalRunTimeMs();
inputByteSizes.add(inputBytes);
runtimesMs.add(runtimeMs);
@@ -48,6 +48,13 @@ public class MapperTimeHeuristic implements Heuristic<MapReduceApplicationData>
private double[] longRuntimeLimits = {15, 30, 60, 120}; // Limits(ms) for tasks with longer runtime
private double[] numTasksLimits = {50, 101, 500, 1000}; // Number of Map tasks.

+ private List<MapReduceCounterData.CounterName> _counterNames = Arrays.asList(
+     MapReduceCounterData.CounterName.HDFS_BYTES_READ,
+     MapReduceCounterData.CounterName.S3_BYTES_READ,
+     MapReduceCounterData.CounterName.S3A_BYTES_READ,
+     MapReduceCounterData.CounterName.S3N_BYTES_READ
+ );

private HeuristicConfigurationData _heuristicConfData;

private void loadParameters() {
@@ -109,7 +116,11 @@ public HeuristicResult apply(MapReduceApplicationData data) {
for (MapReduceTaskData task : tasks) {

if (task.isTimeAndCounterDataPresent()) {
- inputBytes.add(task.getCounters().get(MapReduceCounterData.CounterName.HDFS_BYTES_READ));
+ long inputByte = 0;
+ for (MapReduceCounterData.CounterName counterName: _counterNames) {
+   inputByte += task.getCounters().get(counterName);
+ }
+ inputBytes.add(inputByte);
long taskTime = task.getTotalRunTimeMs();
runtimesMs.add(taskTime);
taskMinMs = Math.min(taskMinMs, taskTime);
@@ -21,14 +21,17 @@
import com.linkedin.drelephant.mapreduce.data.MapReduceTaskData;
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData;

+ import java.util.Arrays;



/**
* This Heuristic analyses the skewness in the reducer input data
*/
public class ReducerDataSkewHeuristic extends GenericDataSkewHeuristic {

public ReducerDataSkewHeuristic(HeuristicConfigurationData heuristicConfData) {
- super(MapReduceCounterData.CounterName.REDUCE_SHUFFLE_BYTES, heuristicConfData);
+ super(Arrays.asList(MapReduceCounterData.CounterName.REDUCE_SHUFFLE_BYTES), heuristicConfData);
}

@Override
@@ -78,7 +78,7 @@ private Severity analyzeJob(int numSmallTasks, int numLargeTasks, long smallInpu
smallCounter.set(MapReduceCounterData.CounterName.HDFS_BYTES_READ, smallInputSize);

MapReduceCounterData largeCounter = new MapReduceCounterData();
- largeCounter.set(MapReduceCounterData.CounterName.HDFS_BYTES_READ, largeInputSize);
+ largeCounter.set(MapReduceCounterData.CounterName.S3A_BYTES_READ, largeInputSize);

int i = 0;
for (; i < numSmallTasks; i++) {
@@ -85,7 +85,10 @@ private Severity analyzeJob(long runtimeMs, long readBytes) throws IOException {
MapReduceTaskData[] mappers = new MapReduceTaskData[NUMTASKS + 1];

MapReduceCounterData counter = new MapReduceCounterData();
- counter.set(MapReduceCounterData.CounterName.HDFS_BYTES_READ, readBytes);
+ counter.set(MapReduceCounterData.CounterName.HDFS_BYTES_READ, readBytes / 4);
+ counter.set(MapReduceCounterData.CounterName.S3_BYTES_READ, readBytes / 4);
+ counter.set(MapReduceCounterData.CounterName.S3A_BYTES_READ, readBytes / 4);
+ counter.set(MapReduceCounterData.CounterName.S3N_BYTES_READ, readBytes / 4);

int i = 0;
for (; i < NUMTASKS; i++) {
@@ -91,7 +91,10 @@ private Severity analyzeJob(int numTasks, long runtime) throws IOException {
MapReduceTaskData[] mappers = new MapReduceTaskData[numTasks + 1];

MapReduceCounterData taskCounter = new MapReduceCounterData();
- taskCounter.set(MapReduceCounterData.CounterName.HDFS_BYTES_READ, DUMMY_INPUT_SIZE);
+ taskCounter.set(MapReduceCounterData.CounterName.HDFS_BYTES_READ, DUMMY_INPUT_SIZE / 4);
+ taskCounter.set(MapReduceCounterData.CounterName.S3_BYTES_READ, DUMMY_INPUT_SIZE / 4);
+ taskCounter.set(MapReduceCounterData.CounterName.S3A_BYTES_READ, DUMMY_INPUT_SIZE / 4);
+ taskCounter.set(MapReduceCounterData.CounterName.S3N_BYTES_READ, DUMMY_INPUT_SIZE / 4);

int i = 0;
for (; i < numTasks; i++) {
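Note that the test updates above spread the dummy input evenly over the four read counters, so a heuristic that now sums all four still sees essentially the original per-task total; at most an integer-division remainder of three bytes is lost when the size is not a multiple of four. A tiny standalone check of that arithmetic (the example value is arbitrary, not taken from the tests):

public class SplitTotalCheck {
  public static void main(String[] args) {
    long readBytes = 1_000_000_003L; // arbitrary example size
    long perCounter = readBytes / 4; // how the updated tests split the input
    long summed = perCounter * 4;    // what a heuristic summing all four counters sees
    // Prints: original=1000000003 summed=1000000000 lostToRounding=3
    System.out.println("original=" + readBytes + " summed=" + summed
        + " lostToRounding=" + (readBytes - summed));
  }
}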
