diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index c7620d0fe1288..e8ca422c49855 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -19,8 +19,9 @@ import java.io.File; import java.io.IOException; -import java.nio.charset.StandardCharsets; +import java.lang.reflect.Method; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.List; import java.util.Map; @@ -35,9 +36,12 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.api.*; -import org.apache.spark.network.util.LevelDBProvider; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBIterator; @@ -50,9 +54,11 @@ import org.apache.spark.network.server.TransportServer; import org.apache.spark.network.server.TransportServerBootstrap; import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler; +import org.apache.spark.network.util.LevelDBProvider; import org.apache.spark.network.util.TransportConf; import org.apache.spark.network.yarn.util.HadoopConfigProvider; + /** * An external shuffle service used by Spark on Yarn. * @@ -110,7 +116,6 @@ public class YarnShuffleService extends AuxiliaryService { // The actual server that serves shuffle files private TransportServer shuffleServer = null; - private Configuration _conf = null; // The recovery path used to shuffle service recovery @@ -166,6 +171,23 @@ protected void serviceInit(Configuration conf) throws Exception { TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile); + // register metrics on the block handler into the Node Manager's metrics system. + YarnShuffleServiceMetrics serviceMetrics = new YarnShuffleServiceMetrics(blockHandler.getAllMetrics()); + MetricsSystem defaultMetricsSystem = DefaultMetricsSystem.instance(); + try { + MetricsSystemImpl metricsSystem = (MetricsSystemImpl) defaultMetricsSystem; + + Method registerSourceMethod = metricsSystem.getClass().getDeclaredMethod("registerSource", + String.class, String.class, MetricsSource.class); + registerSourceMethod.setAccessible(true); + registerSourceMethod.invoke(metricsSystem, "shuffleservice", "Metrics on the Spark " + + "Shuffle Service", serviceMetrics); + logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); + } catch (Exception e) { + logger.warn("Unable to register Spark Shuffle Service metrics with Node Manager; " + + "proceeding without metrics", e); + } + // If authentication is enabled, set up the shuffle server to use a // special RPC handler that filters out unauthenticated fetch requests List bootstraps = Lists.newArrayList(); diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java new file mode 100644 index 0000000000000..4c104b47357bf --- /dev/null +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.yarn; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Meter; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricSet; +import com.codahale.metrics.Timer; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; + +import java.util.Map; + +/** + * Modeled off of YARN's NodeManagerMetrics. + */ +public class YarnShuffleServiceMetrics implements MetricsSource { + + private final MetricSet metricSet; + + public YarnShuffleServiceMetrics(MetricSet metricSet) { + this.metricSet = metricSet; + } + + /** + * Get metrics from the source + * + * @param collector to contain the resulting metrics snapshot + * @param all if true, return all metrics even if unchanged. + */ + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("shuffleService"); + + for (Map.Entry entry : metricSet.getMetrics().entrySet()) { + String name = entry.getKey(); + + // The metric types used in ExternalShuffleBlockHandler.ShuffleMetrics + if (entry.getValue() instanceof Timer) { + Timer t = (Timer) entry.getValue(); + metricsRecordBuilder + .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name), + t.getCount()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name), + t.getFifteenMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name), + t.getFiveMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name), + t.getOneMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name), + t.getMeanRate()); + } else if (entry.getValue() instanceof Meter) { + Meter m = (Meter) entry.getValue(); + metricsRecordBuilder + .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name), + m.getCount()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name), + m.getFifteenMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " + name), + m.getFiveMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name), + m.getOneMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name), + m.getMeanRate()); + } else if (entry.getValue() instanceof Gauge) { + Gauge m = (Gauge) entry.getValue(); + Object gaugeValue = m.getValue(); + if (gaugeValue instanceof Integer) { + Integer intValue = (Integer) gaugeValue; + metricsRecordBuilder + .addGauge(new ShuffleServiceMetricsInfo(name, "Integer value of " + + "gauge " + name), intValue.intValue()); + } + } + } + } + + private static class ShuffleServiceMetricsInfo implements MetricsInfo { + + private final String name; + private final String description; + + ShuffleServiceMetricsInfo(String name, String description) { + this.name = name; + this.description = description; + } + + @Override + public String name() { + return name; + } + + @Override + public String description() { + return description; + } + } +}