Skip to content

Commit

Permalink
Use delegation token for kerberized metastore
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr authored and findepi committed Sep 28, 2019
1 parent 8765c76 commit a873fcf
Show file tree
Hide file tree
Showing 17 changed files with 144 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

import org.apache.thrift.transport.TTransport;

import java.util.Optional;

public interface HiveMetastoreAuthentication
{
TTransport authenticate(TTransport rawTransport, String hiveMetastoreHost);
TTransport authenticate(TTransport rawTransport, String hiveMetastoreHost, Optional<String> delegationToken);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,31 @@

import com.google.common.collect.ImmutableMap;
import io.prestosql.plugin.hive.ForHiveMetastore;
import org.apache.hadoop.hive.metastore.security.DelegationTokenIdentifier;
import org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.token.Token;
import org.apache.thrift.transport.TSaslClientTransport;
import org.apache.thrift.transport.TTransport;

import javax.inject.Inject;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.sasl.RealmCallback;
import javax.security.sasl.Sasl;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Base64;
import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.KERBEROS;
import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.TOKEN;
import static org.apache.hadoop.security.SecurityUtil.getServerPrincipal;

public class KerberosHiveMetastoreAuthentication
Expand All @@ -53,7 +63,7 @@ public KerberosHiveMetastoreAuthentication(String hiveMetastoreServicePrincipal,
}

@Override
public TTransport authenticate(TTransport rawTransport, String hiveMetastoreHost)
public TTransport authenticate(TTransport rawTransport, String hiveMetastoreHost, Optional<String> delegationToken)
{
try {
String serverPrincipal = getServerPrincipal(hiveMetastoreServicePrincipal, hiveMetastoreHost);
Expand All @@ -65,19 +75,70 @@ public TTransport authenticate(TTransport rawTransport, String hiveMetastoreHost
Sasl.QOP, "auth-conf,auth",
Sasl.SERVER_AUTH, "true");

TTransport saslTransport = new TSaslClientTransport(
KERBEROS.getMechanismName(),
null,
names[0],
names[1],
saslProps,
null,
rawTransport);
TTransport saslTransport;
if (delegationToken.isPresent()) {
saslTransport = new TSaslClientTransport(
TOKEN.getMechanismName(),
null,
null,
"default",
saslProps,
new SaslClientCallbackHandler(decodeDelegationToken(delegationToken.get())),
rawTransport);
}
else {
saslTransport = new TSaslClientTransport(
KERBEROS.getMechanismName(),
null,
names[0],
names[1],
saslProps,
null,
rawTransport);
}

return new TUGIAssumingTransport(saslTransport, authentication.getUserGroupInformation());
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private static Token<DelegationTokenIdentifier> decodeDelegationToken(String tokenValue)
throws IOException
{
Token<DelegationTokenIdentifier> token = new Token<>();
token.decodeFromUrlString(tokenValue);
return token;
}

private static class SaslClientCallbackHandler
implements CallbackHandler
{
private final String username;
private final String password;

SaslClientCallbackHandler(Token<DelegationTokenIdentifier> token)
{
this.username = Base64.getEncoder().encodeToString(token.getIdentifier());
this.password = Base64.getEncoder().encodeToString(token.getPassword());
}

@Override
public void handle(Callback[] callbacks)
{
for (Callback callback : callbacks) {
if (callback instanceof NameCallback) {
((NameCallback) callback).setName(username);
}
if (callback instanceof PasswordCallback) {
((PasswordCallback) callback).setPassword(password.toCharArray());
}
if (callback instanceof RealmCallback) {
RealmCallback realmCallback = (RealmCallback) callback;
realmCallback.setText(realmCallback.getDefaultText());
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@

import org.apache.thrift.transport.TTransport;

import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;

public class NoHiveMetastoreAuthentication
implements HiveMetastoreAuthentication
{
@Override
public TTransport authenticate(TTransport rawTransport, String hiveMetastoreHost)
public TTransport authenticate(TTransport rawTransport, String hiveMetastoreHost, Optional<String> delegationToken)
{
checkArgument(!delegationToken.isPresent(), "delegation token is not supported");
return rawTransport;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@

import org.apache.thrift.TException;

import java.util.Optional;

public interface MetastoreLocator
{
/**
* Create a connected {@link ThriftMetastoreClient}
*/
ThriftMetastoreClient createMetastoreClient()
ThriftMetastoreClient createMetastoreClient(Optional<String> delegationToken)
throws TException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
Expand Down Expand Up @@ -62,7 +63,7 @@ public StaticMetastoreLocator(List<URI> metastoreUris, String metastoreUsername,
* connection succeeds or there are no more fallback metastores.
*/
@Override
public ThriftMetastoreClient createMetastoreClient()
public ThriftMetastoreClient createMetastoreClient(Optional<String> delegationToken)
throws TException
{
List<HostAndPort> metastores = new ArrayList<>(addresses);
Expand All @@ -71,7 +72,7 @@ public ThriftMetastoreClient createMetastoreClient()
TException lastException = null;
for (HostAndPort metastore : metastores) {
try {
ThriftMetastoreClient client = clientFactory.create(metastore);
ThriftMetastoreClient client = clientFactory.create(metastore, delegationToken);
if (!isNullOrEmpty(metastoreUsername)) {
client.setUGI(metastoreUsername);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.prestosql.plugin.hive.PartitionStatistics;
import io.prestosql.plugin.hive.SchemaAlreadyExistsException;
import io.prestosql.plugin.hive.TableAlreadyExistsException;
import io.prestosql.plugin.hive.authentication.HiveAuthenticationConfig;
import io.prestosql.plugin.hive.authentication.HiveAuthenticationConfig.HiveMetastoreAuthenticationType;
import io.prestosql.plugin.hive.authentication.HiveIdentity;
import io.prestosql.plugin.hive.metastore.Column;
import io.prestosql.plugin.hive.metastore.HiveColumnStatistics;
Expand Down Expand Up @@ -123,6 +125,7 @@ public class ThriftHiveMetastore
private final Duration maxRetryTime;
private final int maxRetries;
private final boolean impersonationEnabled;
private final HiveMetastoreAuthenticationType authenticationType;

private final AtomicInteger chosenGetTableAlternative = new AtomicInteger(Integer.MAX_VALUE);
private final AtomicInteger chosenTableParamAlternative = new AtomicInteger(Integer.MAX_VALUE);
Expand All @@ -131,7 +134,7 @@ public class ThriftHiveMetastore
private static final Pattern TABLE_PARAMETER_SAFE_VALUE_PATTERN = Pattern.compile("^[a-zA-Z0-9]*$");

@Inject
public ThriftHiveMetastore(MetastoreLocator metastoreLocator, ThriftHiveMetastoreConfig thriftConfig)
public ThriftHiveMetastore(MetastoreLocator metastoreLocator, ThriftHiveMetastoreConfig thriftConfig, HiveAuthenticationConfig authenticationConfig)
{
this.clientProvider = requireNonNull(metastoreLocator, "metastoreLocator is null");
this.backoffScaleFactor = thriftConfig.getBackoffScaleFactor();
Expand All @@ -140,6 +143,7 @@ public ThriftHiveMetastore(MetastoreLocator metastoreLocator, ThriftHiveMetastor
this.maxRetryTime = thriftConfig.getMaxRetryTime();
this.maxRetries = thriftConfig.getMaxRetries();
this.impersonationEnabled = thriftConfig.isImpersonationEnabled();
this.authenticationType = authenticationConfig.getHiveMetastoreAuthenticationType();
}

@Managed
Expand Down Expand Up @@ -1433,7 +1437,7 @@ private static boolean isValidExceptionalResponse(Exception exception)
private ThriftMetastoreClient createMetastoreClient()
throws TException
{
return clientProvider.createMetastoreClient();
return clientProvider.createMetastoreClient(Optional.empty());
}

private ThriftMetastoreClient createMetastoreClient(HiveIdentity identity)
Expand All @@ -1443,12 +1447,23 @@ private ThriftMetastoreClient createMetastoreClient(HiveIdentity identity)
return createMetastoreClient();
}

String username = identity.getUsername()
.orElseThrow(() -> new IllegalStateException("End-user name should exist when metastore impersonation is enabled"));
String username = identity.getUsername().orElseThrow(() -> new IllegalStateException("End-user name should exist when metastore impersonation is enabled"));
switch (authenticationType) {
case KERBEROS:
String delegationToken;
try (ThriftMetastoreClient client = createMetastoreClient()) {
delegationToken = client.getDelegationToken(username);
}
return clientProvider.createMetastoreClient(Optional.of(delegationToken));

case NONE:
ThriftMetastoreClient client = createMetastoreClient();
setMetastoreUserOrClose(client, username);
return client;

ThriftMetastoreClient client = createMetastoreClient();
setMetastoreUserOrClose(client, username);
return client;
default:
throw new IllegalStateException("Unsupported authentication type: " + authenticationType);
}
}

private static void setMetastoreUserOrClose(ThriftMetastoreClient client, String username)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,4 +415,11 @@ public void setUGI(String userName)
{
client.set_ugi(userName, new ArrayList<>());
}

@Override
public String getDelegationToken(String userName)
throws TException
{
return client.get_delegation_token(userName, userName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,7 @@ List<RolePrincipalGrant> listRoleGrants(String name, PrincipalType principalType

void setUGI(String userName)
throws TException;

String getDelegationToken(String userName)
throws TException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ public ThriftMetastoreClientFactory(ThriftHiveMetastoreConfig config, HiveMetast
this(Optional.empty(), Optional.ofNullable(config.getSocksProxy()), config.getMetastoreTimeout(), metastoreAuthentication);
}

public ThriftMetastoreClient create(HostAndPort address)
public ThriftMetastoreClient create(HostAndPort address, Optional<String> delegationToken)
throws TTransportException
{
return new ThriftHiveMetastoreClient(Transport.create(address, sslContext, socksProxy, timeoutMillis, metastoreAuthentication));
return new ThriftHiveMetastoreClient(Transport.create(address, sslContext, socksProxy, timeoutMillis, metastoreAuthentication, delegationToken));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@ public static TTransport create(
Optional<SSLContext> sslContext,
Optional<HostAndPort> socksProxy,
int timeoutMillis,
HiveMetastoreAuthentication authentication)
HiveMetastoreAuthentication authentication,
Optional<String> delegationToken)
throws TTransportException
{
requireNonNull(address, "address is null");
try {
TTransport rawTransport = createRaw(address, sslContext, socksProxy, timeoutMillis);
TTransport authenticatedTransport = authentication.authenticate(rawTransport, address.getHost());
TTransport authenticatedTransport = authentication.authenticate(rawTransport, address.getHost(), delegationToken);
if (!authenticatedTransport.isOpen()) {
authenticatedTransport.open();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.prestosql.GroupByHashPageIndexerFactory;
import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext;
import io.prestosql.plugin.hive.LocationService.WriteInfo;
import io.prestosql.plugin.hive.authentication.HiveAuthenticationConfig;
import io.prestosql.plugin.hive.authentication.HiveIdentity;
import io.prestosql.plugin.hive.metastore.Column;
import io.prestosql.plugin.hive.metastore.HiveColumnStatistics;
Expand Down Expand Up @@ -702,7 +703,7 @@ protected final void setup(String host, int port, String databaseName, String ti
MetastoreLocator metastoreLocator = new TestingMetastoreLocator(proxy, HostAndPort.fromParts(host, port));

HiveMetastore metastore = new CachingHiveMetastore(
new BridgingHiveMetastore(new ThriftHiveMetastore(metastoreLocator, new ThriftHiveMetastoreConfig())),
new BridgingHiveMetastore(new ThriftHiveMetastore(metastoreLocator, new ThriftHiveMetastoreConfig(), new HiveAuthenticationConfig())),
executor,
Duration.valueOf("1m"),
Duration.valueOf("15s"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.prestosql.plugin.hive.AbstractTestHive.HiveTransaction;
import io.prestosql.plugin.hive.AbstractTestHive.Transaction;
import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext;
import io.prestosql.plugin.hive.authentication.HiveAuthenticationConfig;
import io.prestosql.plugin.hive.authentication.HiveIdentity;
import io.prestosql.plugin.hive.authentication.NoHdfsAuthentication;
import io.prestosql.plugin.hive.metastore.Database;
Expand Down Expand Up @@ -162,7 +163,7 @@ protected void setup(String host, int port, String databaseName, boolean s3Selec

hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, new HdfsConfig(), new NoHdfsAuthentication());
metastoreClient = new TestingHiveMetastore(
new BridgingHiveMetastore(new ThriftHiveMetastore(metastoreLocator, new ThriftHiveMetastoreConfig())),
new BridgingHiveMetastore(new ThriftHiveMetastore(metastoreLocator, new ThriftHiveMetastoreConfig(), new HiveAuthenticationConfig())),
executor,
getBasePath(),
hdfsEnvironment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.airlift.units.Duration;
import io.prestosql.plugin.hive.authentication.HiveAuthenticationConfig;
import io.prestosql.plugin.hive.authentication.HiveIdentity;
import io.prestosql.plugin.hive.metastore.Partition;
import io.prestosql.plugin.hive.metastore.thrift.BridgingHiveMetastore;
Expand Down Expand Up @@ -77,7 +78,7 @@ public void setUp()
private ThriftHiveMetastore createThriftHiveMetastore()
{
MetastoreLocator metastoreLocator = new MockMetastoreLocator(mockClient);
return new ThriftHiveMetastore(metastoreLocator, new ThriftHiveMetastoreConfig());
return new ThriftHiveMetastore(metastoreLocator, new ThriftHiveMetastoreConfig(), new HiveAuthenticationConfig());
}

@Test
Expand Down Expand Up @@ -304,7 +305,7 @@ private MockMetastoreLocator(ThriftMetastoreClient client)
}

@Override
public ThriftMetastoreClient createMetastoreClient()
public ThriftMetastoreClient createMetastoreClient(Optional<String> delegationToken)
{
return client;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,4 +392,10 @@ public void setUGI(String userName)
{
// No-op
}

@Override
public String getDelegationToken(String userName)
{
throw new UnsupportedOperationException();
}
}
Loading

0 comments on commit a873fcf

Please sign in to comment.