-
Notifications
You must be signed in to change notification settings - Fork 28.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[SPARK-3797] Run external shuffle service in Yarn NM
This creates a new module `network/yarn` that depends on `network/shuffle` recently created in #3001. This PR introduces a custom Yarn auxiliary service that runs the external shuffle service. As of the changes here this shuffle service is required for using dynamic allocation with Spark. This is still WIP mainly because it doesn't handle security yet. I have tested this on a stable Yarn cluster. Author: Andrew Or <andrew@databricks.com> Closes #3082 from andrewor14/yarn-shuffle-service and squashes the following commits: ef3ddae [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-shuffle-service 0ee67a2 [Andrew Or] Minor wording suggestions 1c66046 [Andrew Or] Remove unused provided dependencies 0eb6233 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-shuffle-service 6489db5 [Andrew Or] Try catch at the right places 7b71d8f [Andrew Or] Add detailed java docs + reword a few comments d1124e4 [Andrew Or] Add security to shuffle service (INCOMPLETE) 5f8a96f [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-shuffle-service 9b6e058 [Andrew Or] Address various feedback f48b20c [Andrew Or] Fix tests again f39daa6 [Andrew Or] Do not make network-yarn an assembly module 761f58a [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-shuffle-service 15a5b37 [Andrew Or] Fix build for Hadoop 1.x baff916 [Andrew Or] Fix tests 5bf9b7e [Andrew Or] Address a few minor comments 5b419b8 [Andrew Or] Add missing license header 804e7ff [Andrew Or] Include the Yarn shuffle service jar in the distribution cd076a4 [Andrew Or] Require external shuffle service for dynamic allocation ea764e0 [Andrew Or] Connect to Yarn shuffle service only if it's enabled 1bf5109 [Andrew Or] Use the shuffle service port specified through hadoop config b4b1f0c [Andrew Or] 4 tabs -> 2 tabs 43dcb96 [Andrew Or] First cut integration of shuffle service with Yarn aux service b54a0c4 [Andrew Or] Initial skeleton for Yarn shuffle service
- Loading branch information
Andrew Or
committed
Nov 5, 2014
1 parent
f37817b
commit 61a5cce
Showing
12 changed files
with
483 additions
and
16 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
117 changes: 117 additions & 0 deletions
117
network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.network.sasl; | ||
|
||
import java.lang.Override; | ||
import java.nio.ByteBuffer; | ||
import java.nio.charset.Charset; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import org.apache.spark.network.sasl.SecretKeyHolder; | ||
|
||
/** | ||
* A class that manages shuffle secret used by the external shuffle service. | ||
*/ | ||
public class ShuffleSecretManager implements SecretKeyHolder { | ||
private final Logger logger = LoggerFactory.getLogger(ShuffleSecretManager.class); | ||
private final ConcurrentHashMap<String, String> shuffleSecretMap; | ||
|
||
private static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); | ||
|
||
// Spark user used for authenticating SASL connections | ||
// Note that this must match the value in org.apache.spark.SecurityManager | ||
private static final String SPARK_SASL_USER = "sparkSaslUser"; | ||
|
||
/** | ||
* Convert the given string to a byte buffer. The resulting buffer can be converted back to | ||
* the same string through {@link #bytesToString(ByteBuffer)}. This is used if the external | ||
* shuffle service represents shuffle secrets as bytes buffers instead of strings. | ||
*/ | ||
public static ByteBuffer stringToBytes(String s) { | ||
return ByteBuffer.wrap(s.getBytes(UTF8_CHARSET)); | ||
} | ||
|
||
/** | ||
* Convert the given byte buffer to a string. The resulting string can be converted back to | ||
* the same byte buffer through {@link #stringToBytes(String)}. This is used if the external | ||
* shuffle service represents shuffle secrets as bytes buffers instead of strings. | ||
*/ | ||
public static String bytesToString(ByteBuffer b) { | ||
return new String(b.array(), UTF8_CHARSET); | ||
} | ||
|
||
public ShuffleSecretManager() { | ||
shuffleSecretMap = new ConcurrentHashMap<String, String>(); | ||
} | ||
|
||
/** | ||
* Register an application with its secret. | ||
* Executors need to first authenticate themselves with the same secret before | ||
* fetching shuffle files written by other executors in this application. | ||
*/ | ||
public void registerApp(String appId, String shuffleSecret) { | ||
if (!shuffleSecretMap.contains(appId)) { | ||
shuffleSecretMap.put(appId, shuffleSecret); | ||
logger.info("Registered shuffle secret for application {}", appId); | ||
} else { | ||
logger.debug("Application {} already registered", appId); | ||
} | ||
} | ||
|
||
/** | ||
* Register an application with its secret specified as a byte buffer. | ||
*/ | ||
public void registerApp(String appId, ByteBuffer shuffleSecret) { | ||
registerApp(appId, bytesToString(shuffleSecret)); | ||
} | ||
|
||
/** | ||
* Unregister an application along with its secret. | ||
* This is called when the application terminates. | ||
*/ | ||
public void unregisterApp(String appId) { | ||
if (shuffleSecretMap.contains(appId)) { | ||
shuffleSecretMap.remove(appId); | ||
logger.info("Unregistered shuffle secret for application {}", appId); | ||
} else { | ||
logger.warn("Attempted to unregister application {} when it is not registered", appId); | ||
} | ||
} | ||
|
||
/** | ||
* Return the Spark user for authenticating SASL connections. | ||
*/ | ||
@Override | ||
public String getSaslUser(String appId) { | ||
return SPARK_SASL_USER; | ||
} | ||
|
||
/** | ||
* Return the secret key registered with the given application. | ||
* This key is used to authenticate the executors before they can fetch shuffle files | ||
* written by this application from the external shuffle service. If the specified | ||
* application is not registered, return null. | ||
*/ | ||
@Override | ||
public String getSecretKey(String appId) { | ||
return shuffleSecretMap.get(appId); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<!-- | ||
~ Licensed to the Apache Software Foundation (ASF) under one or more | ||
~ contributor license agreements. See the NOTICE file distributed with | ||
~ this work for additional information regarding copyright ownership. | ||
~ The ASF licenses this file to You under the Apache License, Version 2.0 | ||
~ (the "License"); you may not use this file except in compliance with | ||
~ the License. You may obtain a copy of the License at | ||
~ | ||
~ http://www.apache.org/licenses/LICENSE-2.0 | ||
~ | ||
~ Unless required by applicable law or agreed to in writing, software | ||
~ distributed under the License is distributed on an "AS IS" BASIS, | ||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
~ See the License for the specific language governing permissions and | ||
~ limitations under the License. | ||
--> | ||
|
||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
<parent> | ||
<groupId>org.apache.spark</groupId> | ||
<artifactId>spark-parent</artifactId> | ||
<version>1.2.0-SNAPSHOT</version> | ||
<relativePath>../../pom.xml</relativePath> | ||
</parent> | ||
|
||
<groupId>org.apache.spark</groupId> | ||
<artifactId>spark-network-yarn_2.10</artifactId> | ||
<packaging>jar</packaging> | ||
<name>Spark Project Yarn Shuffle Service Code</name> | ||
<url>http://spark.apache.org/</url> | ||
<properties> | ||
<sbt.project.name>network-yarn</sbt.project.name> | ||
</properties> | ||
|
||
<dependencies> | ||
<!-- Core dependencies --> | ||
<dependency> | ||
<groupId>org.apache.spark</groupId> | ||
<artifactId>spark-network-shuffle_2.10</artifactId> | ||
<version>${project.version}</version> | ||
</dependency> | ||
|
||
<!-- Provided dependencies --> | ||
<dependency> | ||
<groupId>org.apache.hadoop</groupId> | ||
<artifactId>hadoop-client</artifactId> | ||
<scope>provided</scope> | ||
</dependency> | ||
</dependencies> | ||
|
||
<build> | ||
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> | ||
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> | ||
</build> | ||
</project> |
Oops, something went wrong.
61a5cce
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I compile with command "make-distribution.sh --tgz -Phadoop-2.3 -Phive -Phive-0.13.1 -Pyarn -Dyarn.version=2.3.0-cdh5.1.2 -Dhadoop.version=2.3.0-cdh5.1.2" , and error occur:
cp: can not stat “/root/spark-master/network/yarn/target/scala_/spark-network-yarn_.jar”: can not find this file or dir
61a5cce
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @zzcclp yes there will be a follow-up PR that properly packages these jars into one. Thanks for pointing this out.
61a5cce
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I filed this at https://issues.apache.org/jira/browse/SPARK-4281
61a5cce
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @zzcclp I just pushed a hot fix. I didn't realize
make-distribution.sh
fails fast if the cp fails.61a5cce
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tks for your reply. When this hot fix can be solved, approximate time?
61a5cce
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's already merged into master.
61a5cce
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, Tks.