From 04d66d12b059e30be40b75a660b955c52e739758 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Fri, 21 Dec 2018 11:28:22 -0800 Subject: [PATCH] [SPARK-25642][YARN] Adding two new metrics to record the number of registered connections as well as the number of active connections to YARN Shuffle Service Recently, the ability to expose the metrics for YARN Shuffle Service was added as part of [SPARK-18364](https://github.com/apache/spark/pull/22485). We need to add some metrics to be able to determine the number of active connections as well as open connections to the external shuffle service to benchmark network and connection issues on large cluster environments. Added two more shuffle server metrics for Spark Yarn shuffle service: numRegisteredConnections which indicate the number of registered connections to the shuffle service and numActiveConnections which indicate the number of active connections to the shuffle service at any given point in time. If these metrics are outputted to a file, we get something like this: 1533674653489 default.shuffleService: Hostname=server1.abc.com, openBlockRequestLatencyMillis_count=729, openBlockRequestLatencyMillis_rate15=0.7110833548897356, openBlockRequestLatencyMillis_rate5=1.657808981793011, openBlockRequestLatencyMillis_rate1=2.2404486061620474, openBlockRequestLatencyMillis_rateMean=0.9242558551196706, numRegisteredConnections=35, blockTransferRateBytes_count=2635880512, blockTransferRateBytes_rate15=2578547.6094160094, blockTransferRateBytes_rate5=6048721.726302424, blockTransferRateBytes_rate1=8548922.518223226, blockTransferRateBytes_rateMean=3341878.633637769, registeredExecutorsSize=5, registerExecutorRequestLatencyMillis_count=5, registerExecutorRequestLatencyMillis_rate15=0.0027973949328659836, registerExecutorRequestLatencyMillis_rate5=0.0021278007987206426, registerExecutorRequestLatencyMillis_rate1=2.8270296777387467E-6, registerExecutorRequestLatencyMillis_rateMean=0.006339206380043053, numActiveConnections=35 Closes #22498 from pgandhi999/SPARK-18364. Authored-by: pgandhi Signed-off-by: Marcelo Vanzin (cherry picked from commit 8dd29fe36b781d115213b1d6a8446ad04e9239bb) --- .../spark/network/TransportContext.java | 9 ++++++- .../server/TransportChannelHandler.java | 18 +++++++++++++- .../spark/network/server/TransportServer.java | 5 ++++ .../shuffle/ExternalShuffleBlockHandler.java | 24 +++++++++++++++++-- .../network/yarn/YarnShuffleService.java | 21 +++++++++------- .../yarn/YarnShuffleServiceMetrics.java | 5 ++++ .../spark/deploy/ExternalShuffleService.scala | 2 ++ .../yarn/YarnShuffleServiceMetricsSuite.scala | 3 ++- 8 files changed, 73 insertions(+), 14 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index 480b52652de53..1a3f3f2a6f249 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; +import com.codahale.metrics.Counter; import io.netty.channel.Channel; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; @@ -66,6 +67,8 @@ public class TransportContext { private final RpcHandler rpcHandler; private final boolean closeIdleConnections; private final boolean isClientOnly; + // Number of registered connections to the shuffle service + private Counter registeredConnections = new Counter(); /** * Force to create MessageEncoder and MessageDecoder so that we can make sure they will be created @@ -221,7 +224,7 @@ private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client, rpcHandler, conf.maxChunksBeingTransferred()); return new TransportChannelHandler(client, responseHandler, requestHandler, - conf.connectionTimeoutMs(), closeIdleConnections); + conf.connectionTimeoutMs(), closeIdleConnections, this); } /** @@ -234,4 +237,8 @@ private ChunkFetchRequestHandler createChunkFetchHandler(TransportChannelHandler } public TransportConf getConf() { return conf; } + + public Counter getRegisteredConnections() { + return registeredConnections; + } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java index c824a7b0d4740..ca81099c4d5cb 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java @@ -21,6 +21,7 @@ import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; +import org.apache.spark.network.TransportContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,18 +58,21 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler allMetrics; // Time latency for open block request in ms private final Timer openBlockRequestLatencyMillis = new Timer(); @@ -181,14 +183,20 @@ private class ShuffleMetrics implements MetricSet { private final Timer registerExecutorRequestLatencyMillis = new Timer(); // Block transfer rate in byte per second private final Meter blockTransferRateBytes = new Meter(); + // Number of active connections to the shuffle service + private Counter activeConnections = new Counter(); + // Number of registered connections to the shuffle service + private Counter registeredConnections = new Counter(); - private ShuffleMetrics() { + public ShuffleMetrics() { allMetrics = new HashMap<>(); allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis); allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis); allMetrics.put("blockTransferRateBytes", blockTransferRateBytes); allMetrics.put("registeredExecutorsSize", (Gauge) () -> blockManager.getRegisteredExecutorsSize()); + allMetrics.put("numActiveConnections", activeConnections); + allMetrics.put("numRegisteredConnections", registeredConnections); } @Override @@ -244,4 +252,16 @@ public ManagedBuffer next() { } } + @Override + public void channelActive(TransportClient client) { + metrics.activeConnections.inc(); + super.channelActive(client); + } + + @Override + public void channelInactive(TransportClient client) { + metrics.activeConnections.dec(); + super.channelInactive(client); + } + } diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 72ae1a1295236..7e8d3b2bc3ba4 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -170,15 +170,6 @@ protected void serviceInit(Configuration conf) throws Exception { TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile); - // register metrics on the block handler into the Node Manager's metrics system. - YarnShuffleServiceMetrics serviceMetrics = - new YarnShuffleServiceMetrics(blockHandler.getAllMetrics()); - - MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance(); - metricsSystem.register( - "sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics); - logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); - // If authentication is enabled, set up the shuffle server to use a // special RPC handler that filters out unauthenticated fetch requests List bootstraps = Lists.newArrayList(); @@ -199,6 +190,18 @@ protected void serviceInit(Configuration conf) throws Exception { port = shuffleServer.getPort(); boundPort = port; String authEnabledString = authEnabled ? "enabled" : "not enabled"; + + // register metrics on the block handler into the Node Manager's metrics system. + blockHandler.getAllMetrics().getMetrics().put("numRegisteredConnections", + shuffleServer.getRegisteredConnections()); + YarnShuffleServiceMetrics serviceMetrics = + new YarnShuffleServiceMetrics(blockHandler.getAllMetrics()); + + MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance(); + metricsSystem.register( + "sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics); + logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); + logger.info("Started YARN shuffle service for Spark on port {}. " + "Authentication is {}. Registered executor file is {}", port, authEnabledString, registeredExecutorFile); diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java index 3e4d479b862b3..501237407e9b2 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -107,6 +107,11 @@ public static void collectMetric( throw new IllegalStateException( "Not supported class type of metric[" + name + "] for value " + gaugeValue); } + } else if (metric instanceof Counter) { + Counter c = (Counter) metric; + long counterValue = c.getCount(); + metricsRecordBuilder.addGauge(new ShuffleServiceMetricsInfo(name, "Number of " + + "connections to shuffle service " + name), counterValue); } } diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala index f6b3c37f0fe72..03e3abb3ce569 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala @@ -84,6 +84,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana server = transportContext.createServer(port, bootstraps.asJava) shuffleServiceSource.registerMetricSet(server.getAllMetrics) + blockHandler.getAllMetrics.getMetrics.put("numRegisteredConnections", + server.getRegisteredConnections) shuffleServiceSource.registerMetricSet(blockHandler.getAllMetrics) masterMetricsSystem.registerSource(shuffleServiceSource) masterMetricsSystem.start() diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala index 40b92282a3b8f..952fd0b70bb7b 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala @@ -38,7 +38,8 @@ class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers { test("metrics named as expected") { val allMetrics = Set( "openBlockRequestLatencyMillis", "registerExecutorRequestLatencyMillis", - "blockTransferRateBytes", "registeredExecutorsSize") + "blockTransferRateBytes", "registeredExecutorsSize", "numActiveConnections", + "numRegisteredConnections") metrics.getMetrics.keySet().asScala should be (allMetrics) }