Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dr. Elephant Tez Support working patch #313

Merged
merged 6 commits into from
Apr 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions app-conf/AggregatorConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
<applicationtype>mapreduce</applicationtype>
<classname>com.linkedin.drelephant.mapreduce.MapReduceMetricsAggregator</classname>
</aggregator>
<!-- Tez: aggregates task-level metrics up to the application level (see TezMetricsAggregator) -->
<aggregator>
<applicationtype>tez</applicationtype>
<classname>com.linkedin.drelephant.tez.TezMetricsAggregator</classname>
</aggregator>
<aggregator>
<applicationtype>spark</applicationtype>
<classname>com.linkedin.drelephant.spark.SparkMetricsAggregator</classname>
Expand Down
7 changes: 7 additions & 0 deletions app-conf/FetcherConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@
</fetcher>
-->
<fetchers>
<!--
REST-based fetcher for Tez jobs which pulls job metrics and data from the Timeline Server API.
NOTE (review): call out this dependency (the YARN Timeline Server) in the docs and setup instructions.
-->
<!-- Fetches Tez application data over the Timeline Server REST API -->
<fetcher>
<applicationtype>tez</applicationtype>
<classname>com.linkedin.drelephant.tez.fetchers.TezFetcher</classname>
</fetcher>
<!--
<fetcher>
<applicationtype>mapreduce</applicationtype>
Expand Down
117 changes: 117 additions & 0 deletions app-conf/HeuristicConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,123 @@
<!-- Heuristics configurations, each heuristic will be loaded by a particular analyser -->
<heuristics>

<!-- TEZ HEURISTICS -->

<!-- Each Tez heuristic points at the mapreduce help view that documents the
     same analysis; the original copy-pasted helpMapperSpill for all ten, which
     would show the spill help page for every heuristic. -->
<heuristic>
  <applicationtype>tez</applicationtype>
  <heuristicname>Mapper Data Skew</heuristicname>
  <classname>com.linkedin.drelephant.tez.heuristics.MapperDataSkewHeuristic</classname>
  <viewname>views.html.help.mapreduce.helpMapperDataSkew</viewname>
  <!--<params>
    <num_tasks_severity>10, 50, 100, 200</num_tasks_severity>
    <deviation_severity>2, 4, 8, 16</deviation_severity>
    <files_severity>1/8, 1/4, 1/2, 1</files_severity>
  </params>-->
</heuristic>

<heuristic>
  <applicationtype>tez</applicationtype>
  <heuristicname>Mapper GC</heuristicname>
  <classname>com.linkedin.drelephant.tez.heuristics.MapperGCHeuristic</classname>
  <viewname>views.html.help.mapreduce.helpGC</viewname>
  <!--<params>
    <exclude_jobtypes_filter>OozieLauncher</exclude_jobtypes_filter>
    <gc_ratio_severity>0.01, 0.02, 0.03, 0.04</gc_ratio_severity>
    <runtime_severity_in_min>5, 10, 12, 15</runtime_severity_in_min>
  </params>-->
</heuristic>

<heuristic>
  <applicationtype>tez</applicationtype>
  <heuristicname>Mapper Time</heuristicname>
  <classname>com.linkedin.drelephant.tez.heuristics.MapperTimeHeuristic</classname>
  <viewname>views.html.help.mapreduce.helpMapperTime</viewname>
  <!--<params>
    <exclude_jobtypes_filter>OozieLauncher</exclude_jobtypes_filter>
    <short_runtime_severity_in_min>10, 4, 2, 1</short_runtime_severity_in_min>
    <long_runtime_severity_in_min>15, 30, 60, 120</long_runtime_severity_in_min>
    <num_tasks_severity>50, 101, 500, 1000</num_tasks_severity>
  </params>-->
</heuristic>

<heuristic>
  <applicationtype>tez</applicationtype>
  <heuristicname>Mapper Speed</heuristicname>
  <classname>com.linkedin.drelephant.tez.heuristics.MapperSpeedHeuristic</classname>
  <viewname>views.html.help.mapreduce.helpMapperSpeed</viewname>
  <!--<params>
    <exclude_jobtypes_filter>OozieLauncher</exclude_jobtypes_filter>
    <disk_speed_severity>1/2, 1/4, 1/8, 1/32</disk_speed_severity>
    <runtime_severity_in_min>5, 10, 15, 30</runtime_severity_in_min>
  </params>-->
</heuristic>

<heuristic>
  <applicationtype>tez</applicationtype>
  <heuristicname>Mapper Memory</heuristicname>
  <classname>com.linkedin.drelephant.tez.heuristics.MapperMemoryHeuristic</classname>
  <viewname>views.html.help.mapreduce.helpMapperMemory</viewname>
  <!--<params>
    <memory_ratio_severity>0.6, 0.5, 0.4, 0.3</memory_ratio_severity>
  </params>-->
</heuristic>

<heuristic>
  <applicationtype>tez</applicationtype>
  <heuristicname>Mapper Spill</heuristicname>
  <classname>com.linkedin.drelephant.tez.heuristics.MapperSpillHeuristic</classname>
  <viewname>views.html.help.mapreduce.helpMapperSpill</viewname>
  <!--<params>
    <spill_severity>2.01, 2.2, 2.5, 3</spill_severity>
    <num_tasks_severity>50, 100, 500, 1000</num_tasks_severity>
  </params>-->
</heuristic>

<heuristic>
  <applicationtype>tez</applicationtype>
  <heuristicname>Reducer Data Skew</heuristicname>
  <classname>com.linkedin.drelephant.tez.heuristics.ReducerDataSkewHeuristic</classname>
  <viewname>views.html.help.mapreduce.helpReducerDataSkew</viewname>
  <!--<params>
    <num_tasks_severity>10, 50, 100, 200</num_tasks_severity>
    <deviation_severity>2, 4, 8, 16</deviation_severity>
    <files_severity>1/8, 1/4, 1/2, 1</files_severity>
  </params>-->
</heuristic>

<heuristic>
  <applicationtype>tez</applicationtype>
  <heuristicname>Reducer GC</heuristicname>
  <classname>com.linkedin.drelephant.tez.heuristics.ReducerGCHeuristic</classname>
  <viewname>views.html.help.mapreduce.helpGC</viewname>
  <!--<params>
    <gc_ratio_severity>0.01, 0.02, 0.03, 0.04</gc_ratio_severity>
    <runtime_severity_in_min>5, 10, 12, 15</runtime_severity_in_min>
  </params>-->
</heuristic>

<heuristic>
  <applicationtype>tez</applicationtype>
  <heuristicname>Reducer Time</heuristicname>
  <classname>com.linkedin.drelephant.tez.heuristics.ReducerTimeHeuristic</classname>
  <viewname>views.html.help.mapreduce.helpReducerTime</viewname>
  <!--<params>
    <short_runtime_severity_in_min>10, 4, 2, 1</short_runtime_severity_in_min>
    <long_runtime_severity_in_min>15, 30, 60, 120</long_runtime_severity_in_min>
    <num_tasks_severity>50, 101, 500, 1000</num_tasks_severity>
  </params>-->
</heuristic>

<heuristic>
  <applicationtype>tez</applicationtype>
  <heuristicname>Reducer Memory</heuristicname>
  <classname>com.linkedin.drelephant.tez.heuristics.ReducerMemoryHeuristic</classname>
  <viewname>views.html.help.mapreduce.helpReducerMemory</viewname>
  <!--<params>
    <memory_ratio_severity>0.6, 0.5, 0.4, 0.3</memory_ratio_severity>
  </params>-->
</heuristic>

<!-- MAP-REDUCE HEURISTICS -->

<heuristic>
Expand Down
6 changes: 6 additions & 0 deletions app-conf/JobTypeConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@
<applicationtype>mapreduce</applicationtype>
<conf>pig.script</conf>
</jobType>
<!-- Tez job type, detected via the hive.mapred.mode property.
     NOTE(review): <isDefault/> presumably marks this as the fallback job type
     for applicationtype "tez" - confirm against JobTypeConf conventions. -->
<jobType>
<name>Tez</name>
<applicationtype>tez</applicationtype>
<conf>hive.mapred.mode</conf>
<isDefault/>
</jobType>
<jobType>
<name>Hive</name>
<applicationtype>mapreduce</applicationtype>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,23 @@
import com.linkedin.drelephant.mapreduce.data.MapReduceApplicationData;
import com.linkedin.drelephant.mapreduce.data.MapReduceCounterData;
import com.linkedin.drelephant.mapreduce.data.MapReduceTaskData;
import com.linkedin.drelephant.math.Statistics;
import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData;
import com.linkedin.drelephant.util.ThreadContextMR2;
import com.linkedin.drelephant.util.Utils;

import java.io.IOException;
import java.lang.Integer;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;


/**
Expand Down Expand Up @@ -387,78 +380,3 @@ private JsonNode getTaskFirstFailedAttempt(URL taskAllAttemptsUrl) throws IOExce
}
}
}

/**
 * Per-thread context for MR2 REST fetching. Each executor thread gets its own
 * authentication token, AuthenticatedURL, JSON mapper and diagnostic-regex
 * pattern via ThreadLocals, so fetcher threads never contend on shared state.
 */
final class ThreadContextMR2 {
  private static final Logger logger = Logger.getLogger(ThreadContextMR2.class);
  private static final AtomicInteger THREAD_ID = new AtomicInteger(1);

  // Stable per-thread id, used only in log messages.
  private static final ThreadLocal<Integer> _LOCAL_THREAD_ID = new ThreadLocal<Integer>() {
    @Override
    public Integer initialValue() {
      return THREAD_ID.getAndIncrement();
    }
  };

  // Set lazily by _LOCAL_AUTH_TOKEN.initialValue(); null until the token is first used.
  private static final ThreadLocal<Long> _LOCAL_LAST_UPDATED = new ThreadLocal<Long>();
  private static final ThreadLocal<Long> _LOCAL_UPDATE_INTERVAL = new ThreadLocal<Long>();

  private static final ThreadLocal<Pattern> _LOCAL_DIAGNOSTIC_PATTERN = new ThreadLocal<Pattern>() {
    @Override
    public Pattern initialValue() {
      // Example: "Task task_1443068695259_9143_m_000475 failed 1 times"
      return Pattern.compile(
          ".*[\\s\\u00A0]+(task_[0-9]+_[0-9]+_[m|r]_[0-9]+)[\\s\\u00A0]+.*");
    }
  };

  private static final ThreadLocal<AuthenticatedURL.Token> _LOCAL_AUTH_TOKEN =
      new ThreadLocal<AuthenticatedURL.Token>() {
        @Override
        public AuthenticatedURL.Token initialValue() {
          _LOCAL_LAST_UPDATED.set(System.currentTimeMillis());
          // Randomize the refresh interval (30 to 33 minutes) so executors do not
          // all renew their tokens at the same moment. Math.floorMod keeps the
          // random offset non-negative; the original `nextLong() % n` could be
          // negative and silently shrink the interval below 30 minutes.
          _LOCAL_UPDATE_INTERVAL.set(Statistics.MINUTE_IN_MS * 30
              + Math.floorMod(new Random().nextLong(), 3 * Statistics.MINUTE_IN_MS));
          logger.info("Executor " + _LOCAL_THREAD_ID.get() + " update interval "
              + _LOCAL_UPDATE_INTERVAL.get() * 1.0 / Statistics.MINUTE_IN_MS);
          return new AuthenticatedURL.Token();
        }
      };

  private static final ThreadLocal<AuthenticatedURL> _LOCAL_AUTH_URL = new ThreadLocal<AuthenticatedURL>() {
    @Override
    public AuthenticatedURL initialValue() {
      return new AuthenticatedURL();
    }
  };

  private static final ThreadLocal<ObjectMapper> _LOCAL_MAPPER = new ThreadLocal<ObjectMapper>() {
    @Override
    public ObjectMapper initialValue() {
      return new ObjectMapper();
    }
  };

  private ThreadContextMR2() {
    // Empty on purpose: static utility holder, never instantiated.
  }

  /** Returns a matcher for extracting the failed task id from diagnostic text. */
  public static Matcher getDiagnosticMatcher(String diagnosticInfo) {
    return _LOCAL_DIAGNOSTIC_PATTERN.get().matcher(diagnosticInfo);
  }

  /**
   * Opens the URL with this thread's authenticated token and parses the
   * response body as JSON.
   */
  public static JsonNode readJsonNode(URL url) throws IOException, AuthenticationException {
    HttpURLConnection conn = _LOCAL_AUTH_URL.get().openConnection(url, _LOCAL_AUTH_TOKEN.get());
    return _LOCAL_MAPPER.get().readTree(conn.getInputStream());
  }

  /**
   * Replaces this thread's auth token once the randomized refresh interval has
   * elapsed. Safe to call before the token has ever been used: in that case it
   * just initializes the token (the original unboxed a null Long and threw NPE).
   */
  public static void updateAuthToken() {
    Long lastUpdated = _LOCAL_LAST_UPDATED.get();
    if (lastUpdated == null) {
      // First touch on this thread: force initialValue(), which also records
      // the last-updated timestamp and the refresh interval.
      _LOCAL_AUTH_TOKEN.get();
      return;
    }
    long curTime = System.currentTimeMillis();
    if (curTime - lastUpdated > _LOCAL_UPDATE_INTERVAL.get()) {
      logger.info("Executor " + _LOCAL_THREAD_ID.get() + " updates its AuthenticatedToken.");
      _LOCAL_AUTH_TOKEN.set(new AuthenticatedURL.Token());
      _LOCAL_AUTH_URL.set(new AuthenticatedURL());
      _LOCAL_LAST_UPDATED.set(curTime);
    }
  }
}
109 changes: 109 additions & 0 deletions app/com/linkedin/drelephant/tez/TezMetricsAggregator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package com.linkedin.drelephant.tez;

import com.linkedin.drelephant.analysis.*;
import com.linkedin.drelephant.configurations.aggregator.AggregatorConfigurationData;
import com.linkedin.drelephant.tez.data.TezApplicationData;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;

/**
 * Aggregates Tez task-level metrics (resource used, resource wasted, delay)
 * up to the application level for Dr. Elephant's Tez support.
 */
public class TezMetricsAggregator implements HadoopMetricsAggregator {

  private static final Logger logger = Logger.getLogger(TezMetricsAggregator.class);

  private static final String TEZ_CONTAINER_CONFIG = "hive.tez.container.size";
  private static final String MAP_CONTAINER_CONFIG = "mapreduce.map.memory.mb";
  private static final String REDUCER_CONTAINER_CONFIG = "mapreduce.reduce.memory.mb";
  private static final String REDUCER_SLOW_START_CONFIG = "mapreduce.job.reduce.slowstart.completedmaps";
  // NOTE(review): the container configs above are expressed in MB, while this
  // default is in bytes - confirm which unit TezTaskLevelAggregatedMetrics expects.
  private static final long CONTAINER_MEMORY_DEFAULT_BYTES = 2048L * FileUtils.ONE_MB;

  private HadoopAggregatedData _hadoopAggregatedData = null;
  private TezTaskLevelAggregatedMetrics _mapTasks;
  private TezTaskLevelAggregatedMetrics _reduceTasks;

  private AggregatorConfigurationData _aggregatorConfigurationData;

  public TezMetricsAggregator(AggregatorConfigurationData _aggregatorConfigurationData) {
    this._aggregatorConfigurationData = _aggregatorConfigurationData;
    _hadoopAggregatedData = new HadoopAggregatedData();
  }

  /**
   * Aggregates map-side and reduce-side task metrics of the given Tez
   * application into a single HadoopAggregatedData result.
   */
  @Override
  public void aggregate(HadoopApplicationData hadoopData) {

    TezApplicationData data = (TezApplicationData) hadoopData;

    long mapTaskContainerSize = getMapContainerSize(data);
    long reduceTaskContainerSize = getReducerContainerSize(data);

    int reduceTaskSlowStartPercentage = getReduceSlowStartPercentage(data);

    //overwrite reduceTaskSlowStartPercentage to 100%. TODO: make use of the slow start percent
    reduceTaskSlowStartPercentage = 100;

    _mapTasks = new TezTaskLevelAggregatedMetrics(data.getMapTaskData(), mapTaskContainerSize, data.getStartTime());

    long reduceIdealStartTime = _mapTasks.getNthPercentileFinishTime(reduceTaskSlowStartPercentage);

    // Mappers list is empty
    if (reduceIdealStartTime == -1) {
      // ideal start time for reducer is infinite since it cannot start
      reduceIdealStartTime = Long.MAX_VALUE;
    }

    _reduceTasks = new TezTaskLevelAggregatedMetrics(data.getReduceTaskData(), reduceTaskContainerSize, reduceIdealStartTime);

    _hadoopAggregatedData.setResourceUsed(_mapTasks.getResourceUsed() + _reduceTasks.getResourceUsed());
    _hadoopAggregatedData.setTotalDelay(_mapTasks.getDelay() + _reduceTasks.getDelay());
    _hadoopAggregatedData.setResourceWasted(_mapTasks.getResourceWasted() + _reduceTasks.getResourceWasted());
  }

  @Override
  public HadoopAggregatedData getResult() {
    return _hadoopAggregatedData;
  }

  /**
   * Reads mapreduce.job.reduce.slowstart.completedmaps as a percentage
   * (config value 0.05 -> 5). Returns 100 when the property is absent or
   * unparseable; the original code called Double.parseDouble on a possibly
   * null property and threw NPE for a value it immediately discarded.
   */
  private int getReduceSlowStartPercentage(HadoopApplicationData data) {
    String value = data.getConf().getProperty(REDUCER_SLOW_START_CONFIG);
    if (value == null) {
      return 100;
    }
    try {
      return (int) (Double.parseDouble(value) * 100);
    } catch (NumberFormatException ex) {
      return 100;
    }
  }

  /**
   * Common container-size lookup: prefer hive.tez.container.size when it is a
   * positive number, otherwise fall back to the given per-phase config key,
   * otherwise the default. Long.parseLong(null) throws NumberFormatException,
   * so a missing property also lands in the default branch.
   */
  private long getContainerSize(HadoopApplicationData data, String fallbackConfig) {
    try {
      long containerSize = Long.parseLong(data.getConf().getProperty(TEZ_CONTAINER_CONFIG));
      if (containerSize > 0) {
        return containerSize;
      }
      return Long.parseLong(data.getConf().getProperty(fallbackConfig));
    } catch (NumberFormatException ex) {
      return CONTAINER_MEMORY_DEFAULT_BYTES;
    }
  }

  /** Map-task container size; falls back to mapreduce.map.memory.mb. */
  private long getMapContainerSize(HadoopApplicationData data) {
    return getContainerSize(data, MAP_CONTAINER_CONFIG);
  }

  /** Reduce-task container size; falls back to mapreduce.reduce.memory.mb. */
  private long getReducerContainerSize(HadoopApplicationData data) {
    return getContainerSize(data, REDUCER_CONTAINER_CONFIG);
  }
}
Loading