[SPARK-43179][SHUFFLE] Allowing apps to control whether their metadata gets saved in the db by the External Shuffle Service #40843
Conversation
@mridulm @tgravescs @zhouyejoe @akpatnam25 @shuwang21 Please help review.
+CC @attilapiros
So the intention here is that it still uses the external shuffle service but doesn't store the secret, so if the node manager goes down it can't recover it, correct? Personally I'm fine with adding this option. You also need to add documentation for this new configuration in the .md file with an explanation of why you would want to disable it. I'm wondering if we want something in the security doc about it.
Yes. Since the secret is not encrypted, storing it is a security violation for some applications. If the NM goes down, the metadata of these apps will not be recovered, but that is acceptable.
I updated the documentation.
```scala
val payload = new mutable.HashMap[String, Object]()
payload.put(SHUFFLE_SERVER_RECOVERY_DISABLED.key, java.lang.Boolean.TRUE)
if (secretString != null) {
  payload.put(ExecutorRunnable.SECRET_KEY, secretString)
}
```
Do we need to keep the same behavior here: when `secretString` is null, should we still put `ByteBuffer.allocate(0)` in the payload?
We don't need to keep the same behavior. If authentication is disabled, we don't need to set the value of the secret key. Also, this was feedback from @mridulm.
Exactly that use case, @tgravescs.
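For context, here is a minimal stdlib-only sketch of the two behaviors under discussion (the class and method names are illustrative, not the actual Spark code): the old path always stored an empty buffer when there was no secret, while the new path simply omits the key when authentication is disabled.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class PayloadSketch {
    // Hypothetical helper illustrating the old behavior: a null secret is
    // replaced with an empty buffer, so the key is always present.
    static Map<String, Object> oldBehavior(String secret) {
        Map<String, Object> payload = new HashMap<>();
        payload.put("secret", secret != null
                ? ByteBuffer.wrap(secret.getBytes())
                : ByteBuffer.allocate(0));
        return payload;
    }

    // New behavior: when authentication is disabled (secret == null),
    // the secret key is simply not set in the payload.
    static Map<String, Object> newBehavior(String secret) {
        Map<String, Object> payload = new HashMap<>();
        if (secret != null) {
            payload.put("secret", ByteBuffer.wrap(secret.getBytes()));
        }
        return payload;
    }

    public static void main(String[] args) {
        System.out.println(oldBehavior(null).containsKey("secret")); // prints: true
        System.out.println(newBehavior(null).containsKey("secret")); // prints: false
    }
}
```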
Just a couple of comments, rest looks good.
"Shuffle Service restarts.") | ||
.version("3.5.0") | ||
.booleanConf | ||
.createWithDefault(false) |
This is specific to YARN and not supported by other resource managers - call it out in the description?
Same for the documentation update below - make it explicit that it is for YARN mode, and make sure it is in the right section in the doc.
If this is only for YARN, should we rename it to spark.yarn.shuffle and only put it in the YARN docs?
- I moved the documentation for this config to `running-on-yarn.md`.
- In the security doc, I moved the definition under the `Yarn` section.

PTAL
```java
  shuffleSecret = payload;
}
if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) {
  AppId fullId = new AppId(appId);
  byte[] key = dbAppKey(fullId);
  byte[] value = mapper.writeValueAsString(shuffleSecret).getBytes(StandardCharsets.UTF_8);
```
Can we add a test where `shuffleSecret = null` and `appServiceData` is a JSON?
I fixed the test that I added, `executor info of apps should not be stored in db if they want to be excluded`, to test this case: authentication is turned off there.
As we can run multiple external shuffle services on the same NM, what about introducing just a new config for the whole external shuffle service controlling whether recovery is enabled? Then those applications with higher security requirements can use the external shuffle service where recovery is disabled. I think that regarding code complexity and code size it would be a much better solution. WDYT?
It can be done, but setting up and running another shuffle service requires much more effort, at least in our production environment.
The code change here is quite small. Agreed that disabling recovery for the whole ESS may need an even smaller change, but there is not much complexity with this change either.
lgtm
Thanks for fixing this @otterc!
### What changes were proposed in this pull request?
A bug was introduced with this change: #40843. To get the value that is persisted in the db, we used to use `mapper.writeValueAsString(ByteBuffer)`. We changed it to `mapper.writeValueAsString(String)`. However, when we load from the db, it still uses `ByteBuffer secret = mapper.readValue(e.getValue(), ByteBuffer.class);`, causing exceptions when the shuffle service is unable to recover the apps:
```
ERROR org.apache.spark.network.server.TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 5764589675121231159
java.lang.RuntimeException: javax.security.sasl.SaslException: DIGEST-MD5: digest response format violation. Mismatched response.
    at org.sparkproject.guava.base.Throwables.propagate(Throwables.java:160)
    at org.apache.spark.network.sasl.SparkSaslServer.response(SparkSaslServer.java:121)
    at org.apache.spark.network.sasl.SaslRpcHandler.doAuthChallenge(SaslRpcHandler.java:94)
    at org.apache.spark.network.crypto.AuthRpcHandler.doAuthChallenge(AuthRpcHandler.java:81)
    at org.apache.spark.network.server.AbstractAuthRpcHandler.receive(AbstractAuthRpcHandler.java:59)
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
    at org.sparkproject.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.sparkproject.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.sparkproject.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
    at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.sparkproject.io.netty.channel.DefaultChannelPipeline
```

### Why are the changes needed?
It fixes the bug that was introduced with SPARK-43179.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
The existing UTs in the `YarnShuffleServiceSuite` were using an empty password, which masked the issue. Changed them to use a non-empty password.

Closes #41502 from otterc/SPARK-43179-followup.

Authored-by: Chandni Singh <chsingh@linkedin.com> Signed-off-by: Sean Owen <srowen@gmail.com>
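The root cause is a write/read asymmetry: the value is now written as a plain JSON string, but still deserialized as a `ByteBuffer`, which Jackson maps to a Base64-encoded string. A minimal stdlib-only sketch of that asymmetry (Jackson itself is not used here; `java.util.Base64` stands in for Jackson's `ByteBuffer` handling, and the secret value is made up):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

public class SecretMismatchSketch {
    public static void main(String[] args) {
        // Hypothetical secret; "MySecret" also happens to be a syntactically
        // valid Base64 string, so decoding does not throw and the bug is silent.
        String secret = "MySecret";

        // Write path after the change: the secret is persisted as a plain string.
        byte[] stored = secret.getBytes(StandardCharsets.UTF_8);

        // Read path (unchanged): interprets the stored value as Base64-encoded
        // bytes, analogous to Jackson deserializing into a ByteBuffer.
        byte[] recovered = Base64.getDecoder().decode(
                new String(stored, StandardCharsets.UTF_8));

        // The recovered secret differs from the original, so the subsequent
        // SASL handshake fails with "Mismatched response".
        System.out.println(Arrays.equals(stored, recovered)); // prints: false
    }
}
```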
### What changes were proposed in this pull request?
This change allows applications to control whether their metadata gets saved in the db. For applications with higher security requirements, storing the application secret in the db without any encryption is a potential security risk. While filesystem ACLs can help protect access to the db, this level of security is not sufficient for some use cases. Such applications can choose not to save their metadata in the db. As a result, these applications may experience more failures in the event of a node restart, but we believe this trade-off is acceptable given the increased security risk.

### Why are the changes needed?
These modifications are necessary to reduce the likelihood of security threats for applications with elevated security requirements.

### Does this PR introduce any user-facing change?
No. Added a configuration `spark.shuffle.server.recovery.disabled`, which by default is `false`. When set to `true`, the metadata of the application will not be saved in the db.

### How was this patch tested?
Added UTs and also verified with test applications in our test environment.
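Using the configuration name above, an application would opt out of db-based recovery at submission time roughly like this (a sketch, not a definitive invocation; the application class and jar name are placeholders, and the exact flags depend on your deployment):

```
spark-submit \
  --master yarn \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.shuffle.server.recovery.disabled=true \
  --class com.example.MyApp \
  myapp.jar
```

With recovery disabled, this application's secret and executor metadata are kept only in memory by the shuffle service, so an NM restart loses them, which is the intended trade-off.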