Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-3797] Minor addendum to Yarn shuffle service #3144

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.network.util;

import java.nio.ByteBuffer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
Expand All @@ -25,6 +27,8 @@
import java.io.ObjectOutputStream;

import com.google.common.io.Closeables;
import com.google.common.base.Charsets;
import io.netty.buffer.Unpooled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -73,4 +77,20 @@ public static int nonNegativeHash(Object obj) {
int hash = obj.hashCode();
return hash != Integer.MIN_VALUE ? Math.abs(hash) : 0;
}

/**
* Convert the given string to a byte buffer. The resulting buffer can be
* converted back to the same string through {@link #bytesToString(ByteBuffer)}.
*/
public static ByteBuffer stringToBytes(String s) {
return Unpooled.wrappedBuffer(s.getBytes(Charsets.UTF_8)).nioBuffer();
}

/**
* Convert the given byte buffer to a string. The resulting string can be
* converted back to the same byte buffer through {@link #stringToBytes(String)}.
*/
public static String bytesToString(ByteBuffer b) {
return Unpooled.wrappedBuffer(b).toString(Charsets.UTF_8);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

import java.lang.Override;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.util.JavaUtils;

/**
* A class that manages shuffle secret used by the external shuffle service.
Expand All @@ -34,30 +34,10 @@ public class ShuffleSecretManager implements SecretKeyHolder {
private final Logger logger = LoggerFactory.getLogger(ShuffleSecretManager.class);
private final ConcurrentHashMap<String, String> shuffleSecretMap;

private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");

// Spark user used for authenticating SASL connections
// Note that this must match the value in org.apache.spark.SecurityManager
private static final String SPARK_SASL_USER = "sparkSaslUser";

/**
* Convert the given string to a byte buffer. The resulting buffer can be converted back to
* the same string through {@link #bytesToString(ByteBuffer)}. This is used if the external
* shuffle service represents shuffle secrets as bytes buffers instead of strings.
*/
public static ByteBuffer stringToBytes(String s) {
return ByteBuffer.wrap(s.getBytes(UTF8_CHARSET));
}

/**
* Convert the given byte buffer to a string. The resulting string can be converted back to
* the same byte buffer through {@link #stringToBytes(String)}. This is used if the external
* shuffle service represents shuffle secrets as bytes buffers instead of strings.
*/
public static String bytesToString(ByteBuffer b) {
return new String(b.array(), UTF8_CHARSET);
}

public ShuffleSecretManager() {
shuffleSecretMap = new ConcurrentHashMap<String, String>();
}
Expand All @@ -80,7 +60,7 @@ public void registerApp(String appId, String shuffleSecret) {
* Register an application with its secret specified as a byte buffer.
*/
public void registerApp(String appId, ByteBuffer shuffleSecret) {
registerApp(appId, bytesToString(shuffleSecret));
registerApp(appId, JavaUtils.bytesToString(shuffleSecret));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}

import org.apache.spark.{SecurityManager, SparkConf, Logging}
import org.apache.spark.network.sasl.ShuffleSecretManager
import org.apache.spark.network.util.JavaUtils

@deprecated("use yarn/stable", "1.2.0")
class ExecutorRunnable(
Expand Down Expand Up @@ -98,7 +98,8 @@ class ExecutorRunnable(
val secretString = securityMgr.getSecretKey()
val secretBytes =
if (secretString != null) {
ShuffleSecretManager.stringToBytes(secretString)
// This conversion must match how the YarnShuffleService decodes our secret
JavaUtils.stringToBytes(secretString)
} else {
// Authentication is not enabled, so just provide dummy metadata
ByteBuffer.allocate(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}

import org.apache.spark.{SecurityManager, SparkConf, Logging}
import org.apache.spark.network.sasl.ShuffleSecretManager
import org.apache.spark.network.util.JavaUtils


class ExecutorRunnable(
Expand Down Expand Up @@ -97,7 +97,8 @@ class ExecutorRunnable(
val secretString = securityMgr.getSecretKey()
val secretBytes =
if (secretString != null) {
ShuffleSecretManager.stringToBytes(secretString)
// This conversion must match how the YarnShuffleService decodes our secret
JavaUtils.stringToBytes(secretString)
} else {
// Authentication is not enabled, so just provide dummy metadata
ByteBuffer.allocate(0)
Expand Down