Skip to content

Commit

Permalink
[SPARK-43179][SHUFFLE] Allowing apps to control whether their metadat…
Browse files Browse the repository at this point in the history
…a gets saved in the db by the External Shuffle Service

### What changes were proposed in this pull request?
This change allows applications to control whether their metadata gets saved in the db. For applications with higher security requirements, storing application secret in the db without any encryption is a potential security risk. While filesystem ACLs can help protect the access to the db, this level of security is not sufficient for some use cases. Such applications can chose to not save their metadata in the db. As a result, these applications may experience more failures in the event of a node restart, but we believe this trade-off is acceptable given the increased security risk.

### Why are the changes needed?
These modifications are necessary to reduce the likelihood of security threats for applications with elevated security requirements.

### Does this PR introduce _any_ user-facing change?
No. Added a configuration `spark.shuffle.server.recovery.disabled` which by default is `false`. When set to `true`, the metadata of the application will not saved in the db.

### How was this patch tested?
Added UTs and also verified with test applications in our test environment.

Closes apache#40843 from otterc/SPARK-43179.

Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
  • Loading branch information
otterc authored and Mridul Muralidharan committed Apr 21, 2023
1 parent 3523d83 commit 958a7d5
Show file tree
Hide file tree
Showing 10 changed files with 425 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffle;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.base.Preconditions;

/**
* Stores the applications which have recovery disabled.
*/
public final class AppsWithRecoveryDisabled {

private static final AppsWithRecoveryDisabled INSTANCE = new AppsWithRecoveryDisabled();

private final Set<String> appsWithRecoveryDisabled = Collections.newSetFromMap(
new ConcurrentHashMap<>());

private AppsWithRecoveryDisabled() {
}

/**
* Add an application for which recovery is disabled.
* @param appId application id
*/
public static void disableRecoveryOfApp(String appId) {
Preconditions.checkNotNull(appId);
INSTANCE.appsWithRecoveryDisabled.add(appId);
}

/**
* Returns whether an application is enabled for recovery or not.
* @param appId application id
* @return true if the application is enabled for recovery; false otherwise.
*/
public static boolean isRecoveryEnabledForApp(String appId) {
Preconditions.checkNotNull(appId);
return !INSTANCE.appsWithRecoveryDisabled.contains(appId);
}

/**
* Removes the application from the store.
* @param appId application id
*/
public static void removeApp(String appId) {
Preconditions.checkNotNull(appId);
INSTANCE.appsWithRecoveryDisabled.remove(appId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void registerExecutor(
AppExecId fullId = new AppExecId(appId, execId);
logger.info("Registered executor {} with {}", fullId, executorInfo);
try {
if (db != null) {
if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) {
byte[] key = dbAppExecKey(fullId);
byte[] value = mapper.writeValueAsString(executorInfo).getBytes(StandardCharsets.UTF_8);
db.put(key, value);
Expand Down Expand Up @@ -224,7 +224,7 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
// Only touch executors associated with the appId that was removed.
if (appId.equals(fullId.appId)) {
it.remove();
if (db != null) {
if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(fullId.appId)) {
try {
db.delete(dbAppExecKey(fullId));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ void closeAndDeletePartitionsIfNeeded(
@VisibleForTesting
void removeAppAttemptPathInfoFromDB(String appId, int attemptId) {
AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId);
if (db != null) {
if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) {
try {
byte[] key = getDbAppAttemptPathsKey(appAttemptId);
db.delete(key);
Expand Down Expand Up @@ -967,7 +967,7 @@ private void shutdownMergedShuffleCleanerNow() {
* Write the application attempt's local path information to the DB
*/
private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo appPathsInfo) {
if (db != null) {
if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) {
AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId);
try {
byte[] key = getDbAppAttemptPathsKey(appAttemptId);
Expand All @@ -985,7 +985,8 @@ private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo app
*/
private void writeAppAttemptShuffleMergeInfoToDB(
AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
if (db != null) {
if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(
appAttemptShuffleMergeId.appId)) {
// Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
try{
byte[] dbKey = getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -59,6 +60,7 @@
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.TransportServerBootstrap;
import org.apache.spark.network.shuffle.AppsWithRecoveryDisabled;
import org.apache.spark.network.shuffle.ExternalBlockHandler;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.TransportConf;
Expand Down Expand Up @@ -136,6 +138,12 @@ public class YarnShuffleService extends AuxiliaryService {

private static final boolean DEFAULT_STOP_ON_FAILURE = false;

@VisibleForTesting
static final String SPARK_SHUFFLE_SERVER_RECOVERY_DISABLED =
"spark.yarn.shuffle.server.recovery.disabled";
@VisibleForTesting
static final String SECRET_KEY = "secret";

// just for testing when you want to find an open port
@VisibleForTesting
static int boundPort = -1;
Expand Down Expand Up @@ -407,10 +415,30 @@ private static byte[] dbAppKey(AppId appExecId) throws IOException {
public void initializeApplication(ApplicationInitializationContext context) {
String appId = context.getApplicationId().toString();
try {
ByteBuffer shuffleSecret = context.getApplicationDataForService();
ByteBuffer appServiceData = context.getApplicationDataForService();
String payload = JavaUtils.bytesToString(appServiceData);
String shuffleSecret;
Map<String, Object> metaInfo;
try {
metaInfo = mapper.readValue(payload,
new TypeReference<Map<String, Object>>() {});
Object metadataStorageVal = metaInfo.get(SPARK_SHUFFLE_SERVER_RECOVERY_DISABLED);
if (metadataStorageVal != null && (Boolean) metadataStorageVal) {
AppsWithRecoveryDisabled.disableRecoveryOfApp(appId);
logger.info("Disabling metadata persistence for application {}", appId);
}
} catch (IOException ioe) {
logger.warn("Unable to parse application data for service: " + payload);
metaInfo = null;
}
if (isAuthenticationEnabled()) {
AppId fullId = new AppId(appId);
if (db != null) {
if (metaInfo != null) {
shuffleSecret = (String) metaInfo.get(SECRET_KEY);
} else {
shuffleSecret = payload;
}
if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) {
AppId fullId = new AppId(appId);
byte[] key = dbAppKey(fullId);
byte[] value = mapper.writeValueAsString(shuffleSecret).getBytes(StandardCharsets.UTF_8);
db.put(key, value);
Expand All @@ -428,7 +456,7 @@ public void stopApplication(ApplicationTerminationContext context) {
try {
if (isAuthenticationEnabled()) {
AppId fullId = new AppId(appId);
if (db != null) {
if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) {
try {
db.delete(dbAppKey(fullId));
} catch (IOException e) {
Expand All @@ -440,6 +468,8 @@ public void stopApplication(ApplicationTerminationContext context) {
blockHandler.applicationRemoved(appId, false /* clean up local dirs */);
} catch (Exception e) {
logger.error("Exception when stopping application {}", appId, e);
} finally {
AppsWithRecoveryDisabled.removeApp(appId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2507,4 +2507,13 @@ package object config {
.version("3.5.0")
.intConf
.createWithDefault(Int.MaxValue)

private[spark] val SHUFFLE_SERVER_RECOVERY_DISABLED =
ConfigBuilder("spark.yarn.shuffle.server.recovery.disabled")
.internal()
.doc("Set to true for applications that prefer to disable recovery when the External " +
"Shuffle Service restarts. This configuration only takes effect on YARN.")
.version("3.5.0")
.booleanConf
.createWithDefault(false)
}
12 changes: 11 additions & 1 deletion docs/running-on-yarn.md
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ To use a custom metrics.properties for the application master and executors, upd
<td><code>spark.yarn.report.loggingFrequency</code></td>
<td><code>30</code></td>
<td>
Maximum number of application reports processed until the next application status
Maximum number of application reports processed until the next application status
is logged. If there is a change of state, the application status will be logged regardless
of the number of application reports processed.
</td>
Expand Down Expand Up @@ -683,6 +683,16 @@ To use a custom metrics.properties for the application master and executors, upd
</td>
<td>3.0.0</td>
</tr>
<tr>
<td><code>spark.yarn.shuffle.server.recovery.disabled</code></td>
<td>false</td>
<td>
Set to true for applications that have higher security requirements and prefer that their
secret is not saved in the db. The shuffle data of such applications wll not be recovered after
the External Shuffle Service restarts.
</td>
<td>3.5.0</td>
</tr>
</table>

#### Available patterns for SHS custom executor log URL
Expand Down
14 changes: 14 additions & 0 deletions docs/security.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,20 @@ distributing the shared secret. Each application will use a unique shared secret
the case of YARN, this feature relies on YARN RPC encryption being enabled for the distribution of
secrets to be secure.

<table class="table table-striped">
<thead><tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr></thead>
<tr>
<td><code>spark.yarn.shuffle.server.recovery.disabled</code></td>
<td>false</td>
<td>
Set to true for applications that have higher security requirements and prefer that their
secret is not saved in the db. The shuffle data of such applications wll not be recovered after
the External Shuffle Service restarts.
</td>
<td>3.5.0</td>
</tr>
</table>

### Kubernetes

On Kubernetes, Spark will also automatically generate an authentication secret unique to each
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import java.nio.ByteBuffer
import java.util.Collections

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.{HashMap, ListBuffer}

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation
Expand Down Expand Up @@ -105,17 +108,7 @@ private[yarn] class ExecutorRunnable(
// started on the NodeManager and, if authentication is enabled, provide it with our secret
// key for fetching shuffle files later
if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {
val secretString = securityMgr.getSecretKey()
val secretBytes =
if (secretString != null) {
// This conversion must match how the YarnShuffleService decodes our secret
JavaUtils.stringToBytes(secretString)
} else {
// Authentication is not enabled, so just provide dummy metadata
ByteBuffer.allocate(0)
}
val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME)
ctx.setServiceData(Collections.singletonMap(serviceName, secretBytes))
configureServiceData(ctx)
}

// Send the start request to the ContainerManager
Expand All @@ -128,6 +121,33 @@ private[yarn] class ExecutorRunnable(
}
}

private[yarn] def configureServiceData(ctx: ContainerLaunchContext): Unit = {
val secretString = securityMgr.getSecretKey()

val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME)
if (!sparkConf.get(SHUFFLE_SERVER_RECOVERY_DISABLED)) {
val secretBytes =
if (secretString != null) {
// This conversion must match how the YarnShuffleService decodes our secret
JavaUtils.stringToBytes(secretString)
} else {
// Authentication is not enabled, so just provide dummy metadata
ByteBuffer.allocate(0)
}
ctx.setServiceData(Collections.singletonMap(serviceName, secretBytes))
} else {
val payload = new mutable.HashMap[String, Object]()
payload.put(SHUFFLE_SERVER_RECOVERY_DISABLED.key, java.lang.Boolean.TRUE)
if (secretString != null) {
payload.put(ExecutorRunnable.SECRET_KEY, secretString)
}
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
val jsonString = mapper.writeValueAsString(payload)
ctx.setServiceData(Collections.singletonMap(serviceName, JavaUtils.stringToBytes(jsonString)))
}
}

private def prepareCommand(): List[String] = {
// Extra options for the JVM
val javaOpts = ListBuffer[String]()
Expand Down Expand Up @@ -202,3 +222,7 @@ private[yarn] class ExecutorRunnable(
env
}
}

private[yarn] object ExecutorRunnable {
private[yarn] val SECRET_KEY = "secret"
}
Loading

0 comments on commit 958a7d5

Please sign in to comment.