From 726f01939e64ea5eb1d8b6b17d4cdf2948d32e5b Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Wed, 22 Mar 2017 19:59:38 -0700 Subject: [PATCH 1/3] [SPARK-18364][YARN] Expose metrics for YarnShuffleService Registers the shuffle server's metrics with the Hadoop Node Manager's DefaultMetricsSystem. --- .../network/yarn/YarnShuffleService.java | 52 +++++--- .../yarn/YarnShuffleServiceMetrics.java | 115 ++++++++++++++++++ 2 files changed, 151 insertions(+), 16 deletions(-) create mode 100644 common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index c7620d0fe1288..1cb63104d52c0 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -17,14 +17,6 @@ package org.apache.spark.network.yarn; -import java.io.File; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.ByteBuffer; -import java.nio.file.Files; -import java.util.List; -import java.util.Map; - import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; @@ -35,23 +27,35 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.api.*; -import org.apache.spark.network.util.LevelDBProvider; -import org.iq80.leveldb.DB; -import org.iq80.leveldb.DBIterator; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.spark.network.TransportContext; import org.apache.spark.network.crypto.AuthServerBootstrap; import org.apache.spark.network.sasl.ShuffleSecretManager; import org.apache.spark.network.server.TransportServer; import org.apache.spark.network.server.TransportServerBootstrap; import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler; +import org.apache.spark.network.util.LevelDBProvider; import org.apache.spark.network.util.TransportConf; import org.apache.spark.network.yarn.util.HadoopConfigProvider; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.List; +import java.util.Map; /** * An external shuffle service used by Spark on Yarn. @@ -110,7 +114,6 @@ public class YarnShuffleService extends AuxiliaryService { // The actual server that serves shuffle files private TransportServer shuffleServer = null; - private Configuration _conf = null; // The recovery path used to shuffle service recovery @@ -166,6 +169,23 @@ protected void serviceInit(Configuration conf) throws Exception { TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile); + // register metrics on the block handler into the Node Manager's metrics system. + YarnShuffleServiceMetrics serviceMetrics = new YarnShuffleServiceMetrics(blockHandler.getAllMetrics()); + MetricsSystem defaultMetricsSystem = DefaultMetricsSystem.instance(); + try { + MetricsSystemImpl metricsSystem = (MetricsSystemImpl) defaultMetricsSystem; + + Method registerSourceMethod = metricsSystem.getClass().getDeclaredMethod("registerSource", + String.class, String.class, MetricsSource.class); + registerSourceMethod.setAccessible(true); + registerSourceMethod.invoke(metricsSystem, "shuffleservice", "Metrics on the Spark " + + "Shuffle Service", serviceMetrics); + logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); + } catch (Exception e) { + logger.warn("Unable to register Spark Shuffle Service metrics with Node Manager; " + + "proceeding without metrics", e); + } + // If authentication is enabled, set up the shuffle server to use a // special RPC handler that filters out unauthenticated fetch requests List bootstraps = Lists.newArrayList(); diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java new file mode 100644 index 0000000000000..922ca06546bf6 --- /dev/null +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.yarn; + +import com.codahale.metrics.*; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; + +import java.util.Map; + +/** + * Modeled off of YARN's NodeManagerMetrics. + */ +public class YarnShuffleServiceMetrics implements MetricsSource { + + private final MetricSet metricSet; + + public YarnShuffleServiceMetrics(MetricSet metricSet) { + this.metricSet = metricSet; + } + + /** + * Get metrics from the source + * + * @param collector to contain the resulting metrics snapshot + * @param all if true, return all metrics even if unchanged. + */ + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("shuffleService"); + + for (Map.Entry entry : metricSet.getMetrics().entrySet()) { + String name = entry.getKey(); + + // The metric types used in ExternalShuffleBlockHandler.ShuffleMetrics + if (entry.getValue() instanceof Timer) { + Timer t = (Timer) entry.getValue(); + metricsRecordBuilder + .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name), + t.getCount()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name), + t.getFifteenMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name), + t.getFiveMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name), + t.getOneMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name), + t.getMeanRate()) + ; + } else if (entry.getValue() instanceof Meter) { + Meter m = (Meter) entry.getValue(); + metricsRecordBuilder + .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name), + m.getCount()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name), + m.getFifteenMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " + name), + m.getFiveMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name), + m.getOneMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name), + m.getMeanRate()) + ; + } else if (entry.getValue() instanceof Gauge) { + Gauge m = (Gauge) entry.getValue(); + Object gaugeValue = m.getValue(); + if (gaugeValue instanceof Integer) { + Integer intValue = (Integer) gaugeValue; + metricsRecordBuilder + .addGauge(new ShuffleServiceMetricsInfo(name, "Integer value of " + + "gauge " + name), intValue.intValue()); + } + } + } + } + + private static class ShuffleServiceMetricsInfo implements MetricsInfo { + + private final String name; + private final String description; + + ShuffleServiceMetricsInfo(String name, String description) { + this.name = name; + this.description = description; + } + + @Override + public String name() { + return name; + } + + @Override + public String description() { + return description; + } + } +} From e7daa76cf1db2f1f807ba1b1e3d907472700baf2 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Thu, 23 Mar 2017 13:05:12 -0700 Subject: [PATCH 2/3] Pull up semicolons --- .../spark/network/yarn/YarnShuffleServiceMetrics.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java index 922ca06546bf6..30c9712d806fb 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -63,8 +63,7 @@ public void getMetrics(MetricsCollector collector, boolean all) { .addGauge(new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name), t.getOneMinuteRate()) .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name), - t.getMeanRate()) - ; + t.getMeanRate()); } else if (entry.getValue() instanceof Meter) { Meter m = (Meter) entry.getValue(); metricsRecordBuilder @@ -77,8 +76,7 @@ public void getMetrics(MetricsCollector collector, boolean all) { .addGauge(new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name), m.getOneMinuteRate()) .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name), - m.getMeanRate()) - ; + m.getMeanRate()); } else if (entry.getValue() instanceof Gauge) { Gauge m = (Gauge) entry.getValue(); Object gaugeValue = m.getValue(); From f82bc065a1ccaff17938d7f20db6945853396625 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Thu, 23 Mar 2017 13:05:18 -0700 Subject: [PATCH 3/3] Fix up imports --- .../network/yarn/YarnShuffleService.java | 28 ++++++++++--------- .../yarn/YarnShuffleServiceMetrics.java | 7 +++-- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 1cb63104d52c0..e8ca422c49855 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -17,6 +17,15 @@ package org.apache.spark.network.yarn; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.List; +import java.util.Map; + import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; @@ -31,9 +40,14 @@ import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.api.*; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.spark.network.TransportContext; import org.apache.spark.network.crypto.AuthServerBootstrap; import org.apache.spark.network.sasl.ShuffleSecretManager; @@ -43,19 +57,7 @@ import org.apache.spark.network.util.LevelDBProvider; import org.apache.spark.network.util.TransportConf; import org.apache.spark.network.yarn.util.HadoopConfigProvider; -import org.iq80.leveldb.DB; -import org.iq80.leveldb.DBIterator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.lang.reflect.Method; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.util.List; -import java.util.Map; /** * An external shuffle service used by Spark on Yarn. diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java index 30c9712d806fb..4c104b47357bf 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -17,12 +17,15 @@ package org.apache.spark.network.yarn; -import com.codahale.metrics.*; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Meter; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricSet; +import com.codahale.metrics.Timer; import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsInfo; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsSource; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import java.util.Map;