From 7d4936b8dec853bd22926eb0ebb8329cd0b0efbb Mon Sep 17 00:00:00 2001
From: georgewu2
Date: Tue, 20 Jun 2017 20:29:34 -0700
Subject: [PATCH] add s3, s3a, s3n bytes read and bytes written, and update heuristics to use them (#254)

---
 .../mapreduce/data/MapReduceCounterData.java       |  6 ++++++
 .../heuristics/GenericDataSkewHeuristic.java       | 14 +++++++++-----
 .../heuristics/MapperDataSkewHeuristic.java        |  9 ++++++++-
 .../mapreduce/heuristics/MapperSpeedHeuristic.java | 12 +++++++++++-
 .../mapreduce/heuristics/MapperTimeHeuristic.java  | 13 ++++++++++++-
 .../heuristics/ReducerDataSkewHeuristic.java       |  5 ++++-
 .../heuristics/MapperDataSkewHeuristicTest.java    |  2 +-
 .../heuristics/MapperSpeedHeuristicTest.java       |  5 ++++-
 .../heuristics/MapperTimeHeuristicTest.java        |  5 ++++-
 9 files changed, 59 insertions(+), 12 deletions(-)

diff --git a/app/com/linkedin/drelephant/mapreduce/data/MapReduceCounterData.java b/app/com/linkedin/drelephant/mapreduce/data/MapReduceCounterData.java
index 36d9f11d0..ae121ef19 100644
--- a/app/com/linkedin/drelephant/mapreduce/data/MapReduceCounterData.java
+++ b/app/com/linkedin/drelephant/mapreduce/data/MapReduceCounterData.java
@@ -109,6 +109,12 @@ public static enum CounterName {
     FILE_BYTES_WRITTEN(GroupName.FileSystemCounters, "FILE_BYTES_WRITTEN", "FILE_BYTES_WRITTEN"),
     HDFS_BYTES_READ(GroupName.FileSystemCounters, "HDFS_BYTES_READ", "HDFS_BYTES_READ"),
     HDFS_BYTES_WRITTEN(GroupName.FileSystemCounters, "HDFS_BYTES_WRITTEN", "HDFS_BYTES_WRITTEN"),
+    S3_BYTES_READ(GroupName.FileSystemCounters, "S3_BYTES_READ", "S3_BYTES_READ"),
+    S3_BYTES_WRITTEN(GroupName.FileSystemCounters, "S3_BYTES_WRITTEN", "S3_BYTES_WRITTEN"),
+    S3N_BYTES_READ(GroupName.FileSystemCounters, "S3N_BYTES_READ", "S3N_BYTES_READ"),
+    S3N_BYTES_WRITTEN(GroupName.FileSystemCounters, "S3N_BYTES_WRITTEN", "S3N_BYTES_WRITTEN"),
+    S3A_BYTES_READ(GroupName.FileSystemCounters, "S3A_BYTES_READ", "S3A_BYTES_READ"),
+    S3A_BYTES_WRITTEN(GroupName.FileSystemCounters, "S3A_BYTES_WRITTEN", "S3A_BYTES_WRITTEN"),
 
     MAP_INPUT_RECORDS(GroupName.MapReduce, "MAP_INPUT_RECORDS", "Map input records"),
     MAP_OUTPUT_RECORDS(GroupName.MapReduce, "MAP_OUTPUT_RECORDS", "Map output records"),
diff --git a/app/com/linkedin/drelephant/mapreduce/heuristics/GenericDataSkewHeuristic.java b/app/com/linkedin/drelephant/mapreduce/heuristics/GenericDataSkewHeuristic.java
index 03d65416f..e975760ef 100644
--- a/app/com/linkedin/drelephant/mapreduce/heuristics/GenericDataSkewHeuristic.java
+++ b/app/com/linkedin/drelephant/mapreduce/heuristics/GenericDataSkewHeuristic.java
@@ -53,7 +53,7 @@ public abstract class GenericDataSkewHeuristic implements Heuristic<MapReduceApplicationData>
 
-  private MapReduceCounterData.CounterName _counterName;
+  private List<MapReduceCounterData.CounterName> _counterNames;
   private HeuristicConfigurationData _heuristicConfData;
 
   private void loadParameters() {
@@ -85,9 +85,9 @@ private void loadParameters() {
     }
   }
 
-  protected GenericDataSkewHeuristic(MapReduceCounterData.CounterName counterName,
-      HeuristicConfigurationData heuristicConfData) {
-    this._counterName = counterName;
+  protected GenericDataSkewHeuristic(List<MapReduceCounterData.CounterName> counterNames,
+      HeuristicConfigurationData heuristicConfData) {
+    this._counterNames = counterNames;
     this._heuristicConfData = heuristicConfData;
 
     loadParameters();
@@ -114,7 +114,11 @@ public HeuristicResult apply(MapReduceApplicationData data) {
 
     for (int i = 0; i < tasks.length; i++) {
       if (tasks[i].isCounterDataPresent()) {
-        inputBytes.add(tasks[i].getCounters().get(_counterName));
+        long inputByte = 0;
+        for (MapReduceCounterData.CounterName counterName: _counterNames) {
+          inputByte += tasks[i].getCounters().get(counterName);
+        }
+        inputBytes.add(inputByte);
       }
     }
 
diff --git a/app/com/linkedin/drelephant/mapreduce/heuristics/MapperDataSkewHeuristic.java b/app/com/linkedin/drelephant/mapreduce/heuristics/MapperDataSkewHeuristic.java
index e598b6807..6709e0b46 100644
--- a/app/com/linkedin/drelephant/mapreduce/heuristics/MapperDataSkewHeuristic.java
+++ b/app/com/linkedin/drelephant/mapreduce/heuristics/MapperDataSkewHeuristic.java
@@ -21,6 +21,8 @@ import com.linkedin.drelephant.mapreduce.data.MapReduceTaskData;
 import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData;
 
+import java.util.Arrays;
+
 
 /**
  * This Heuristic analyses the skewness in the mapper input data
@@ -28,7 +30,12 @@ public class MapperDataSkewHeuristic extends GenericDataSkewHeuristic {
 
   public MapperDataSkewHeuristic(HeuristicConfigurationData heuristicConfData) {
-    super(MapReduceCounterData.CounterName.HDFS_BYTES_READ, heuristicConfData);
+    super(Arrays.asList(
+        MapReduceCounterData.CounterName.HDFS_BYTES_READ,
+        MapReduceCounterData.CounterName.S3_BYTES_READ,
+        MapReduceCounterData.CounterName.S3A_BYTES_READ,
+        MapReduceCounterData.CounterName.S3N_BYTES_READ
+    ), heuristicConfData);
   }
 
   @Override
diff --git a/app/com/linkedin/drelephant/mapreduce/heuristics/MapperSpeedHeuristic.java b/app/com/linkedin/drelephant/mapreduce/heuristics/MapperSpeedHeuristic.java
index a80eb551d..ac67ae9a2 100644
--- a/app/com/linkedin/drelephant/mapreduce/heuristics/MapperSpeedHeuristic.java
+++ b/app/com/linkedin/drelephant/mapreduce/heuristics/MapperSpeedHeuristic.java
@@ -48,6 +48,13 @@ public class MapperSpeedHeuristic implements Heuristic<MapReduceApplicationData>
   private double[] diskSpeedLimits = {1d/2, 1d/4, 1d/8, 1d/32};   // Fraction of HDFS block size
   private double[] runtimeLimits = {5, 10, 15, 30};               // The Map task runtime in milli sec
 
+  private List<MapReduceCounterData.CounterName> _counterNames = Arrays.asList(
+      MapReduceCounterData.CounterName.HDFS_BYTES_READ,
+      MapReduceCounterData.CounterName.S3_BYTES_READ,
+      MapReduceCounterData.CounterName.S3A_BYTES_READ,
+      MapReduceCounterData.CounterName.S3N_BYTES_READ
+  );
+
   private HeuristicConfigurationData _heuristicConfData;
 
   private void loadParameters() {
@@ -101,7 +108,10 @@ public HeuristicResult apply(MapReduceApplicationData data) {
 
     for (MapReduceTaskData task : tasks) {
 
       if (task.isTimeAndCounterDataPresent()) {
-        long inputBytes = task.getCounters().get(MapReduceCounterData.CounterName.HDFS_BYTES_READ);
+        long inputBytes = 0;
+        for (MapReduceCounterData.CounterName counterName: _counterNames) {
+          inputBytes += task.getCounters().get(counterName);
+        }
         long runtimeMs = task.getTotalRunTimeMs();
         inputByteSizes.add(inputBytes);
         runtimesMs.add(runtimeMs);
diff --git a/app/com/linkedin/drelephant/mapreduce/heuristics/MapperTimeHeuristic.java b/app/com/linkedin/drelephant/mapreduce/heuristics/MapperTimeHeuristic.java
index f47401c29..d8d4e6cb7 100644
--- a/app/com/linkedin/drelephant/mapreduce/heuristics/MapperTimeHeuristic.java
+++ b/app/com/linkedin/drelephant/mapreduce/heuristics/MapperTimeHeuristic.java
@@ -48,6 +48,13 @@ public class MapperTimeHeuristic implements Heuristic<MapReduceApplicationData>
   private double[] longRuntimeLimits = {15, 30, 60, 120};   // Limits(ms) for tasks with longer runtime
   private double[] numTasksLimits = {50, 101, 500, 1000};   // Number of Map tasks.
 
+  private List<MapReduceCounterData.CounterName> _counterNames = Arrays.asList(
+      MapReduceCounterData.CounterName.HDFS_BYTES_READ,
+      MapReduceCounterData.CounterName.S3_BYTES_READ,
+      MapReduceCounterData.CounterName.S3A_BYTES_READ,
+      MapReduceCounterData.CounterName.S3N_BYTES_READ
+  );
+
   private HeuristicConfigurationData _heuristicConfData;
 
   private void loadParameters() {
@@ -109,7 +116,11 @@ public HeuristicResult apply(MapReduceApplicationData data) {
 
     for (MapReduceTaskData task : tasks) {
 
       if (task.isTimeAndCounterDataPresent()) {
-        inputBytes.add(task.getCounters().get(MapReduceCounterData.CounterName.HDFS_BYTES_READ));
+        long inputByte = 0;
+        for (MapReduceCounterData.CounterName counterName: _counterNames) {
+          inputByte += task.getCounters().get(counterName);
+        }
+        inputBytes.add(inputByte);
         long taskTime = task.getTotalRunTimeMs();
         runtimesMs.add(taskTime);
         taskMinMs = Math.min(taskMinMs, taskTime);
diff --git a/app/com/linkedin/drelephant/mapreduce/heuristics/ReducerDataSkewHeuristic.java b/app/com/linkedin/drelephant/mapreduce/heuristics/ReducerDataSkewHeuristic.java
index 19d21bd70..c01547623 100644
--- a/app/com/linkedin/drelephant/mapreduce/heuristics/ReducerDataSkewHeuristic.java
+++ b/app/com/linkedin/drelephant/mapreduce/heuristics/ReducerDataSkewHeuristic.java
@@ -21,6 +21,9 @@ import com.linkedin.drelephant.mapreduce.data.MapReduceTaskData;
 import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData;
 
+import java.util.Arrays;
+
+
 
 /**
  * This Heuristic analyses the skewness in the reducer input data
@@ -28,7 +31,7 @@ public class ReducerDataSkewHeuristic extends GenericDataSkewHeuristic {
 
   public ReducerDataSkewHeuristic(HeuristicConfigurationData heuristicConfData) {
-    super(MapReduceCounterData.CounterName.REDUCE_SHUFFLE_BYTES, heuristicConfData);
+    super(Arrays.asList(MapReduceCounterData.CounterName.REDUCE_SHUFFLE_BYTES), heuristicConfData);
   }
 
   @Override
diff --git a/test/com/linkedin/drelephant/mapreduce/heuristics/MapperDataSkewHeuristicTest.java b/test/com/linkedin/drelephant/mapreduce/heuristics/MapperDataSkewHeuristicTest.java
index 24c4e96cc..99557466d 100644
--- a/test/com/linkedin/drelephant/mapreduce/heuristics/MapperDataSkewHeuristicTest.java
+++ b/test/com/linkedin/drelephant/mapreduce/heuristics/MapperDataSkewHeuristicTest.java
@@ -78,7 +78,7 @@ private Severity analyzeJob(int numSmallTasks, int numLargeTasks, long smallInpu
     smallCounter.set(MapReduceCounterData.CounterName.HDFS_BYTES_READ, smallInputSize);
 
     MapReduceCounterData largeCounter = new MapReduceCounterData();
-    largeCounter.set(MapReduceCounterData.CounterName.HDFS_BYTES_READ, largeInputSize);
+    largeCounter.set(MapReduceCounterData.CounterName.S3A_BYTES_READ, largeInputSize);
 
     int i = 0;
     for (; i < numSmallTasks; i++) {
diff --git a/test/com/linkedin/drelephant/mapreduce/heuristics/MapperSpeedHeuristicTest.java b/test/com/linkedin/drelephant/mapreduce/heuristics/MapperSpeedHeuristicTest.java
index 379b83fd1..a3246e297 100644
--- a/test/com/linkedin/drelephant/mapreduce/heuristics/MapperSpeedHeuristicTest.java
+++ b/test/com/linkedin/drelephant/mapreduce/heuristics/MapperSpeedHeuristicTest.java
@@ -85,7 +85,10 @@ private Severity analyzeJob(long runtimeMs, long readBytes) throws IOException {
     MapReduceTaskData[] mappers = new MapReduceTaskData[NUMTASKS + 1];
 
     MapReduceCounterData counter = new MapReduceCounterData();
-    counter.set(MapReduceCounterData.CounterName.HDFS_BYTES_READ, readBytes);
+    counter.set(MapReduceCounterData.CounterName.HDFS_BYTES_READ, readBytes / 4);
+    counter.set(MapReduceCounterData.CounterName.S3_BYTES_READ, readBytes / 4);
+    counter.set(MapReduceCounterData.CounterName.S3A_BYTES_READ, readBytes / 4);
+    counter.set(MapReduceCounterData.CounterName.S3N_BYTES_READ, readBytes / 4);
 
     int i = 0;
     for (; i < NUMTASKS; i++) {
diff --git a/test/com/linkedin/drelephant/mapreduce/heuristics/MapperTimeHeuristicTest.java b/test/com/linkedin/drelephant/mapreduce/heuristics/MapperTimeHeuristicTest.java
index 8c1d05110..8ba353f41 100644
--- a/test/com/linkedin/drelephant/mapreduce/heuristics/MapperTimeHeuristicTest.java
+++ b/test/com/linkedin/drelephant/mapreduce/heuristics/MapperTimeHeuristicTest.java
@@ -91,7 +91,10 @@ private Severity analyzeJob(int numTasks, long runtime) throws IOException {
     MapReduceTaskData[] mappers = new MapReduceTaskData[numTasks + 1];
 
     MapReduceCounterData taskCounter = new MapReduceCounterData();
-    taskCounter.set(MapReduceCounterData.CounterName.HDFS_BYTES_READ, DUMMY_INPUT_SIZE);
+    taskCounter.set(MapReduceCounterData.CounterName.HDFS_BYTES_READ, DUMMY_INPUT_SIZE / 4);
+    taskCounter.set(MapReduceCounterData.CounterName.S3_BYTES_READ, DUMMY_INPUT_SIZE / 4);
+    taskCounter.set(MapReduceCounterData.CounterName.S3A_BYTES_READ, DUMMY_INPUT_SIZE / 4);
+    taskCounter.set(MapReduceCounterData.CounterName.S3N_BYTES_READ, DUMMY_INPUT_SIZE / 4);
 
     int i = 0;
     for (; i < numTasks; i++) {
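
Reviewer note, outside the patch itself: every heuristic above makes the same mechanical change, swapping a single HDFS_BYTES_READ lookup for a sum over a list of FileSystemCounters names, so bytes read through the s3://, s3a://, and s3n:// filesystems are credited the same way as HDFS input. The tests spread one total across all four counters (readBytes / 4 each) so that a heuristic which dropped any one of them would see too small a sum. Below is a minimal, self-contained Java sketch of that summing pattern; CounterName, CounterStore, and totalInputBytes are illustrative stand-ins for MapReduceCounterData and the patched apply() loops, not real Dr. Elephant classes.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CounterSumSketch {

  // Hypothetical counter names mirroring the ones this patch adds.
  enum CounterName { HDFS_BYTES_READ, S3_BYTES_READ, S3A_BYTES_READ, S3N_BYTES_READ }

  // Stand-in for MapReduceCounterData: a missing counter reads as 0, which is
  // what lets the heuristics sum all four names even when a job only ever
  // touched one filesystem.
  static class CounterStore {
    private final Map<CounterName, Long> values = new HashMap<CounterName, Long>();

    void set(CounterName name, long value) {
      values.put(name, value);
    }

    long get(CounterName name) {
      Long value = values.get(name);
      return value == null ? 0L : value;
    }
  }

  // The list each mapper heuristic now holds instead of a single counter name.
  private static final List<CounterName> INPUT_COUNTERS = Arrays.asList(
      CounterName.HDFS_BYTES_READ,
      CounterName.S3_BYTES_READ,
      CounterName.S3A_BYTES_READ,
      CounterName.S3N_BYTES_READ);

  // Same loop shape as the patched apply() methods: total input is the sum of
  // every filesystem read counter, not just HDFS_BYTES_READ.
  static long totalInputBytes(CounterStore counters) {
    long inputBytes = 0;
    for (CounterName name : INPUT_COUNTERS) {
      inputBytes += counters.get(name);
    }
    return inputBytes;
  }

  public static void main(String[] args) {
    CounterStore counters = new CounterStore();
    counters.set(CounterName.HDFS_BYTES_READ, 64L * 1024 * 1024);  // 64 MB from HDFS
    counters.set(CounterName.S3A_BYTES_READ, 192L * 1024 * 1024);  // 192 MB from s3a://
    // Prints 268435456 (256 MB): HDFS and S3A input are counted together.
    System.out.println(totalInputBytes(counters));
  }
}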