Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-43179][SHUFFLE] Allowing apps to control whether their metadata gets saved in the db by the External Shuffle Service #40843

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffle;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.base.Preconditions;

/**
* Stores the applications which have recovery disabled.
*/
public final class AppsWithRecoveryDisabled {

private static final AppsWithRecoveryDisabled INSTANCE = new AppsWithRecoveryDisabled();

private final Set<String> appsWithRecoveryDisabled = Collections.newSetFromMap(
new ConcurrentHashMap<>());

private AppsWithRecoveryDisabled() {
}

/**
* Add an application for which recovery is disabled.
* @param appId application id
*/
public static void disableRecoveryOfApp(String appId) {
Preconditions.checkNotNull(appId);
INSTANCE.appsWithRecoveryDisabled.add(appId);
}

/**
* Returns whether an application is enabled for recovery or not.
* @param appId application id
* @return true if the application is enabled for recovery; false otherwise.
*/
public static boolean isRecoveryEnabledForApp(String appId) {
Preconditions.checkNotNull(appId);
return !INSTANCE.appsWithRecoveryDisabled.contains(appId);
}

/**
* Removes the application from the store.
* @param appId application id
*/
public static void removeApp(String appId) {
Preconditions.checkNotNull(appId);
INSTANCE.appsWithRecoveryDisabled.remove(appId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void registerExecutor(
AppExecId fullId = new AppExecId(appId, execId);
logger.info("Registered executor {} with {}", fullId, executorInfo);
try {
if (db != null) {
if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) {
byte[] key = dbAppExecKey(fullId);
byte[] value = mapper.writeValueAsString(executorInfo).getBytes(StandardCharsets.UTF_8);
db.put(key, value);
Expand Down Expand Up @@ -224,7 +224,7 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
// Only touch executors associated with the appId that was removed.
if (appId.equals(fullId.appId)) {
it.remove();
if (db != null) {
if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(fullId.appId)) {
try {
db.delete(dbAppExecKey(fullId));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ void closeAndDeletePartitionsIfNeeded(
@VisibleForTesting
void removeAppAttemptPathInfoFromDB(String appId, int attemptId) {
AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId);
if (db != null) {
if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) {
try {
byte[] key = getDbAppAttemptPathsKey(appAttemptId);
db.delete(key);
Expand Down Expand Up @@ -967,7 +967,7 @@ private void shutdownMergedShuffleCleanerNow() {
* Write the application attempt's local path information to the DB
*/
private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo appPathsInfo) {
if (db != null) {
if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) {
AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId);
try {
byte[] key = getDbAppAttemptPathsKey(appAttemptId);
Expand All @@ -985,7 +985,8 @@ private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo app
*/
private void writeAppAttemptShuffleMergeInfoToDB(
AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
if (db != null) {
if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(
appAttemptShuffleMergeId.appId)) {
// Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
try{
byte[] dbKey = getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -59,6 +60,7 @@
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.TransportServerBootstrap;
import org.apache.spark.network.shuffle.AppsWithRecoveryDisabled;
import org.apache.spark.network.shuffle.ExternalBlockHandler;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.TransportConf;
Expand Down Expand Up @@ -136,6 +138,12 @@ public class YarnShuffleService extends AuxiliaryService {

private static final boolean DEFAULT_STOP_ON_FAILURE = false;

@VisibleForTesting
static final String SPARK_SHUFFLE_SERVER_RECOVERY_DISABLED =
"spark.yarn.shuffle.server.recovery.disabled";
@VisibleForTesting
static final String SECRET_KEY = "secret";

// just for testing when you want to find an open port
@VisibleForTesting
static int boundPort = -1;
Expand Down Expand Up @@ -407,10 +415,30 @@ private static byte[] dbAppKey(AppId appExecId) throws IOException {
public void initializeApplication(ApplicationInitializationContext context) {
String appId = context.getApplicationId().toString();
try {
ByteBuffer shuffleSecret = context.getApplicationDataForService();
ByteBuffer appServiceData = context.getApplicationDataForService();
String payload = JavaUtils.bytesToString(appServiceData);
String shuffleSecret;
Map<String, Object> metaInfo;
try {
metaInfo = mapper.readValue(payload,
new TypeReference<Map<String, Object>>() {});
Object metadataStorageVal = metaInfo.get(SPARK_SHUFFLE_SERVER_RECOVERY_DISABLED);
if (metadataStorageVal != null && (Boolean) metadataStorageVal) {
AppsWithRecoveryDisabled.disableRecoveryOfApp(appId);
logger.info("Disabling metadata persistence for application {}", appId);
}
} catch (IOException ioe) {
logger.warn("Unable to parse application data for service: " + payload);
metaInfo = null;
}
if (isAuthenticationEnabled()) {
AppId fullId = new AppId(appId);
if (db != null) {
if (metaInfo != null) {
shuffleSecret = (String) metaInfo.get(SECRET_KEY);
} else {
shuffleSecret = payload;
tgravescs marked this conversation as resolved.
Show resolved Hide resolved
}
if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) {
AppId fullId = new AppId(appId);
byte[] key = dbAppKey(fullId);
byte[] value = mapper.writeValueAsString(shuffleSecret).getBytes(StandardCharsets.UTF_8);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test with shuffleSecret = null and appServiceData is a json ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fixed the test that I added executor info of apps should not be stored in db if they want to be excluded. Authentication is turned off to test this case.

db.put(key, value);
Expand All @@ -428,7 +456,7 @@ public void stopApplication(ApplicationTerminationContext context) {
try {
if (isAuthenticationEnabled()) {
AppId fullId = new AppId(appId);
if (db != null) {
if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) {
try {
db.delete(dbAppKey(fullId));
} catch (IOException e) {
Expand All @@ -440,6 +468,8 @@ public void stopApplication(ApplicationTerminationContext context) {
blockHandler.applicationRemoved(appId, false /* clean up local dirs */);
} catch (Exception e) {
logger.error("Exception when stopping application {}", appId, e);
} finally {
AppsWithRecoveryDisabled.removeApp(appId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2507,4 +2507,13 @@ package object config {
.version("3.5.0")
.intConf
.createWithDefault(Int.MaxValue)

private[spark] val SHUFFLE_SERVER_RECOVERY_DISABLED =
ConfigBuilder("spark.yarn.shuffle.server.recovery.disabled")
.internal()
.doc("Set to true for applications that prefer to disable recovery when the External " +
"Shuffle Service restarts. This configuration only takes effect on YARN.")
.version("3.5.0")
.booleanConf
.createWithDefault(false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is specific to yarn and not supported by other resource managers - call it out in the description ?
Same for the documentation update below - make it explicit that it is for yarn mode, and make sure it is in the right section in the doc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this is only for yarn should we rename to be spark.yarn.shuffle and only put in yarn docs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • I moved the documentation for this config to running-on-yarn.md.
  • In the security doc, I moved the definition under Yarn section.
    PTAL

}
12 changes: 11 additions & 1 deletion docs/running-on-yarn.md
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ To use a custom metrics.properties for the application master and executors, upd
<td><code>spark.yarn.report.loggingFrequency</code></td>
<td><code>30</code></td>
<td>
Maximum number of application reports processed until the next application status
Maximum number of application reports processed until the next application status
is logged. If there is a change of state, the application status will be logged regardless
of the number of application reports processed.
</td>
Expand Down Expand Up @@ -683,6 +683,16 @@ To use a custom metrics.properties for the application master and executors, upd
</td>
<td>3.0.0</td>
</tr>
<tr>
<td><code>spark.yarn.shuffle.server.recovery.disabled</code></td>
<td>false</td>
<td>
Set to true for applications that have higher security requirements and prefer that their
secret is not saved in the db. The shuffle data of such applications wll not be recovered after
the External Shuffle Service restarts.
</td>
<td>3.5.0</td>
</tr>
</table>

#### Available patterns for SHS custom executor log URL
Expand Down
14 changes: 14 additions & 0 deletions docs/security.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,20 @@ distributing the shared secret. Each application will use a unique shared secret
the case of YARN, this feature relies on YARN RPC encryption being enabled for the distribution of
secrets to be secure.

<table class="table table-striped">
<thead><tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr></thead>
<tr>
<td><code>spark.yarn.shuffle.server.recovery.disabled</code></td>
<td>false</td>
<td>
Set to true for applications that have higher security requirements and prefer that their
secret is not saved in the db. The shuffle data of such applications wll not be recovered after
the External Shuffle Service restarts.
</td>
<td>3.5.0</td>
</tr>
</table>

### Kubernetes

On Kubernetes, Spark will also automatically generate an authentication secret unique to each
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import java.nio.ByteBuffer
import java.util.Collections

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.{HashMap, ListBuffer}

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation
Expand Down Expand Up @@ -105,17 +108,7 @@ private[yarn] class ExecutorRunnable(
// started on the NodeManager and, if authentication is enabled, provide it with our secret
// key for fetching shuffle files later
if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {
val secretString = securityMgr.getSecretKey()
val secretBytes =
if (secretString != null) {
// This conversion must match how the YarnShuffleService decodes our secret
JavaUtils.stringToBytes(secretString)
} else {
// Authentication is not enabled, so just provide dummy metadata
ByteBuffer.allocate(0)
}
val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME)
ctx.setServiceData(Collections.singletonMap(serviceName, secretBytes))
configureServiceData(ctx)
}

// Send the start request to the ContainerManager
Expand All @@ -128,6 +121,33 @@ private[yarn] class ExecutorRunnable(
}
}

private[yarn] def configureServiceData(ctx: ContainerLaunchContext): Unit = {
val secretString = securityMgr.getSecretKey()

val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME)
if (!sparkConf.get(SHUFFLE_SERVER_RECOVERY_DISABLED)) {
val secretBytes =
if (secretString != null) {
// This conversion must match how the YarnShuffleService decodes our secret
JavaUtils.stringToBytes(secretString)
} else {
// Authentication is not enabled, so just provide dummy metadata
ByteBuffer.allocate(0)
}
ctx.setServiceData(Collections.singletonMap(serviceName, secretBytes))
} else {
val payload = new mutable.HashMap[String, Object]()
payload.put(SHUFFLE_SERVER_RECOVERY_DISABLED.key, java.lang.Boolean.TRUE)
if (secretString != null) {
payload.put(ExecutorRunnable.SECRET_KEY, secretString)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to keep the same behavior here, when the secretString is null, we still put ByteBuffer.allocate(0) there in the payload?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to keep the same behavior. If authentication is disabled, we don't need to set the value of the secret key. Also, this was a feedback from @mridulm

}
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
val jsonString = mapper.writeValueAsString(payload)
ctx.setServiceData(Collections.singletonMap(serviceName, JavaUtils.stringToBytes(jsonString)))
}
}

private def prepareCommand(): List[String] = {
// Extra options for the JVM
val javaOpts = ListBuffer[String]()
Expand Down Expand Up @@ -202,3 +222,7 @@ private[yarn] class ExecutorRunnable(
env
}
}

private[yarn] object ExecutorRunnable {
private[yarn] val SECRET_KEY = "secret"
}
Loading