From 958a7d594d44900b82a70658c105cc7142fff92e Mon Sep 17 00:00:00 2001 From: Chandni Singh Date: Fri, 21 Apr 2023 12:21:35 -0500 Subject: [PATCH] [SPARK-43179][SHUFFLE] Allowing apps to control whether their metadata gets saved in the db by the External Shuffle Service ### What changes were proposed in this pull request? This change allows applications to control whether their metadata gets saved in the db. For applications with higher security requirements, storing the application secret in the db without any encryption is a potential security risk. While filesystem ACLs can help protect access to the db, this level of security is not sufficient for some use cases. Such applications can choose not to save their metadata in the db. As a result, these applications may experience more failures in the event of a node restart, but we believe this trade-off is acceptable given the increased security risk. ### Why are the changes needed? These modifications are necessary to reduce the likelihood of security threats for applications with elevated security requirements. ### Does this PR introduce _any_ user-facing change? No. Added a configuration `spark.yarn.shuffle.server.recovery.disabled` which by default is `false`. When set to `true`, the metadata of the application will not be saved in the db. ### How was this patch tested? Added UTs and also verified with test applications in our test environment. Closes #40843 from otterc/SPARK-43179. Authored-by: Chandni Singh Signed-off-by: Mridul Muralidharan gmail.com> --- .../shuffle/AppsWithRecoveryDisabled.java | 66 ++++++++ .../shuffle/ExternalShuffleBlockResolver.java | 4 +- .../shuffle/RemoteBlockPushResolver.java | 7 +- .../network/yarn/YarnShuffleService.java | 38 ++++- .../spark/internal/config/package.scala | 9 ++ docs/running-on-yarn.md | 12 +- docs/security.md | 14 ++ .../spark/deploy/yarn/ExecutorRunnable.scala | 46 ++++-- .../deploy/yarn/ExecutorRunnableSuite.scala | 106 +++++++++++++ .../yarn/YarnShuffleServiceSuite.scala | 147 +++++++++++++++++- 10 files changed, 425 insertions(+), 24 deletions(-) create mode 100644 common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/AppsWithRecoveryDisabled.java create mode 100644 resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/AppsWithRecoveryDisabled.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/AppsWithRecoveryDisabled.java new file mode 100644 index 0000000000000..6a029a1083a47 --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/AppsWithRecoveryDisabled.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.network.shuffle; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.base.Preconditions; + +/** + * Stores the applications which have recovery disabled. + */ +public final class AppsWithRecoveryDisabled { + + private static final AppsWithRecoveryDisabled INSTANCE = new AppsWithRecoveryDisabled(); + + private final Set<String> appsWithRecoveryDisabled = Collections.newSetFromMap( + new ConcurrentHashMap<>()); + + private AppsWithRecoveryDisabled() { + } + + /** + * Add an application for which recovery is disabled. + * @param appId application id + */ + public static void disableRecoveryOfApp(String appId) { + Preconditions.checkNotNull(appId); + INSTANCE.appsWithRecoveryDisabled.add(appId); + } + + /** + * Returns whether an application is enabled for recovery or not. + * @param appId application id + * @return true if the application is enabled for recovery; false otherwise. + */ + public static boolean isRecoveryEnabledForApp(String appId) { + Preconditions.checkNotNull(appId); + return !INSTANCE.appsWithRecoveryDisabled.contains(appId); + } + + /** + * Removes the application from the store. + * @param appId application id + */ + public static void removeApp(String appId) { + Preconditions.checkNotNull(appId); + INSTANCE.appsWithRecoveryDisabled.remove(appId); + } +} diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index 6bca19a2511ac..ea341b7391682 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -151,7 +151,7 @@ public void registerExecutor( AppExecId fullId = new AppExecId(appId, execId); logger.info("Registered executor {} with {}", fullId, executorInfo); try { - if (db != null) { + if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) { byte[] key = dbAppExecKey(fullId); byte[] value = mapper.writeValueAsString(executorInfo).getBytes(StandardCharsets.UTF_8); db.put(key, value); @@ -224,7 +224,7 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) { // Only touch executors associated with the appId that was removed. 
if (appId.equals(fullId.appId)) { it.remove(); - if (db != null) { + if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(fullId.appId)) { try { db.delete(dbAppExecKey(fullId)); } catch (IOException e) { diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index df2d1fa12d17c..7f0862fcef435 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -494,7 +494,7 @@ void closeAndDeletePartitionsIfNeeded( @VisibleForTesting void removeAppAttemptPathInfoFromDB(String appId, int attemptId) { AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId); - if (db != null) { + if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) { try { byte[] key = getDbAppAttemptPathsKey(appAttemptId); db.delete(key); @@ -967,7 +967,7 @@ private void shutdownMergedShuffleCleanerNow() { * Write the application attempt's local path information to the DB */ private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo appPathsInfo) { - if (db != null) { + if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) { AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId); try { byte[] key = getDbAppAttemptPathsKey(appAttemptId); @@ -985,7 +985,8 @@ private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo app */ private void writeAppAttemptShuffleMergeInfoToDB( AppAttemptShuffleMergeId appAttemptShuffleMergeId) { - if (db != null) { + if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp( + appAttemptShuffleMergeId.appId)) { // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles try{ byte[] dbKey = getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId); diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 1fa0eebb7f8bd..578c1a19c4082 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -28,6 +28,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -59,6 +60,7 @@ import org.apache.spark.network.sasl.ShuffleSecretManager; import org.apache.spark.network.server.TransportServer; import org.apache.spark.network.server.TransportServerBootstrap; +import org.apache.spark.network.shuffle.AppsWithRecoveryDisabled; import org.apache.spark.network.shuffle.ExternalBlockHandler; import org.apache.spark.network.util.JavaUtils; import org.apache.spark.network.util.TransportConf; @@ -136,6 +138,12 @@ public class YarnShuffleService extends AuxiliaryService { private static final boolean DEFAULT_STOP_ON_FAILURE = false; + @VisibleForTesting + static final String SPARK_SHUFFLE_SERVER_RECOVERY_DISABLED = + "spark.yarn.shuffle.server.recovery.disabled"; + @VisibleForTesting + static final String SECRET_KEY = "secret"; + // just for testing when 
you want to find an open port @VisibleForTesting static int boundPort = -1; @@ -407,10 +415,30 @@ private static byte[] dbAppKey(AppId appExecId) throws IOException { public void initializeApplication(ApplicationInitializationContext context) { String appId = context.getApplicationId().toString(); try { - ByteBuffer shuffleSecret = context.getApplicationDataForService(); + ByteBuffer appServiceData = context.getApplicationDataForService(); + String payload = JavaUtils.bytesToString(appServiceData); + String shuffleSecret; + Map<String, Object> metaInfo; + try { + metaInfo = mapper.readValue(payload, + new TypeReference<Map<String, Object>>() {}); + Object metadataStorageVal = metaInfo.get(SPARK_SHUFFLE_SERVER_RECOVERY_DISABLED); + if (metadataStorageVal != null && (Boolean) metadataStorageVal) { + AppsWithRecoveryDisabled.disableRecoveryOfApp(appId); + logger.info("Disabling metadata persistence for application {}", appId); + } + } catch (IOException ioe) { + logger.warn("Unable to parse application data for service: " + payload); + metaInfo = null; + } if (isAuthenticationEnabled()) { - AppId fullId = new AppId(appId); - if (db != null) { + if (metaInfo != null) { + shuffleSecret = (String) metaInfo.get(SECRET_KEY); + } else { + shuffleSecret = payload; + } + if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) { + AppId fullId = new AppId(appId); byte[] key = dbAppKey(fullId); byte[] value = mapper.writeValueAsString(shuffleSecret).getBytes(StandardCharsets.UTF_8); db.put(key, value); @@ -428,7 +456,7 @@ public void stopApplication(ApplicationTerminationContext context) { try { if (isAuthenticationEnabled()) { AppId fullId = new AppId(appId); - if (db != null) { + if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) { try { db.delete(dbAppKey(fullId)); } catch (IOException e) { @@ -440,6 +468,8 @@ public void stopApplication(ApplicationTerminationContext context) { blockHandler.applicationRemoved(appId, false /* clean up local dirs */); } catch (Exception e) { logger.error("Exception when stopping application {}", appId, e); + } finally { + AppsWithRecoveryDisabled.removeApp(appId); } } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 11febabd1bd9d..125025d6a2fdf 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2507,4 +2507,13 @@ package object config { .version("3.5.0") .intConf .createWithDefault(Int.MaxValue) + + private[spark] val SHUFFLE_SERVER_RECOVERY_DISABLED = + ConfigBuilder("spark.yarn.shuffle.server.recovery.disabled") + .internal() + .doc("Set to true for applications that prefer to disable recovery when the External " + + "Shuffle Service restarts. This configuration only takes effect on YARN.") + .version("3.5.0") + .booleanConf + .createWithDefault(false) } diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 19649ffb380de..8c024901352fc 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -650,7 +650,7 @@ To use a custom metrics.properties for the application master and executors, upd spark.yarn.report.loggingFrequency 30 - Maximum number of application reports processed until the next application status + Maximum number of application reports processed until the next application status
If there is a change of state, the application status will be logged regardless of the number of application reports processed. @@ -683,6 +683,16 @@ To use a custom metrics.properties for the application master and executors, upd 3.0.0 + + spark.yarn.shuffle.server.recovery.disabled + false + + Set to true for applications that have higher security requirements and prefer that their + secret is not saved in the db. The shuffle data of such applications wll not be recovered after + the External Shuffle Service restarts. + + 3.5.0 + #### Available patterns for SHS custom executor log URL diff --git a/docs/security.md b/docs/security.md index 7201ea5185928..c856b94388fd8 100644 --- a/docs/security.md +++ b/docs/security.md @@ -60,6 +60,20 @@ distributing the shared secret. Each application will use a unique shared secret the case of YARN, this feature relies on YARN RPC encryption being enabled for the distribution of secrets to be secure. + + + + + + + + +
Property NameDefaultMeaningSince Version
spark.yarn.shuffle.server.recovery.disabledfalse + Set to true for applications that have higher security requirements and prefer that their + secret is not saved in the db. The shuffle data of such applications wll not be recovered after + the External Shuffle Service restarts. + 3.5.0
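For illustration, a minimal sketch of how an application would opt out (not part of this patch; the property can equally be passed with `--conf` on `spark-submit`):

```scala
import org.apache.spark.SparkConf

// Opt this application out of External Shuffle Service recovery on YARN.
// Its secret and shuffle metadata will then not be written to the service's db,
// trading recoverability across NodeManager restarts for tighter secret handling.
val conf = new SparkConf()
  .set("spark.yarn.shuffle.server.recovery.disabled", "true")
```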
+ ### Kubernetes On Kubernetes, Spark will also automatically generate an authentication secret unique to each diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 1f3121ed224fe..e3fcf5472f54d 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -21,8 +21,11 @@ import java.nio.ByteBuffer import java.util.Collections import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.{HashMap, ListBuffer} +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.hadoop.fs.Path import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.security.UserGroupInformation @@ -105,17 +108,7 @@ private[yarn] class ExecutorRunnable( // started on the NodeManager and, if authentication is enabled, provide it with our secret // key for fetching shuffle files later if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) { - val secretString = securityMgr.getSecretKey() - val secretBytes = - if (secretString != null) { - // This conversion must match how the YarnShuffleService decodes our secret - JavaUtils.stringToBytes(secretString) - } else { - // Authentication is not enabled, so just provide dummy metadata - ByteBuffer.allocate(0) - } - val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME) - ctx.setServiceData(Collections.singletonMap(serviceName, secretBytes)) + configureServiceData(ctx) } // Send the start request to the ContainerManager @@ -128,6 +121,33 @@ private[yarn] class ExecutorRunnable( } } + private[yarn] def configureServiceData(ctx: ContainerLaunchContext): Unit = { + val secretString = securityMgr.getSecretKey() + + val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME) + if (!sparkConf.get(SHUFFLE_SERVER_RECOVERY_DISABLED)) { + val secretBytes = + if (secretString != null) { + // This conversion must match how the YarnShuffleService decodes our secret + JavaUtils.stringToBytes(secretString) + } else { + // Authentication is not enabled, so just provide dummy metadata + ByteBuffer.allocate(0) + } + ctx.setServiceData(Collections.singletonMap(serviceName, secretBytes)) + } else { + val payload = new mutable.HashMap[String, Object]() + payload.put(SHUFFLE_SERVER_RECOVERY_DISABLED.key, java.lang.Boolean.TRUE) + if (secretString != null) { + payload.put(ExecutorRunnable.SECRET_KEY, secretString) + } + val mapper = new ObjectMapper() + mapper.registerModule(DefaultScalaModule) + val jsonString = mapper.writeValueAsString(payload) + ctx.setServiceData(Collections.singletonMap(serviceName, JavaUtils.stringToBytes(jsonString))) + } + } + private def prepareCommand(): List[String] = { // Extra options for the JVM val javaOpts = ListBuffer[String]() @@ -202,3 +222,7 @@ private[yarn] class ExecutorRunnable( env } } + +private[yarn] object ExecutorRunnable { + private[yarn] val SECRET_KEY = "secret" +} diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala new file mode 100644 index 0000000000000..1ef3c9c410af9 --- /dev/null +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala @@ -0,0 +1,106 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.io.IOException + +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.hadoop.yarn.api.records.{ContainerLaunchContext, LocalResource} +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.util.Records +import org.mockito.Mockito.{mock, when} + +import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils + +class ExecutorRunnableSuite extends SparkFunSuite { + + private def createExecutorRunnable( + sparkConf: SparkConf = new SparkConf(), + securityManager: SecurityManager = mock(classOf[SecurityManager])): ExecutorRunnable = { + new ExecutorRunnable( + None, + new YarnConfiguration(), + sparkConf, + "yarn", + "exec-1", + "localhost", + 1, + 1, + "application_123_1", + securityManager, + Map.empty[String, LocalResource], + 0) + } + + for (shuffleServerRecoveryDisabled <- Seq(true, false)) { + test("validate service data when shuffleServerRecoveryDisabled is " + + shuffleServerRecoveryDisabled) { + val sparkConf = new SparkConf() + sparkConf.set(SHUFFLE_SERVER_RECOVERY_DISABLED, shuffleServerRecoveryDisabled) + val securityManager = mock(classOf[SecurityManager]) + when(securityManager.getSecretKey()).thenReturn("secret") + val execRunnable = createExecutorRunnable(sparkConf, securityManager) + val ctx = Records.newRecord(classOf[ContainerLaunchContext]) + .asInstanceOf[ContainerLaunchContext] + execRunnable.configureServiceData(ctx) + val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME) + val serviceData = ctx.getServiceData.get(serviceName) + assert(serviceData != null) + val payload: String = JavaUtils.bytesToString(serviceData) + var metaInfo: java.util.Map[String, AnyRef] = null + val secret = try { + val mapper = new ObjectMapper + metaInfo = mapper.readValue(payload, + new TypeReference[java.util.Map[String, AnyRef]]() {}) + metaInfo.get(ExecutorRunnable.SECRET_KEY).asInstanceOf[String] + } catch { + case _: IOException => + payload + } + assert(secret equals "secret") + if (shuffleServerRecoveryDisabled) { + assert(metaInfo != null) + val metadataStorageVal: Any = metaInfo.get(SHUFFLE_SERVER_RECOVERY_DISABLED.key) + assert(metadataStorageVal != null && metadataStorageVal.asInstanceOf[Boolean]) + } + } + } + + test("if shuffle server recovery is disabled and authentication is disabled, then" + + " service data should not contain secret") { + val sparkConf = new SparkConf() + sparkConf.set(SHUFFLE_SERVER_RECOVERY_DISABLED, true) + val execRunnable = createExecutorRunnable(sparkConf) + val ctx = 
Records.newRecord(classOf[ContainerLaunchContext]) + .asInstanceOf[ContainerLaunchContext] + execRunnable.configureServiceData(ctx) + val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME) + val serviceData = ctx.getServiceData.get(serviceName) + assert(serviceData != null) + val payload: String = JavaUtils.bytesToString(serviceData) + val mapper = new ObjectMapper + val metaInfo = mapper.readValue(payload, + new TypeReference[java.util.Map[String, AnyRef]]() {}) + assert(!metaInfo.containsKey(ExecutorRunnable.SECRET_KEY)) + val metadataStorageVal: Any = metaInfo.get(SHUFFLE_SERVER_RECOVERY_DISABLED.key) + assert(metadataStorageVal != null && metadataStorageVal.asInstanceOf[Boolean]) + } +} diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 075a21c399e0e..3e78262a765e5 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -25,9 +25,12 @@ import java.util.EnumSet import scala.annotation.tailrec import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.concurrent.duration._ import com.codahale.metrics.MetricSet +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.hadoop.fs.Path import org.apache.hadoop.metrics2.impl.MetricsSystemImpl import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem @@ -49,6 +52,7 @@ import org.apache.spark.network.shuffle.{Constants, MergedShuffleFileManager, No import org.apache.spark.network.shuffle.RemoteBlockPushResolver._ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo import org.apache.spark.network.shuffledb.DBBackend +import org.apache.spark.network.util.JavaUtils import org.apache.spark.network.util.TransportConf import org.apache.spark.network.yarn.util.HadoopConfigProvider import org.apache.spark.tags.ExtendedLevelDBTest @@ -1032,9 +1036,23 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers { s2.stop() } - private def makeAppInfo(user: String, appId: ApplicationId): ApplicationInitializationContext = { - val secret = ByteBuffer.wrap(new Array[Byte](0)) - new ApplicationInitializationContext(user, appId, secret) + private def makeAppInfo(user: String, appId: ApplicationId, + metadataStorageDisabled: Boolean = false, + authEnabled: Boolean = true): ApplicationInitializationContext = { + if (!metadataStorageDisabled) { + val secret = ByteBuffer.wrap(new Array[Byte](0)) + new ApplicationInitializationContext(user, appId, secret) + } else { + val payload = new mutable.HashMap[String, Object]() + payload.put(YarnShuffleService.SPARK_SHUFFLE_SERVER_RECOVERY_DISABLED, java.lang.Boolean.TRUE) + if (authEnabled) { + payload.put(YarnShuffleService.SECRET_KEY, "") + } + val mapper = new ObjectMapper() + mapper.registerModule(DefaultScalaModule) + val jsonString = mapper.writeValueAsString(payload) + new ApplicationInitializationContext(user, appId, JavaUtils.stringToBytes(jsonString)) + } } test("recovery db should not be created if NM recovery is not enabled") { @@ -1109,6 +1127,129 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers { val mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(mockConf, null) 
assert(mergeMgr.isInstanceOf[NoOpMergedShuffleFileManager]) } + + test("secret of applications should not be stored in db if they want to be excluded") { + // set auth to true to test the secrets recovery + yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true) + s1 = createYarnShuffleService() + val app1Id = ApplicationId.newInstance(1681252509, 1) + val app1Data = makeAppInfo("user", app1Id, metadataStorageDisabled = true) + s1.initializeApplication(app1Data) + val app2Id = ApplicationId.newInstance(1681252509, 2) + val app2Data = makeAppInfo("user", app2Id) + s1.initializeApplication(app2Data) + assert(s1.secretManager.getSecretKey(app1Id.toString()) == "") + assert(s1.secretManager.getSecretKey(app2Id.toString()) == "") + + val execShuffleInfo1 = + new ExecutorShuffleInfo( + Array(new File(tempDir, "foo/foo").getAbsolutePath, + new File(tempDir, "bar/bar").getAbsolutePath), 3, + SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1) + val execShuffleInfo2 = + new ExecutorShuffleInfo(Array(new File(tempDir, "bippy/bippy").getAbsolutePath), + 3, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1) + + val blockHandler = s1.blockHandler + val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler) + blockResolver.registerExecutor(app1Id.toString, "exec-1", execShuffleInfo1) + blockResolver.registerExecutor(app2Id.toString, "exec-2", execShuffleInfo2) + ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", blockResolver) should + be(Some(execShuffleInfo1)) + ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", blockResolver) should + be(Some(execShuffleInfo2)) + + val mergeManager = s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver] + mergeManager.registerExecutor(app1Id.toString, execShuffleInfo1) + mergeManager.registerExecutor(app2Id.toString, execShuffleInfo2) + val localDirsApp1 = Array(new File(tempDir, "foo/merge_manager_1").getAbsolutePath, + new File(tempDir, "bar/merge_manager_1").getAbsolutePath) + val localDirsApp2 = Array(new File(tempDir, "bippy/merge_manager_1").getAbsolutePath) + val appPathsInfo1 = new AppPathsInfo(localDirsApp1, 3) + val appPathsInfo2 = new AppPathsInfo(localDirsApp2, 3) + + ShuffleTestAccessor.getAppPathsInfo(app1Id.toString, mergeManager) should + be(Some(appPathsInfo1)) + ShuffleTestAccessor.getAppPathsInfo(app2Id.toString, mergeManager) should + be(Some(appPathsInfo2)) + + val partitionIdApp1 = new AppAttemptShuffleMergeId(app1Id.toString, 1, 1, 1) + val partitionIdApp2 = new AppAttemptShuffleMergeId(app2Id.toString, 1, 2, 1) + prepareAppShufflePartition(mergeManager, partitionIdApp1, 1, "3") + prepareAppShufflePartition(mergeManager, partitionIdApp2, 2, "4") + ShuffleTestAccessor.finalizeShuffleMerge(mergeManager, partitionIdApp1) + ShuffleTestAccessor.finalizeShuffleMerge(mergeManager, partitionIdApp2) + + val execStateFile = s1.registeredExecutorFile + assert(execStateFile.exists(), s"$execStateFile did not exist") + val mergeMgrFile = s1.mergeManagerFile + assert(mergeMgrFile.exists(), s"$mergeMgrFile did not exist") + + // shuffle service goes down + s1.stop() + // Yarn Shuffle service comes back up without custom mergeManager + s2 = createYarnShuffleService() + // Since secret of app1 is not saved in the db, it isn't recovered + assert(s2.secretManager.getSecretKey(app1Id.toString()) == null) + assert(s2.secretManager.getSecretKey(app2Id.toString()) == "") + + val resolver2 = ShuffleTestAccessor.getBlockResolver(s2.blockHandler) + val mergeManager2 = s2.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver] + + // 
App1 executor information should not have been saved in the db. + ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", resolver2) should be(None) + ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be( + Some(execShuffleInfo2)) + // App1 should not have any merge related metadata stored in the db. + ShuffleTestAccessor + .getAppPathsInfo(app1Id.toString, mergeManager2) should be(None) + ShuffleTestAccessor.getAppPathsInfo(app2Id.toString, mergeManager2) should be( + Some(appPathsInfo2)) + + // Even though App1-partition1 was finalized before the restart, merge manager will recreate + // the partition since it didn't have any metadata saved for that app. + mergeManager2.registerExecutor(app1Id.toString, execShuffleInfo1) + prepareAppShufflePartition(mergeManager2, partitionIdApp1, 1, "3") + val dataFileApp1 = + ShuffleTestAccessor.getMergedShuffleDataFile(mergeManager2, partitionIdApp1, 1) + dataFileApp1.length() should be((4 * 5 + 1) * DUMMY_BLOCK_DATA.length) + // Since app2-partition2's metadata was saved, it cannot be re-opened. + val error = intercept[BlockPushNonFatalFailure] { + ShuffleTestAccessor.getOrCreateAppShufflePartitionInfo( + mergeManager2, partitionIdApp2, 2, "3") + } + assert(error.getMessage.contains("is finalized")) + + s2.stopApplication(new ApplicationTerminationContext(app1Id)) + s2.stopApplication(new ApplicationTerminationContext(app2Id)) + s2.stop() + } + + test("executor info of apps should not be stored in db if they want to be excluded. " + + "Authentication is turned off") { + s1 = createYarnShuffleService() + val app1Id = ApplicationId.newInstance(1681252509, 1) + val app1Data = makeAppInfo("user", app1Id, metadataStorageDisabled = true, authEnabled = false) + s1.initializeApplication(app1Data) + val execShuffleInfo1 = + new ExecutorShuffleInfo( + Array(new File(tempDir, "foo/foo").getAbsolutePath, + new File(tempDir, "bar/bar").getAbsolutePath), 3, SORT_MANAGER) + val blockHandler = s1.blockHandler + val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler) + blockResolver.registerExecutor(app1Id.toString, "exec-1", execShuffleInfo1) + ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", blockResolver) should + be(Some(execShuffleInfo1)) + // shuffle service goes down + s1.stop() + // Yarn Shuffle service comes back up without custom mergeManager + s2 = createYarnShuffleService() + val resolver2 = ShuffleTestAccessor.getBlockResolver(s2.blockHandler) + // App1 executor information should not have been saved in the db. + ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", resolver2) should be(None) + s2.stopApplication(new ApplicationTerminationContext(app1Id)) + s2.stop() + } } @ExtendedLevelDBTest
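For readers tracing the new handshake end to end: when the config is set, `ExecutorRunnable.configureServiceData` serializes a small JSON map into the YARN service data, and `YarnShuffleService.initializeApplication` parses it back, falling back to treating the whole payload as the raw secret when parsing fails. A self-contained sketch of that round trip (illustrative only; assumes jackson-databind and jackson-module-scala on the classpath, key names copied from the patch, and a placeholder secret value):

```scala
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object ServiceDataRoundTrip {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)

    // Encode side: what the executor sends when recovery is disabled.
    val payload = Map(
      "spark.yarn.shuffle.server.recovery.disabled" -> java.lang.Boolean.TRUE,
      "secret" -> "app-secret") // "app-secret" is a placeholder, not a real key
    val json = mapper.writeValueAsString(payload)

    // Decode side: what the shuffle service does with the service data.
    val metaInfo = mapper.readValue(json,
      new TypeReference[java.util.Map[String, AnyRef]]() {})
    val recoveryDisabled = metaInfo
      .get("spark.yarn.shuffle.server.recovery.disabled").asInstanceOf[java.lang.Boolean]
    val secret = metaInfo.get("secret").asInstanceOf[String]
    println(s"recovery disabled: $recoveryDisabled, secret: $secret")
  }
}
```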