Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upload Cred CLI #2

Open
wants to merge 13 commits into
base: pushcreds
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions common/network-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@
<artifactId>jackson-annotations</artifactId>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</dependency>

<!-- Provided dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class TransportClient implements Closeable {
private final Channel channel;
private final TransportResponseHandler handler;
@Nullable private String clientId;
@Nullable private String clientUser;
private volatile boolean timedOut;

public TransportClient(Channel channel, TransportResponseHandler handler) {
Expand Down Expand Up @@ -114,6 +115,25 @@ public void setClientId(String id) {
this.clientId = id;
}

/**
* Returns the user name used by the client to authenticate itself when authentication is enabled.
*
* @return The client User Name, or null if authentication is disabled.
*/
public String getClientUser() {
return clientUser;
}

/**
* Sets the authenticated client's user name. This is meant to be used by the authentication layer.
*
* Trying to set a different client User Name after it's been set will result in an exception.
*/
public void setClientUser(String user) {
Preconditions.checkState(clientUser == null, "Client User Name has already been set.");
this.clientUser = user;
}

/**
* Requests a single chunk from the remote side, from the pre-negotiated streamId.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ public void doBootstrap(TransportClient client, Channel channel) {
private void doSparkAuth(TransportClient client, Channel channel)
throws GeneralSecurityException, IOException {

String user = secretKeyHolder.getSaslUser(appId);
String secretKey = secretKeyHolder.getSecretKey(appId);
try (AuthEngine engine = new AuthEngine(appId, secretKey, conf)) {
try (AuthEngine engine = new AuthEngine(appId, user, secretKey, conf)) {
ClientChallenge challenge = engine.challenge();
ByteBuf challengeData = Unpooled.buffer(challenge.encodedLength());
challenge.encode(challengeData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,17 @@
import org.apache.commons.crypto.cipher.CryptoCipherFactory;
import org.apache.commons.crypto.random.CryptoRandom;
import org.apache.commons.crypto.random.CryptoRandomFactory;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.util.TransportConf;

import static org.apache.spark.network.util.HadoopSecurityUtils.decodeMasterKey;
import static org.apache.spark.network.util.HadoopSecurityUtils.getClientToAMSecretKey;
import static org.apache.spark.network.util.HadoopSecurityUtils.getIdentifier;

/**
* A helper class for abstracting authentication and key negotiation details. This is used by
* both client and server sides, since the operations are basically the same.
Expand All @@ -54,11 +60,13 @@ class AuthEngine implements Closeable {
private static final BigInteger ONE = new BigInteger(new byte[] { 0x1 });

private final byte[] appId;
private final char[] secret;
private final byte[] user;
private char[] secret;
private final TransportConf conf;
private final Properties cryptoConf;
private final CryptoRandom random;

private String clientUser;
private byte[] authNonce;

@VisibleForTesting
Expand All @@ -69,13 +77,25 @@ class AuthEngine implements Closeable {
private CryptoCipher decryptor;

AuthEngine(String appId, String secret, TransportConf conf) throws GeneralSecurityException {
this(appId, "",secret, conf);
}

AuthEngine(String appId, String user, String secret, TransportConf conf) throws GeneralSecurityException {
this.appId = appId.getBytes(UTF_8);
this.user = user.getBytes(UTF_8);
this.conf = conf;
this.cryptoConf = conf.cryptoConf();
this.secret = secret.toCharArray();
this.random = CryptoRandomFactory.getCryptoRandom(cryptoConf);
}

/**
* Returns the user name of the client.
*/
public String getClientUserName() {
return clientUser;
}

/**
* Create the client challenge.
*
Expand All @@ -89,6 +109,7 @@ ClientChallenge challenge() throws GeneralSecurityException, IOException {

this.challenge = randomBytes(conf.encryptionKeyLength() / Byte.SIZE);
return new ClientChallenge(new String(appId, UTF_8),
new String(user, UTF_8),
conf.keyFactoryAlgorithm(),
conf.keyFactoryIterations(),
conf.cipherTransformation(),
Expand All @@ -106,9 +127,22 @@ ClientChallenge challenge() throws GeneralSecurityException, IOException {
*/
ServerResponse respond(ClientChallenge clientChallenge)
throws GeneralSecurityException, IOException {
SecretKeySpec authKey;
if (conf.isConnectionUsingTokens()) {
// Create a secret from client's token identifier and AM's master key.
ClientToAMTokenSecretManager secretManager = new ClientToAMTokenSecretManager(null,
decodeMasterKey(new String(secret)));
ClientToAMTokenIdentifier identifier = getIdentifier(clientChallenge.user);
secret = getClientToAMSecretKey(identifier, secretManager);

clientUser = identifier.getUser().getShortUserName();
} else {
clientUser = clientChallenge.user;
}

authKey = generateKey(clientChallenge.kdf, clientChallenge.iterations, clientChallenge.nonce,
clientChallenge.keyLength);

SecretKeySpec authKey = generateKey(clientChallenge.kdf, clientChallenge.iterations,
clientChallenge.nonce, clientChallenge.keyLength);
initializeForAuth(clientChallenge.cipher, clientChallenge.nonce, authKey);

byte[] challenge = validateChallenge(clientChallenge.nonce, clientChallenge.challenge);
Expand All @@ -119,6 +153,7 @@ ServerResponse respond(ClientChallenge clientChallenge)

SecretKeySpec sessionKey = generateKey(clientChallenge.kdf, clientChallenge.iterations,
sessionNonce, clientChallenge.keyLength);

this.sessionCipher = new TransportCipher(cryptoConf, clientChallenge.cipher, sessionKey,
inputIv, outputIv);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,14 @@ public void receive(TransportClient client, ByteBuffer message, RpcResponseCallb
// Here we have the client challenge, so perform the new auth protocol and set up the channel.
AuthEngine engine = null;
try {
String user = secretKeyHolder.getSaslUser(challenge.appId);
String secret = secretKeyHolder.getSecretKey(challenge.appId);
Preconditions.checkState(secret != null,
"Trying to authenticate non-registered app %s.", challenge.appId);
LOG.debug("Authenticating challenge for app {}.", challenge.appId);
engine = new AuthEngine(challenge.appId, secret, conf);
engine = new AuthEngine(challenge.appId, user, secret, conf);
ServerResponse response = engine.respond(challenge);
client.setClientUser(engine.getClientUserName());
ByteBuf responseData = Unpooled.buffer(response.encodedLength());
response.encode(responseData);
callback.onSuccess(responseData.nioBuffer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,35 @@ public class ClientChallenge implements Encodable {
private static final byte TAG_BYTE = (byte) 0xFA;

public final String appId;
public final String user;
public final String kdf;
public final int iterations;
public final String cipher;
public final int keyLength;
public final byte[] nonce;
public final byte[] challenge;

public ClientChallenge(
String appId,
String kdf,
int iterations,
String cipher,
int keyLength,
byte[] nonce,
byte[] challenge) {
this(appId, "", kdf, iterations, cipher, keyLength, nonce, challenge);
}
public ClientChallenge(
String appId,
String user,
String kdf,
int iterations,
String cipher,
int keyLength,
byte[] nonce,
byte[] challenge) {
this.appId = appId;
this.user = user;
this.kdf = kdf;
this.iterations = iterations;
this.cipher = cipher;
Expand All @@ -63,6 +76,7 @@ public ClientChallenge(
public int encodedLength() {
return 1 + 4 + 4 +
Encoders.Strings.encodedLength(appId) +
Encoders.Strings.encodedLength(user) +
Encoders.Strings.encodedLength(kdf) +
Encoders.Strings.encodedLength(cipher) +
Encoders.ByteArrays.encodedLength(nonce) +
Expand All @@ -73,6 +87,7 @@ public int encodedLength() {
public void encode(ByteBuf buf) {
buf.writeByte(TAG_BYTE);
Encoders.Strings.encode(buf, appId);
Encoders.Strings.encode(buf, user);
Encoders.Strings.encode(buf, kdf);
buf.writeInt(iterations);
Encoders.Strings.encode(buf, cipher);
Expand All @@ -89,6 +104,7 @@ public static ClientChallenge decodeMessage(ByteBuffer buffer) {
}

return new ClientChallenge(
Encoders.Strings.decode(buf),
Encoders.Strings.decode(buf),
Encoders.Strings.decode(buf),
buf.readInt(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public SaslClientBootstrap(TransportConf conf, String appId, SecretKeyHolder sec
*/
@Override
public void doBootstrap(TransportClient client, Channel channel) {
SparkSaslClient saslClient = new SparkSaslClient(appId, secretKeyHolder, conf.saslEncryption());
SparkSaslClient saslClient = new SparkSaslClient(appId, secretKeyHolder, conf.saslEncryption(), conf);
try {
byte[] payload = saslClient.firstToken();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void receive(TransportClient client, ByteBuffer message, RpcResponseCallb
// First message in the handshake, setup the necessary state.
client.setClientId(saslMessage.appId);
saslServer = new SparkSaslServer(saslMessage.appId, secretKeyHolder,
conf.saslServerAlwaysEncrypt());
conf.saslServerAlwaysEncrypt(), conf);
}

byte[] response;
Expand All @@ -114,6 +114,7 @@ public void receive(TransportClient client, ByteBuffer message, RpcResponseCallb
// method returns. This assumes that the code ensures, through other means, that no outbound
// messages are being written to the channel while negotiation is still going on.
if (saslServer.isComplete()) {
client.setClientUser(saslServer.getUserName());
if (!SparkSaslServer.QOP_AUTH_CONF.equals(saslServer.getNegotiatedProperty(Sasl.QOP))) {
logger.debug("SASL authentication successful for channel {}", client);
complete(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.util.TransportConf;

import static org.apache.spark.network.sasl.SparkSaslServer.*;


/**
* A SASL Client for Spark which simply keeps track of the state of a single SASL session, from the
* initial state to the "authenticated" state. This client initializes the protocol via a
Expand All @@ -48,12 +51,25 @@ public class SparkSaslClient implements SaslEncryptionBackend {
private final String secretKeyId;
private final SecretKeyHolder secretKeyHolder;
private final String expectedQop;
private TransportConf conf;
private SaslClient saslClient;

public SparkSaslClient(String secretKeyId, SecretKeyHolder secretKeyHolder, boolean encrypt) {
public SparkSaslClient(
String secretKeyId,
SecretKeyHolder secretKeyHolder,
boolean alwaysEncrypt) {
this(secretKeyId,secretKeyHolder,alwaysEncrypt, null);
}

public SparkSaslClient(
String secretKeyId,
SecretKeyHolder secretKeyHolder,
boolean encrypt,
TransportConf conf) {
this.secretKeyId = secretKeyId;
this.secretKeyHolder = secretKeyHolder;
this.expectedQop = encrypt ? QOP_AUTH_CONF : QOP_AUTH;
this.conf = conf;

Map<String, String> saslProps = ImmutableMap.<String, String>builder()
.put(Sasl.QOP, expectedQop)
Expand Down Expand Up @@ -131,11 +147,23 @@ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallback
if (callback instanceof NameCallback) {
logger.trace("SASL client callback: setting username");
NameCallback nc = (NameCallback) callback;
nc.setName(encodeIdentifier(secretKeyHolder.getSaslUser(secretKeyId)));
if (conf != null && conf.isConnectionUsingTokens()) {
// Token Identifier is already encoded
nc.setName(secretKeyHolder.getSaslUser(secretKeyId));
} else {
nc.setName(encodeIdentifier(secretKeyHolder.getSaslUser(secretKeyId)));
}

} else if (callback instanceof PasswordCallback) {
logger.trace("SASL client callback: setting password");
PasswordCallback pc = (PasswordCallback) callback;
pc.setPassword(encodePassword(secretKeyHolder.getSecretKey(secretKeyId)));
if (conf != null && conf.isConnectionUsingTokens()) {
// Token Identifier is already encoded
pc.setPassword(secretKeyHolder.getSecretKey(secretKeyId).toCharArray());
} else {
pc.setPassword(encodePassword(secretKeyHolder.getSecretKey(secretKeyId)));

}
} else if (callback instanceof RealmCallback) {
logger.trace("SASL client callback: setting realm");
RealmCallback rc = (RealmCallback) callback;
Expand Down
Loading