();
+ private static final Properties DEFAULT_CONFIG = new Properties();
+
+ static {
+ PROPERTIES.add(ORG_NAME);
+ PROPERTIES.add(ORG_DOMAIN);
+ PROPERTIES.add(KDC_BIND_ADDRESS);
+ PROPERTIES.add(KDC_BIND_ADDRESS);
+ PROPERTIES.add(KDC_PORT);
+ PROPERTIES.add(INSTANCE);
+ PROPERTIES.add(TRANSPORT);
+ PROPERTIES.add(MAX_TICKET_LIFETIME);
+ PROPERTIES.add(MAX_RENEWABLE_LIFETIME);
+
+ DEFAULT_CONFIG.setProperty(KDC_BIND_ADDRESS, "localhost");
+ DEFAULT_CONFIG.setProperty(KDC_PORT, "0");
+ DEFAULT_CONFIG.setProperty(INSTANCE, "DefaultKrbServer");
+ DEFAULT_CONFIG.setProperty(ORG_NAME, "EXAMPLE");
+ DEFAULT_CONFIG.setProperty(ORG_DOMAIN, "COM");
+ DEFAULT_CONFIG.setProperty(TRANSPORT, "TCP");
+ DEFAULT_CONFIG.setProperty(MAX_TICKET_LIFETIME, "86400000");
+ DEFAULT_CONFIG.setProperty(MAX_RENEWABLE_LIFETIME, "604800000");
+ DEFAULT_CONFIG.setProperty(DEBUG, "false");
+ }
+
+ /**
+ * Convenience method that returns MiniKdc default configuration.
+ *
+ * The returned configuration is a copy, it can be customized before using
+ * it to create a MiniKdc.
+ * @return a MiniKdc default configuration.
+ */
+ public static Properties createConf() {
+ return (Properties) DEFAULT_CONFIG.clone();
+ }
+
+ private Properties conf;
+ private SimpleKdcServer simpleKdc;
+ private int port;
+ private String realm;
+ private File workDir;
+ private File krb5conf;
+ private String transport;
+ private boolean krb5Debug;
+
+ public void setTransport(String transport) {
+ this.transport = transport;
+ }
+
+ /**
+ * Creates a MiniKdc.
+ *
+ * @param conf MiniKdc configuration.
+ * @param workDir working directory, it should be the build directory. Under
+ * this directory an ApacheDS working directory will be created, this
+ * directory will be deleted when the MiniKdc stops.
+ * @throws Exception thrown if the MiniKdc could not be created.
+ */
+ public MiniKdc(Properties conf, File workDir) throws Exception {
+ if (!conf.keySet().containsAll(PROPERTIES)) {
+ Set missingProperties = new HashSet(PROPERTIES);
+ missingProperties.removeAll(conf.keySet());
+ throw new IllegalArgumentException("Missing configuration properties: "
+ + missingProperties);
+ }
+ this.workDir = new File(workDir, Long.toString(System.currentTimeMillis()));
+ if (!this.workDir.exists()
+ && !this.workDir.mkdirs()) {
+ throw new RuntimeException("Cannot create directory " + this.workDir);
+ }
+ log.info("Configuration:");
+ log.info("---------------------------------------------------------------");
+ for (Map.Entry, ?> entry : conf.entrySet()) {
+ log.info(" {}: {}", entry.getKey(), entry.getValue());
+ }
+ log.info("---------------------------------------------------------------");
+ this.conf = conf;
+ port = Integer.parseInt(conf.getProperty(KDC_PORT));
+ String orgName = conf.getProperty(ORG_NAME);
+ String orgDomain = conf.getProperty(ORG_DOMAIN);
+ realm = orgName.toUpperCase(Locale.ENGLISH) + "."
+ + orgDomain.toUpperCase(Locale.ENGLISH);
+ }
+
+ /**
+ * Returns the port of the MiniKdc.
+ *
+ * @return the port of the MiniKdc.
+ */
+ public int getPort() {
+ return port;
+ }
+
+ /**
+ * Returns the host of the MiniKdc.
+ *
+ * @return the host of the MiniKdc.
+ */
+ public String getHost() {
+ return conf.getProperty(KDC_BIND_ADDRESS);
+ }
+
+ /**
+ * Returns the realm of the MiniKdc.
+ *
+ * @return the realm of the MiniKdc.
+ */
+ public String getRealm() {
+ return realm;
+ }
+
+ public File getKrb5conf() {
+ krb5conf = new File(System.getProperty(JAVA_SECURITY_KRB5_CONF));
+ return krb5conf;
+ }
+
+ /**
+ * Starts the MiniKdc.
+ *
+ * @throws Exception thrown if the MiniKdc could not be started.
+ */
+ public synchronized void start() throws Exception {
+ if (simpleKdc != null) {
+ throw new RuntimeException("Already started");
+ }
+ simpleKdc = new SimpleKdcServer();
+ prepareKdcServer();
+ simpleKdc.init();
+ resetDefaultRealm();
+ simpleKdc.start();
+ log.info("MiniKdc stated.");
+ }
+
+ private void resetDefaultRealm() throws IOException {
+ InputStream templateResource = new FileInputStream(
+ getKrb5conf().getAbsolutePath());
+ String content = IOUtil.readInput(templateResource);
+ content = content.replaceAll("default_realm = .*\n",
+ "default_realm = " + getRealm() + "\n");
+ IOUtil.writeFile(content, getKrb5conf());
+ }
+
+ private void prepareKdcServer() throws Exception {
+ // transport
+ simpleKdc.setWorkDir(workDir);
+ simpleKdc.setKdcHost(getHost());
+ simpleKdc.setKdcRealm(realm);
+ if (transport == null) {
+ transport = conf.getProperty(TRANSPORT);
+ }
+ if (port == 0) {
+ port = NetworkUtil.getServerPort();
+ }
+ if (transport != null) {
+ if (transport.trim().equals("TCP")) {
+ simpleKdc.setKdcTcpPort(port);
+ simpleKdc.setAllowUdp(false);
+ } else if (transport.trim().equals("UDP")) {
+ simpleKdc.setKdcUdpPort(port);
+ simpleKdc.setAllowTcp(false);
+ } else {
+ throw new IllegalArgumentException("Invalid transport: " + transport);
+ }
+ } else {
+ throw new IllegalArgumentException("Need to set transport!");
+ }
+ simpleKdc.getKdcConfig().setString(KdcConfigKey.KDC_SERVICE_NAME,
+ conf.getProperty(INSTANCE));
+ if (conf.getProperty(DEBUG) != null) {
+ krb5Debug = getAndSet(SUN_SECURITY_KRB5_DEBUG, conf.getProperty(DEBUG));
+ }
+ }
+
+ /**
+ * Stops the MiniKdc.
+ */
+ public synchronized void stop() {
+ if (simpleKdc != null) {
+ try {
+ simpleKdc.stop();
+ } catch (KrbException e) {
+ e.printStackTrace();
+ } finally {
+ if (conf.getProperty(DEBUG) != null) {
+ System.setProperty(SUN_SECURITY_KRB5_DEBUG,
+ Boolean.toString(krb5Debug));
+ }
+ }
+ }
+ delete(workDir);
+ try {
+ // Will be fixed in next Kerby version.
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ log.info("MiniKdc stopped.");
+ }
+
+ private void delete(File f) {
+ if (f.isFile()) {
+ if (!f.delete()) {
+ log.warn("WARNING: cannot delete file " + f.getAbsolutePath());
+ }
+ } else {
+ for (File c: f.listFiles()) {
+ delete(c);
+ }
+ if (!f.delete()) {
+ log.warn("WARNING: cannot delete directory " + f.getAbsolutePath());
+ }
+ }
+ }
+
+ /**
+ * Creates a principal in the KDC with the specified user and password.
+ *
+ * @param principal principal name, do not include the domain.
+ * @param password password.
+ * @throws Exception thrown if the principal could not be created.
+ */
+ public synchronized void createPrincipal(String principal, String password)
+ throws Exception {
+ simpleKdc.createPrincipal(principal, password);
+ }
+
+ /**
+ * Creates multiple principals in the KDC and adds them to a keytab file.
+ *
+ * @param keytabFile keytab file to add the created principals.
+ * @param principals principals to add to the KDC, do not include the domain.
+ * @throws Exception thrown if the principals or the keytab file could not be
+ * created.
+ */
+ public synchronized void createPrincipal(File keytabFile,
+ String ... principals)
+ throws Exception {
+ simpleKdc.createPrincipals(principals);
+ if (keytabFile.exists() && !keytabFile.delete()) {
+ log.error("Failed to delete keytab file: " + keytabFile);
+ }
+ for (String principal : principals) {
+ simpleKdc.getKadmin().exportKeytab(keytabFile, principal);
+ }
+ }
+
+ /**
+ * Set the System property; return the old value for caching.
+ *
+ * @param sysprop property
+ * @param debug true or false
+ * @return the previous value
+ */
+ private boolean getAndSet(String sysprop, String debug) {
+ boolean old = Boolean.getBoolean(sysprop);
+ System.setProperty(sysprop, debug);
+ return old;
+ }
+}
diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
new file mode 100644
index 0000000000000..61d2b04f6ec1e
--- /dev/null
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.authentication;
+
+import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.FileWriter;
+import java.net.URI;
+import java.nio.file.Files;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.security.auth.login.Configuration;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.shaded.com.google.common.collect.Maps;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class SaslAuthenticateTest extends ProducerConsumerBase {
+ public static File kdcDir;
+ public static File kerberosWorkDir;
+
+ private static MiniKdc kdc;
+ private static Properties properties;
+
+ private static String localHostname = "localhost";
+ private static Authentication authSasl;
+
+ @BeforeClass
+ public static void startMiniKdc() throws Exception {
+ kdcDir = Files.createTempDirectory("test-kdc-dir").toFile();
+ kerberosWorkDir = Files.createTempDirectory("test-kerberos-work-dir").toFile();
+
+ properties = MiniKdc.createConf();
+ kdc = new MiniKdc(properties, kdcDir);
+ kdc.start();
+
+ String principalServerNoRealm = "broker/" + localHostname;
+ String principalServer = "broker/" + localHostname + "@" + kdc.getRealm();
+ log.info("principalServer: " + principalServer);
+ String principalClientNoRealm = "client/" + localHostname;
+ String principalClient = principalClientNoRealm + "@" + kdc.getRealm();
+ log.info("principalClient: " + principalClient);
+
+ File keytabClient = new File(kerberosWorkDir, "pulsarclient.keytab");
+ kdc.createPrincipal(keytabClient, principalClientNoRealm);
+
+ File keytabServer = new File(kerberosWorkDir, "pulsarbroker.keytab");
+ kdc.createPrincipal(keytabServer, principalServerNoRealm);
+
+ File jaasFile = new File(kerberosWorkDir, "jaas.properties");
+ try (FileWriter writer = new FileWriter(jaasFile)) {
+ writer.write("\n"
+ + "PulsarBroker {\n"
+ + " com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
+ + " useKeyTab=true\n"
+ + " keyTab=\"" + keytabServer.getAbsolutePath() + "\n"
+ + " storeKey=true\n"
+ + " useTicketCache=false\n" // won't test useTicketCache=true on JUnit tests
+ + " principal=\"" + principalServer + "\";\n"
+ + "};\n"
+ + "\n"
+ + "\n"
+ + "\n"
+ + "PulsarClient {\n"
+ + " com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
+ + " useKeyTab=true\n"
+ + " keyTab=\"" + keytabClient.getAbsolutePath() + "\n"
+ + " storeKey=true\n"
+ + " useTicketCache=false\n"
+ + " principal=\"" + principalClient + "\";\n"
+ + "};\n"
+ );
+ }
+
+ File krb5file = new File(kerberosWorkDir, "krb5.properties");
+ try (FileWriter writer = new FileWriter(krb5file)) {
+ String conf = "[libdefaults]\n"
+ + " default_realm = " + kdc.getRealm() + "\n"
+ + " udp_preference_limit = 1\n" // force use TCP
+ + "\n"
+ + "\n"
+ + "[realms]\n"
+ + " " + kdc.getRealm() + " = {\n"
+ + " kdc = " + kdc.getHost() + ":" + kdc.getPort() + "\n"
+ + " }";
+ writer.write(conf);
+ log.info("krb5.properties:\n" + conf);
+ }
+
+ System.setProperty("java.security.auth.login.config", jaasFile.getAbsolutePath());
+ System.setProperty("java.security.krb5.properties", krb5file.getAbsolutePath());
+ Configuration.getConfiguration().refresh();
+
+ // Client config
+ Map clientSaslConfig = Maps.newHashMap();
+ clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
+ log.info("set client jaas section name: PulsarClient");
+ authSasl = AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig);
+ log.info("created AuthenticationSasl");
+ }
+
+ @AfterClass
+ public static void stopMiniKdc() {
+ System.clearProperty("java.security.auth.login.config");
+ System.clearProperty("java.security.krb5.properties");
+ if (kdc != null) {
+ kdc.stop();
+ }
+ FileUtils.deleteQuietly(kdcDir);
+ FileUtils.deleteQuietly(kerberosWorkDir);
+ Assert.assertFalse(kdcDir.exists());
+ Assert.assertFalse(kerberosWorkDir.exists());
+ }
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ log.info("-- {} --, start at host: {}", methodName, localHostname);
+ isTcpLookup = true;
+ conf.setAdvertisedAddress(localHostname);
+ conf.setAuthenticationEnabled(true);
+ conf.setSaslAuthentication(true);
+ conf.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
+ conf.setSaslJaasBrokerSectionName("PulsarBroker");
+ Set providers = new HashSet<>();
+ providers.add(AuthenticationProviderSasl.class.getName());
+ conf.setAuthenticationProviders(providers);
+ conf.setClusterName("test");
+
+ super.init();
+
+ lookupUrl = new URI("broker://" + "localhost" + ":" + BROKER_PORT);
+ pulsarClient = PulsarClient.builder()
+ .serviceUrl(lookupUrl.toString())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .authentication(authSasl).build();
+
+ log.info("-- {} --, end.", methodName);
+ super.producerBaseSetup();
+ }
+
+ @AfterMethod
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ // Test could verify with kerberos configured.
+ @Test
+ public void testProducerAndConsumerPassed() throws Exception {
+ log.info("-- {} -- start", methodName);
+
+ Consumer consumer = pulsarClient.newConsumer()
+ .topic("persistent://my-property/my-ns/my-topic")
+ .subscriptionName("my-subscriber-name")
+ .subscribe();
+
+ ProducerBuilder producerBuilder = pulsarClient.newProducer()
+ .topic("persistent://my-property/my-ns/my-topic")
+ .enableBatching(false);
+
+ Producer producer = producerBuilder.create();
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ log.info("Produced message: [{}]", message);
+ }
+
+ Message msg = null;
+ Set messageSet = Sets.newHashSet();
+ for (int i = 0; i < 10; i++) {
+ msg = consumer.receive(5, TimeUnit.SECONDS);
+ String receivedMessage = new String(msg.getData());
+ log.info("Received message: [{}]", receivedMessage);
+ String expectedMessage = "my-message-" + i;
+ testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+ }
+ // Acknowledge the consumption of all messages at once
+ consumer.acknowledgeCumulative(msg);
+ consumer.close();
+
+ log.info("-- {} -- end", methodName);
+ }
+
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 0fbe155190a09..da2b098f05dc4 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -37,6 +37,7 @@
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.sasl.SaslConstants;
/**
* Pulsar service configuration object.
@@ -76,6 +77,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
@Category
private static final String CATEGORY_TOKEN_AUTH = "Token Authentication Provider";
@Category
+ private static final String CATEGORY_SASL_AUTH = "SASL Authentication Provider";
+ @Category
private static final String CATEGORY_HTTP = "HTTP";
/***** --- pulsar configuration --- ****/
@@ -590,6 +593,35 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private String anonymousUserRole = null;
+
+ @FieldContext(
+ category = CATEGORY_SASL_AUTH,
+ doc = "Whether Use SASL Authentication or not"
+ )
+ // TODO: isSaslAuthentication used to bypass web resource check.
+ // will remove it after implementation the support.
+ // github issue #3653 {@link: https://github.com/apache/pulsar/issues/3653}
+ private boolean isSaslAuthentication = false;
+
+ @FieldContext(
+ category = CATEGORY_SASL_AUTH,
+ doc = "This is a regexp, which limits the range of possible ids which can connect to the Broker using SASL.\n"
+ + " Default value is: \".*pulsar.*\", so only clients whose id contains 'pulsar' are allowed to connect."
+ )
+ private String saslJaasClientAllowedIds = SaslConstants.JAAS_CLIENT_ALLOWED_IDS_DEFAULT;
+
+ @FieldContext(
+ category = CATEGORY_SASL_AUTH,
+ doc = "Service Principal, for login context name. Default value is \"Broker\"."
+ )
+ private String saslJaasBrokerSectionName = SaslConstants.JAAS_DEFAULT_BROKER_SECTION_NAME;
+
+ @FieldContext(
+ category = CATEGORY_SASL_AUTH,
+ doc = "kerberos kinit command."
+ )
+ private String kinitCommand = "/usr/bin/kinit";
+
/**** --- BookKeeper Client --- ****/
@FieldContext(
category = CATEGORY_STORAGE_BK,
@@ -1202,4 +1234,4 @@ public Optional getWebServicePort() {
public Optional getWebServicePortTls() {
return Optional.ofNullable(webServicePortTls);
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java
index fcc6dda28ee5c..b72b99beaae9f 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java
@@ -18,9 +18,9 @@
*/
package org.apache.pulsar.broker.authentication;
-import java.io.IOException;
import java.net.SocketAddress;
import java.security.cert.Certificate;
+import javax.naming.AuthenticationException;
import org.apache.pulsar.common.api.AuthData;
/**
@@ -103,8 +103,8 @@ default String getCommandData() {
* Evaluate and challenge the data that passed in, and return processed data back.
* It is used for mutual authentication like SASL.
*/
- default AuthData authenticate(AuthData data) throws IOException {
- throw new UnsupportedOperationException();
+ default AuthData authenticate(AuthData data) throws AuthenticationException {
+ throw new AuthenticationException("Not supported");
}
/*
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java
index 755fe8643857e..f9a2d03772f0a 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java
@@ -69,7 +69,7 @@ default String authenticate(AuthenticationDataSource authData) throws Authentica
default AuthenticationState newAuthState(AuthData authData,
SocketAddress remoteAddress,
SSLSession sslSession)
- throws AuthenticationException{
+ throws AuthenticationException {
return new OneStageAuthenticationState(authData, remoteAddress, sslSession, this);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index bcd739f8006cb..abed070d9bf99 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -100,6 +100,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.sasl.SaslConstants;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaInfoUtil;
import org.apache.pulsar.common.schema.SchemaType;
@@ -220,14 +221,17 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
}
/*
- * If authentication and authorization is enabled and if the authRole is one of proxyRoles we want to enforce
+ * If authentication and authorization is enabled(and not sasl) and if the authRole is one of proxyRoles we want to enforce
* - the originalPrincipal is given while connecting
* - originalPrincipal is not blank
* - originalPrincipal is not a proxy principal
*/
+ //TODO: for sasl proxy.
+ // github issue #3655 {@link: https://github.com/apache/pulsar/issues/3655}
private boolean invalidOriginalPrincipal(String originalPrincipal) {
- return (service.isAuthenticationEnabled() && service.isAuthorizationEnabled() && proxyRoles.contains(authRole)
- && (StringUtils.isBlank(originalPrincipal) || proxyRoles.contains(originalPrincipal)));
+ return (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()
+ && !isSaslAuthenticationMethod()
+ && proxyRoles.contains(authRole) && (StringUtils.isBlank(originalPrincipal) || proxyRoles.contains(originalPrincipal)));
}
// ////
@@ -1498,6 +1502,10 @@ private TopicName validateTopicName(String topic, long requestId, GeneratedMessa
}
}
+ private boolean isSaslAuthenticationMethod(){
+ return authMethod.equalsIgnoreCase(SaslConstants.AUTH_METHOD_NAME);
+ }
+
private static final Logger log = LoggerFactory.getLogger(ServerCnx.class);
/**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index dc631d6b336ac..824be7a2d4a39 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -151,12 +151,12 @@ private static void validateOriginalPrincipal(Set proxyRoles, String aut
// Request has come from a proxy
if (StringUtils.isBlank(originalPrincipal)) {
log.warn("Original principal empty in request authenticated as {}", authenticatedPrincipal);
- throw new RestException(Status.UNAUTHORIZED, "Original principal cannot be empty if the request is via proxy.");
+ throw new RestException(Status.UNAUTHORIZED, "Original principal cannot be empty if the request is via proxy.");
}
if (proxyRoles.contains(originalPrincipal)) {
log.warn("Original principal {} cannot be a proxy role ({})", originalPrincipal, proxyRoles);
- throw new RestException(Status.UNAUTHORIZED, "Original principal cannot be a proxy role");
- }
+ throw new RestException(Status.UNAUTHORIZED, "Original principal cannot be a proxy role");
+ }
}
}
@@ -167,7 +167,10 @@ private static void validateOriginalPrincipal(Set proxyRoles, String aut
* if not authorized
*/
protected void validateSuperUserAccess() {
- if (config().isAuthenticationEnabled()) {
+ // TODO: isSaslAuthentication used to bypass web resource check.
+ // will remove it after implementation the support.
+ // github issue #3653 {@link: https://github.com/apache/pulsar/issues/3653}
+ if (config().isAuthenticationEnabled() && !config().isSaslAuthentication()) {
String appId = clientAppId();
if(log.isDebugEnabled()) {
log.debug("[{}] Check super user access: Authenticated: {} -- Role: {}", uri.getRequestUri(),
@@ -243,7 +246,7 @@ protected static void validateAdminAccessForTenant(PulsarService pulsar, String
throw new RestException(Status.NOT_FOUND, "Tenant does not exist");
}
- if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration().isAuthorizationEnabled()) {
+ if (pulsar.getConfiguration().isAuthenticationEnabled() && !pulsar.getConfiguration().isSaslAuthentication() && pulsar.getConfiguration().isAuthorizationEnabled()) {
if (!isClientAuthenticated(clientAppId)) {
throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 3ae55bc6550a9..ef5c3c6aa773e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -133,7 +133,10 @@ public void addServlet(String path, ServletHolder servletHolder, boolean require
});
}
- if (requiresAuthentication && pulsar.getConfiguration().isAuthenticationEnabled()) {
+ // TODO: isSaslAuthentication used to bypass web resource check.
+ // will remove it after implementation the support.
+ // github issue #3653 {@link: https://github.com/apache/pulsar/issues/3653}
+ if (requiresAuthentication && pulsar.getConfiguration().isAuthenticationEnabled() && !pulsar.getConfiguration().isSaslAuthentication()) {
FilterHolder filter = new FilterHolder(new AuthenticationFilter(
pulsar.getBrokerService().getAuthenticationService()));
context.addFilter(filter, MATCH_ALL, EnumSet.allOf(DispatcherType.class));
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
index ecf3d3f5b3929..9a0df5d0d3868 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
@@ -18,16 +18,17 @@
*/
package org.apache.pulsar.client.api;
-import java.io.IOException;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import java.io.Serializable;
import java.security.PrivateKey;
import java.security.cert.Certificate;
import java.util.Map;
import java.util.Set;
+
import javax.naming.AuthenticationException;
-import org.apache.pulsar.common.api.AuthData;
-import static java.nio.charset.StandardCharsets.UTF_8;
+import org.apache.pulsar.common.api.AuthData;
/**
* Interface for accessing data which are used in variety of authentication schemes on client side
@@ -119,7 +120,7 @@ default String getCommandData() {
*
* Mainly used for mutual authentication like sasl.
*/
- default AuthData authenticate(AuthData data) throws IOException, AuthenticationException {
+ default AuthData authenticate(AuthData data) throws AuthenticationException {
byte[] bytes = (hasDataFromCommand() ? this.getCommandData() : "").getBytes(UTF_8);
return AuthData.of(bytes);
}
diff --git a/pulsar-client-auth-sasl/pom.xml b/pulsar-client-auth-sasl/pom.xml
new file mode 100644
index 0000000000000..27288aceaa90b
--- /dev/null
+++ b/pulsar-client-auth-sasl/pom.xml
@@ -0,0 +1,45 @@
+
+
+ 4.0.0
+
+
+ org.apache.pulsar
+ pulsar
+ 2.4.0-SNAPSHOT
+ ..
+
+
+ pulsar-client-auth-sasl
+ jar
+ SASL authentication plugin for java client
+
+
+
+
+ ${project.groupId}
+ pulsar-client-original
+ ${project.parent.version}
+
+
+
+
diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
new file mode 100644
index 0000000000000..7b98be38e6750
--- /dev/null
+++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
@@ -0,0 +1,138 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.auth;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
+import java.io.IOException;
+import java.util.Map;
+
+import javax.security.auth.login.LoginException;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.AuthenticationUtil;
+import org.apache.pulsar.client.impl.auth.PulsarSaslClient.ClientCallbackHandler;
+import org.apache.pulsar.common.sasl.JAASCredentialsContainer;
+import org.apache.pulsar.common.sasl.SaslConstants;
+
+/**
+ * Authentication provider for SASL based authentication.
+ *
+ * SASL need config files through JVM parameter:
+ * a jaas.conf, which is set by `-Djava.security.auth.login.config=/dir/jaas.conf`
+ * for Kerberos a krb5.conf, which is set by `-Djava.security.krb5.conf=/dir/krb5.conf`
+ */
+@Slf4j
+public class AuthenticationSasl implements Authentication, EncodedAuthenticationParameterSupport {
+ private static final long serialVersionUID = 1L;
+ // this is a static object that shares amongst client.
+ static private JAASCredentialsContainer jaasCredentialsContainer;
+ static private volatile boolean initializedJAAS = false;
+
+ private Map configuration;
+ private String loginContextName;
+
+ public AuthenticationSasl() {
+ }
+
+ @Override
+ public String getAuthMethodName() {
+ return SaslConstants.AUTH_METHOD_NAME;
+ }
+
+ @Override
+ public AuthenticationDataProvider getAuthData(String brokerHostName) throws PulsarClientException {
+ // reuse this to return a DataProvider which contains a SASL client
+ try {
+ PulsarSaslClient saslClient = new PulsarSaslClient(brokerHostName, jaasCredentialsContainer.getSubject());
+ return new SaslAuthenticationDataProvider(saslClient);
+ } catch (Throwable t) {
+ log.error("Failed create sasl client: {}", t);
+ throw new PulsarClientException(t);
+ }
+ }
+
+ @Override
+ public void configure(String encodedAuthParamString) {
+ if (isBlank(encodedAuthParamString)) {
+ log.info("authParams for SASL is be empty, will use default JAAS client section name: {}",
+ SaslConstants.JAAS_DEFAULT_CLIENT_SECTION_NAME);
+ }
+
+ try {
+ setAuthParams(AuthenticationUtil.configureFromJsonString(encodedAuthParamString));
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Failed to parse SASL authParams", e);
+ }
+ }
+
+ @Override
+ @Deprecated
+ public void configure(Map authParams) {
+ try {
+ setAuthParams(authParams);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Failed to parse SASL authParams", e);
+ }
+ }
+
+ // use passed in parameter to config ange get jaasCredentialsContainer.
+ private void setAuthParams(Map authParams) throws PulsarClientException {
+ this.configuration = authParams;
+
+ // read section from config files of kerberos
+ this.loginContextName = authParams
+ .getOrDefault(SaslConstants.JAAS_CLIENT_SECTION_NAME, SaslConstants.JAAS_DEFAULT_CLIENT_SECTION_NAME);
+
+ // init the static jaasCredentialsContainer that shares amongst client.
+ if (!initializedJAAS) {
+ synchronized (this) {
+ if (jaasCredentialsContainer == null) {
+ log.info("JAAS loginContext is: {}." , loginContextName);
+ try {
+ jaasCredentialsContainer = new JAASCredentialsContainer(
+ loginContextName,
+ new ClientCallbackHandler(),
+ configuration);
+ initializedJAAS = true;
+ } catch (LoginException e) {
+ log.error("JAAS login in client failed: {}" , e);
+ throw new PulsarClientException(e);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void start() throws PulsarClientException {
+
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+}
diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/PulsarSaslClient.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/PulsarSaslClient.java
new file mode 100644
index 0000000000000..23399a1aee98f
--- /dev/null
+++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/PulsarSaslClient.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+
+import javax.naming.AuthenticationException;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.sasl.KerberosName;
+import org.apache.pulsar.common.sasl.SaslConstants;
+
+/**
+ * A SASL Client object.
+ * This is added for support Kerberos authentication.
+ */
+@Slf4j
+public class PulsarSaslClient {
+ private final SaslClient saslClient;
+ private final Subject clientSubject;
+
+ public PulsarSaslClient(String serverHostname, Subject subject) throws SaslException {
+ checkArgument(subject != null, "Cannot create SASL client with NULL JAAS subject");
+ checkArgument(!Strings.isNullOrEmpty(serverHostname), "Cannot create SASL client with NUll server name");
+
+ String serverPrincipal = SaslConstants.SASL_PULSAR_PROTOCOL + "/" + serverHostname;
+ this.clientSubject = subject;
+ if (clientSubject.getPrincipals().isEmpty()) {
+ throw new SaslException("Cannot create SASL client with empty JAAS subject principal");
+ }
+ // GSSAPI/Kerberos
+ final Object[] principals = clientSubject.getPrincipals().toArray();
+ final Principal clientPrincipal = (Principal) principals[0];
+
+ final KerberosName clientKerberosName = new KerberosName(clientPrincipal.getName());
+ KerberosName serviceKerberosName = new KerberosName(serverPrincipal + "@" + clientKerberosName.getRealm());
+ final String serviceName = serviceKerberosName.getServiceName();
+ final String serviceHostname = serviceKerberosName.getHostName();
+ final String clientPrincipalName = clientKerberosName.toString();
+ log.info("Using JAAS/SASL/GSSAPI auth to connect to server Principal {},",
+ serverPrincipal);
+
+ try {
+ this.saslClient = Subject.doAs(clientSubject, new PrivilegedExceptionAction() {
+ @Override
+ public SaslClient run() throws SaslException {
+ String[] mechs = {"GSSAPI"};
+ return Sasl.createSaslClient(mechs, clientPrincipalName, serviceName, serviceHostname, null,
+ new ClientCallbackHandler());
+ }
+ });
+ } catch (PrivilegedActionException err) {
+ log.error("GSSAPI client error", err.getCause());
+ throw new SaslException("error while booting GSSAPI client", err.getCause());
+ }
+
+ if (saslClient == null) {
+ throw new SaslException("Cannot create JVM SASL Client");
+ }
+
+ }
+
+ public AuthData evaluateChallenge(final AuthData saslToken) throws AuthenticationException {
+ if (saslToken == null) {
+ throw new AuthenticationException("saslToken is null");
+ }
+ try {
+ if (clientSubject != null) {
+ final byte[] retval = Subject.doAs(clientSubject, new PrivilegedExceptionAction() {
+ @Override
+ public byte[] run() throws SaslException {
+ return saslClient.evaluateChallenge(saslToken.getBytes());
+ }
+ });
+ return AuthData.of(retval);
+
+ } else {
+ return AuthData.of(saslClient.evaluateChallenge(saslToken.getBytes()));
+ }
+ } catch (Exception e) {
+ log.error("SASL error", e.getCause());
+ throw new AuthenticationException("SASL/JAAS error" + e.getCause());
+ }
+ }
+
+ public boolean hasInitialResponse() {
+ return saslClient.hasInitialResponse();
+ }
+
+ static class ClientCallbackHandler implements CallbackHandler {
+ @Override
+ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+ for (Callback callback : callbacks) {
+ if (callback instanceof AuthorizeCallback) {
+ handleAuthorizeCallback((AuthorizeCallback) callback);
+ } else {
+ throw new UnsupportedCallbackException(callback, "Unrecognized SASL GSSAPI Client Callback.");
+ }
+ }
+ }
+
+ private void handleAuthorizeCallback(AuthorizeCallback ac) {
+ String authid = ac.getAuthenticationID();
+ String authzid = ac.getAuthorizationID();
+ if (authid.equals(authzid)) {
+ ac.setAuthorized(true);
+ } else {
+ ac.setAuthorized(false);
+ }
+ if (ac.isAuthorized()) {
+ ac.setAuthorizedID(authzid);
+ }
+ log.info("Successfully authenticated. authenticationID: {}; authorizationID: {}.",
+ authid, authzid);
+ }
+ }
+
+
+ public boolean isComplete() {
+ return saslClient.isComplete();
+ }
+
+}
diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java
new file mode 100644
index 0000000000000..6f38bd79478b0
--- /dev/null
+++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth;
+
+import java.util.Arrays;
+
+import javax.naming.AuthenticationException;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.common.api.AuthData;
+
+@Slf4j
+public class SaslAuthenticationDataProvider implements AuthenticationDataProvider {
+ private static final long serialVersionUID = 1L;
+
+ private PulsarSaslClient pulsarSaslClient;
+
+ public SaslAuthenticationDataProvider(PulsarSaslClient pulsarSaslClient) {
+ this.pulsarSaslClient = pulsarSaslClient;
+ }
+
+ @Override
+ public boolean hasDataFromCommand() {
+ return true;
+ }
+
+ // create token that evaluated by client, and will send to server.
+ @Override
+ public AuthData authenticate(AuthData commandData) throws AuthenticationException {
+ // init
+ if (Arrays.equals(commandData.getBytes(), AuthData.INIT_AUTH_DATA)) {
+ if (pulsarSaslClient.hasInitialResponse()) {
+ return pulsarSaslClient.evaluateChallenge(AuthData.of(new byte[0]));
+ }
+ return AuthData.of(new byte[0]);
+ }
+
+ return pulsarSaslClient.evaluateChallenge(commandData);
+ }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/JAASCredentialsContainer.java b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/JAASCredentialsContainer.java
new file mode 100644
index 0000000000000..345bae8bf1529
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/JAASCredentialsContainer.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.sasl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * JAAS Credentials Container.
+ * This is added for support Kerberos authentication.
+ */
+@Slf4j
+@Getter
+public class JAASCredentialsContainer implements Closeable {
+ private Subject subject;
+ private String principal;
+ private boolean isKrbTicket;
+ private boolean isUsingTicketCache;
+ private TGTRefreshThread ticketRefreshThread;
+
+ public CallbackHandler callbackHandler;
+ private String loginContextName;
+ private LoginContext loginContext;
+ private Map configuration;
+
+ public JAASCredentialsContainer(String loginContextName,
+ CallbackHandler callbackHandler,
+ Map configuration)
+ throws LoginException {
+ this.configuration = configuration;
+ this.callbackHandler = callbackHandler;
+ this.loginContextName = loginContextName;
+ AppConfigurationEntry[] entries = Configuration.getConfiguration()
+ .getAppConfigurationEntry(loginContextName);
+ if (entries == null) {
+ final String errorMessage = "loginContext name (JAAS file section header) was null. " +
+ "Please check your java.security.login.auth.config (=" +
+ System.getProperty("java.security.login.auth.config") +
+ ") for section header: " + this.loginContextName;
+ log.error("No JAAS Configuration section header found for Client: {}", errorMessage);
+ throw new LoginException(errorMessage);
+ }
+ LoginContext loginContext = new LoginContext(loginContextName,callbackHandler);
+ loginContext.login();
+ log.info("successfully logged in.");
+
+ this.loginContext = loginContext;
+ this.subject = loginContext.getSubject();
+ this.isKrbTicket = !this.subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
+ if (isKrbTicket) {
+ this.isUsingTicketCache = SaslConstants.isUsingTicketCache(loginContextName);
+ this.principal = SaslConstants.getPrincipal(loginContextName);
+ this.ticketRefreshThread = new TGTRefreshThread(this);
+ } else {
+ throw new LoginException("Kerberos authentication without KerberosTicket provided!");
+ }
+
+ ticketRefreshThread.start();
+ }
+
+ void setLoginContext(LoginContext login) {
+ this.loginContext = login;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (ticketRefreshThread != null) {
+ ticketRefreshThread.interrupt();
+ try {
+ ticketRefreshThread.join(10000);
+ } catch (InterruptedException exit) {
+ Thread.currentThread().interrupt();
+ if (log.isDebugEnabled()) {
+ log.debug("interrupted while waiting for TGT refresh thread to stop", exit);
+ }
+ }
+ }
+ }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/KerberosName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/KerberosName.java
new file mode 100644
index 0000000000000..5f36fdea4100f
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/KerberosName.java
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.sasl;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * This class implements parsing and handling of Kerberos principal names. In
+ * particular, it splits them apart and translates them down into local
+ * operating system names.
+ *
+ * Copied from Apache ZooKeeper KerberosName.
+ */
+public class KerberosName {
+ /** The first component of the name */
+ private final String serviceName;
+ /** The second component of the name. It may be null. */
+ private final String hostName;
+ /** The realm of the name. */
+ private final String realm;
+
+ /**
+ * A pattern that matches a Kerberos name with at most 2 components.
+ */
+ private static final Pattern nameParser =
+ Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
+
+ /**
+ * A pattern that matches a string with out '$' and then a single
+ * parameter with $n.
+ */
+ private static Pattern parameterPattern =
+ Pattern.compile("([^$]*)(\\$(\\d*))?");
+
+ /**
+ * A pattern for parsing a auth_to_local rule.
+ */
+ private static final Pattern ruleParser =
+ Pattern.compile("\\s*((DEFAULT)|(RULE:\\[(\\d*):([^\\]]*)](\\(([^)]*)\\))?"+
+ "(s/([^/]*)/([^/]*)/(g)?)?))");
+
+ /**
+ * A pattern that recognizes simple/non-simple names.
+ */
+ private static final Pattern nonSimplePattern = Pattern.compile("[/@]");
+
+ /**
+ * The list of translation rules.
+ */
+ private static List rules;
+
+ private static String defaultRealm;
+
+ public static String getDefaultRealm2()
+ throws ClassNotFoundException, NoSuchMethodException,
+ IllegalArgumentException, IllegalAccessException,
+ InvocationTargetException {
+ Object kerbConf;
+ Class> classRef;
+ Method getInstanceMethod;
+ Method getDefaultRealmMethod;
+ if (System.getProperty("java.vendor").contains("IBM")) {
+ classRef = Class.forName("com.ibm.security.krb5.internal.Config");
+ } else {
+ classRef = Class.forName("sun.security.krb5.Config");
+ }
+ getInstanceMethod = classRef.getMethod("getInstance", new Class>[0]);
+ kerbConf = getInstanceMethod.invoke(classRef, new Object[0]);
+ getDefaultRealmMethod = classRef.getDeclaredMethod("getDefaultRealm",
+ new Class>[0]);
+ return (String)getDefaultRealmMethod.invoke(kerbConf, new Object[0]);
+ }
+
+ static {
+ try {
+ defaultRealm = getDefaultRealm2();
+ } catch (Exception ke) {
+ if ((System.getProperty("zookeeper.requireKerberosConfig") != null) &&
+ (System.getProperty("zookeeper.requireKerberosConfig").equals("true"))) {
+ throw new IllegalArgumentException("Can't get Kerberos configuration",ke);
+ }
+ else
+ defaultRealm="";
+ }
+ try {
+ // setConfiguration() will work even if the above try() fails due
+ // to a missing Kerberos configuration (unless zookeeper.requireKerberosConfig
+ // is set to true, which would not allow execution to reach here due to the
+ // throwing of an IllegalArgumentException above).
+ setConfiguration();
+ }
+ catch (IOException e) {
+ throw new IllegalArgumentException("Could not configure Kerberos principal name mapping.");
+ }
+ }
+
+ /**
+ * Create a name from the full Kerberos principal name.
+ * @param name
+ */
+ public KerberosName(String name) {
+ Matcher match = nameParser.matcher(name);
+ if (!match.matches()) {
+ if (name.contains("@")) {
+ throw new IllegalArgumentException("Malformed Kerberos name: " + name);
+ } else {
+ serviceName = name;
+ hostName = null;
+ realm = null;
+ }
+ } else {
+ serviceName = match.group(1);
+ hostName = match.group(3);
+ realm = match.group(4);
+ }
+ }
+
+ /**
+ * Get the configured default realm.
+ * @return the default realm from the krb5.conf
+ */
+ public String getDefaultRealm() {
+ return defaultRealm;
+ }
+
+ /**
+ * Put the name back together from the parts.
+ */
+ @Override
+ public String toString() {
+ StringBuilder result = new StringBuilder();
+ result.append(serviceName);
+ if (hostName != null) {
+ result.append('/');
+ result.append(hostName);
+ }
+ if (realm != null) {
+ result.append('@');
+ result.append(realm);
+ }
+ return result.toString();
+ }
+
+ /**
+ * Get the first component of the name.
+ * @return the first section of the Kerberos principal name
+ */
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ /**
+ * Get the second component of the name.
+ * @return the second section of the Kerberos principal name, and may be null
+ */
+ public String getHostName() {
+ return hostName;
+ }
+
+ /**
+ * Get the realm of the name.
+ * @return the realm of the name, may be null
+ */
+ public String getRealm() {
+ return realm;
+ }
+
+ /**
+ * An encoding of a rule for translating kerberos names.
+ */
+ private static class Rule {
+ private final boolean isDefault;
+ private final int numOfComponents;
+ private final String format;
+ private final Pattern match;
+ private final Pattern fromPattern;
+ private final String toPattern;
+ private final boolean repeat;
+
+ Rule() {
+ isDefault = true;
+ numOfComponents = 0;
+ format = null;
+ match = null;
+ fromPattern = null;
+ toPattern = null;
+ repeat = false;
+ }
+
+ Rule(int numOfComponents, String format, String match, String fromPattern,
+ String toPattern, boolean repeat) {
+ isDefault = false;
+ this.numOfComponents = numOfComponents;
+ this.format = format;
+ this.match = match == null ? null : Pattern.compile(match);
+ this.fromPattern =
+ fromPattern == null ? null : Pattern.compile(fromPattern);
+ this.toPattern = toPattern;
+ this.repeat = repeat;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ if (isDefault) {
+ buf.append("DEFAULT");
+ } else {
+ buf.append("RULE:[");
+ buf.append(numOfComponents);
+ buf.append(':');
+ buf.append(format);
+ buf.append(']');
+ if (match != null) {
+ buf.append('(');
+ buf.append(match);
+ buf.append(')');
+ }
+ if (fromPattern != null) {
+ buf.append("s/");
+ buf.append(fromPattern);
+ buf.append('/');
+ buf.append(toPattern);
+ buf.append('/');
+ if (repeat) {
+ buf.append('g');
+ }
+ }
+ }
+ return buf.toString();
+ }
+
+ /**
+ * Replace the numbered parameters of the form $n where n is from 1 to
+ * the length of params. Normal text is copied directly and $n is replaced
+ * by the corresponding parameter.
+ * @param format the string to replace parameters again
+ * @param params the list of parameters
+ * @return the generated string with the parameter references replaced.
+ * @throws BadFormatString
+ */
+ static String replaceParameters(String format,
+ String[] params) throws BadFormatString {
+ Matcher match = parameterPattern.matcher(format);
+ int start = 0;
+ StringBuilder result = new StringBuilder();
+ while (start < format.length() && match.find(start)) {
+ result.append(match.group(1));
+ String paramNum = match.group(3);
+ if (paramNum != null) {
+ try {
+ int num = Integer.parseInt(paramNum);
+ if (num < 0 || num > params.length) {
+ throw new BadFormatString("index " + num + " from " + format +
+ " is outside of the valid range 0 to " +
+ (params.length - 1));
+ }
+ result.append(params[num]);
+ } catch (NumberFormatException nfe) {
+ throw new BadFormatString("bad format in username mapping in " +
+ paramNum, nfe);
+ }
+
+ }
+ start = match.end();
+ }
+ return result.toString();
+ }
+
+ /**
+ * Replace the matches of the from pattern in the base string with the value
+ * of the to string.
+ * @param base the string to transform
+ * @param from the pattern to look for in the base string
+ * @param to the string to replace matches of the pattern with
+ * @param repeat whether the substitution should be repeated
+ * @return
+ */
+ static String replaceSubstitution(String base, Pattern from, String to,
+ boolean repeat) {
+ Matcher match = from.matcher(base);
+ if (repeat) {
+ return match.replaceAll(to);
+ } else {
+ return match.replaceFirst(to);
+ }
+ }
+
+ /**
+ * Try to apply this rule to the given name represented as a parameter
+ * array.
+ * @param params first element is the realm, second and later elements are
+ * are the components of the name "a/b@FOO" -> {"FOO", "a", "b"}
+ * @return the short name if this rule applies or null
+ * @throws IOException throws if something is wrong with the rules
+ */
+ String apply(String[] params) throws IOException {
+ String result = null;
+ if (isDefault) {
+ if (defaultRealm.equals(params[0])) {
+ result = params[1];
+ }
+ } else if (params.length - 1 == numOfComponents) {
+ String base = replaceParameters(format, params);
+ if (match == null || match.matcher(base).matches()) {
+ if (fromPattern == null) {
+ result = base;
+ } else {
+ result = replaceSubstitution(base, fromPattern, toPattern, repeat);
+ }
+ }
+ }
+ if (result != null && nonSimplePattern.matcher(result).find()) {
+ throw new NoMatchingRule("Non-simple name " + result +
+ " after auth_to_local rule " + this);
+ }
+ return result;
+ }
+ }
+
+ static List parseRules(String rules) {
+ List result = new ArrayList();
+ String remaining = rules.trim();
+ while (remaining.length() > 0) {
+ Matcher matcher = ruleParser.matcher(remaining);
+ if (!matcher.lookingAt()) {
+ throw new IllegalArgumentException("Invalid rule: " + remaining);
+ }
+ if (matcher.group(2) != null) {
+ result.add(new Rule());
+ } else {
+ result.add(new Rule(Integer.parseInt(matcher.group(4)),
+ matcher.group(5),
+ matcher.group(7),
+ matcher.group(9),
+ matcher.group(10),
+ "g".equals(matcher.group(11))));
+ }
+ remaining = remaining.substring(matcher.end());
+ }
+ return result;
+ }
+
+ /**
+ * Set the static configuration to get the rules.
+ * @param conf the new configuration
+ * @throws IOException
+ */
+ public static void setConfiguration() throws IOException {
+ String ruleString = System.getProperty("zookeeper.security.auth_to_local", "DEFAULT");
+ rules = parseRules(ruleString);
+ }
+
+ @SuppressWarnings("serial")
+ public static class BadFormatString extends IOException {
+ BadFormatString(String msg) {
+ super(msg);
+ }
+ BadFormatString(String msg, Throwable err) {
+ super(msg, err);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static class NoMatchingRule extends IOException {
+ NoMatchingRule(String msg) {
+ super(msg);
+ }
+ }
+
+ /**
+ * Get the translation of the principal name into an operating system
+ * user name.
+ * @return the short name
+ * @throws IOException
+ */
+ public String getShortName() throws IOException {
+ String[] params;
+ if (hostName == null) {
+ // if it is already simple, just return it
+ if (realm == null) {
+ return serviceName;
+ }
+ params = new String[]{realm, serviceName};
+ } else {
+ params = new String[]{realm, serviceName, hostName};
+ }
+ for(Rule r: rules) {
+ String result = r.apply(params);
+ if (result != null) {
+ return result;
+ }
+ }
+ throw new NoMatchingRule("No rules applied to " + toString());
+ }
+
+ static void printRules() throws IOException {
+ int i = 0;
+ for(Rule r: rules) {
+ System.out.println(++i + " " + r);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ for(String arg: args) {
+ KerberosName name = new KerberosName(arg);
+ System.out.println("Name: " + name + " to " + name.getShortName());
+ }
+ }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/SaslConstants.java b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/SaslConstants.java
new file mode 100644
index 0000000000000..749d411662cc8
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/SaslConstants.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.sasl;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+
+/**
+ * SASL Constants.
+ */
+public class SaslConstants {
+
+ public static final String AUTH_METHOD_NAME = "sasl";
+
+ // service broker Principal
+ public static final String JAAS_BROKER_SECTION_NAME = "saslJaasBrokerSectionName";
+ public static final String JAAS_DEFAULT_BROKER_SECTION_NAME = "PulsarBroker";
+
+ //TODO: for sasl proxy.
+ // github issue #3655 {@link: https://github.com/apache/pulsar/issues/3655}
+ public static final String JAAS_PROXY_SECTION_NAME = "saslJaasProxySectionName";
+ public static final String JAAS_DEFAULT_PROXY_SECTION_NAME = "PulsarProxy";
+
+ // Client principal
+ public static final String JAAS_CLIENT_SECTION_NAME = "saslJaasClientSectionName";
+ public static final String JAAS_DEFAULT_CLIENT_SECTION_NAME = "PulsarClient";
+
+ /**
+ * This is a regexp which limits the range of possible ids which can connect to the Broker using SASL.
+ * By default only clients whose id contains 'pulsar' are allowed to connect.
+ */
+ public static final String JAAS_CLIENT_ALLOWED_IDS = "saslJaasClientAllowedIds";
+ public static final String JAAS_CLIENT_ALLOWED_IDS_DEFAULT = ".*pulsar.*";
+
+ public static final String KINIT_COMMAND_DEFAULT = "/usr/bin/kinit";
+
+ public static final String KINIT_COMMAND = "kerberos.kinit";
+
+ // The non-null string name of the protocol for which the authentication is being performed (e.g., "ldap").
+ public static final String SASL_PULSAR_PROTOCOL = "broker";
+ // The non-null fully-qualified host name of the server to authenticate to.
+ public static final String SASL_PULSAR_REALM = "EXAMPLE.COM";
+
+ // Stand for the start of mutual auth between Client and Broker
+ public static final String INIT_PROVIDER_DATA = "isInit";
+
+ public static boolean isUsingTicketCache(String configurationEntry) {
+ AppConfigurationEntry[] entries = Configuration.getConfiguration()
+ .getAppConfigurationEntry(configurationEntry);
+ if (entries == null) {
+ return false;
+ }
+ for (AppConfigurationEntry entry : entries) {
+ // there will only be a single entry, so this for() loop will only be iterated through once.
+ if (entry.getOptions().get("useTicketCache") != null) {
+ String val = (String) entry.getOptions().get("useTicketCache");
+ if (val.equals("true")) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ public static String getPrincipal(String configurationEntry) {
+
+ AppConfigurationEntry[] entries = Configuration.getConfiguration()
+ .getAppConfigurationEntry(configurationEntry);
+ if (entries == null) {
+ return null;
+ }
+ for (AppConfigurationEntry entry : entries) {
+ if (entry.getOptions().get("principal") != null) {
+ return (String) entry.getOptions().get("principal");
+ }
+ }
+ return null;
+ }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java
new file mode 100644
index 0000000000000..424b02bbeda26
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.sasl;
+
+import java.util.Date;
+import java.util.Random;
+import java.util.Set;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * TGT Refresh Thread. Copied from Apache ZooKeeper TGT refresh logic.
+ */
+@Slf4j
+public class TGTRefreshThread extends Thread {
+
+ private static final Random rng = new Random();
+
+ private long lastLogin;
+ private final JAASCredentialsContainer container;
+
+ public long getLastLogin() {
+ return lastLogin;
+ }
+
+ public void setLastLogin(long lastLogin) {
+ this.lastLogin = lastLogin;
+ }
+
+ public TGTRefreshThread(JAASCredentialsContainer container) {
+ this.container = container;
+ // Initialize 'lastLogin' to do a login at first time
+ this.lastLogin = System.currentTimeMillis() - MIN_TIME_BEFORE_RELOGIN;
+ setDaemon(true);
+ setName("pulsar-tgt-refresh-thread");
+ } // Initialize 'lastLogin' to do a login at first time
+
+ private synchronized KerberosTicket getTGT() {
+ Set tickets = container.getSubject().getPrivateCredentials(KerberosTicket.class);
+ for (KerberosTicket ticket : tickets) {
+ KerberosPrincipal server = ticket.getServer();
+ if (server.getName().equals("krbtgt/" + server.getRealm() + "@" + server.getRealm())) {
+ log.info("Client principal is \"" + ticket.getClient().getName() + "\".");
+ log.info("Server principal is \"" + ticket.getServer().getName() + "\".");
+ return ticket;
+ }
+ }
+ return null;
+ }
+ // LoginThread will sleep until 80% of time from last refresh to
+ // ticket's expiry has been reached, at which time it will wake
+ // and try to renew the ticket.
+ private static final float TICKET_RENEW_WINDOW = 0.80f;
+ /**
+ * Percentage of random jitter added to the renewal time.
+ */
+ private static final float TICKET_RENEW_JITTER = 0.05f;
+ // Regardless of TICKET_RENEW_WINDOW setting above and the ticket expiry time,
+ // thread will not sleep between refresh attempts any less than 1 minute (60*1000 milliseconds = 1 minute).
+ // Change the '1' to e.g. 5, to change this to 5 minutes.
+ private static final long MIN_TIME_BEFORE_RELOGIN = 1 * 60 * 1000L;
+
+ private long getRefreshTime(KerberosTicket tgt) {
+ long start = tgt.getStartTime().getTime();
+ long expires = tgt.getEndTime().getTime();
+ log.info("TGT valid starting at: {}", tgt.getStartTime().toString());
+ log.info("TGT expires: {}", tgt.getEndTime().toString());
+ long proposedRefresh = start
+ + (long) ((expires - start) * (TICKET_RENEW_WINDOW + (TICKET_RENEW_JITTER * rng.nextDouble())));
+ if (proposedRefresh > expires) {
+ // proposedRefresh is too far in the future: it's after ticket expires: simply return now.
+ return System.currentTimeMillis();
+ } else {
+ return proposedRefresh;
+ }
+ }
+
+ @Override
+ public void run() {
+ log.info("TGT refresh thread started.");
+ while (true) {
+ // renewal thread's main loop. if it exits from here, thread will exit.
+ KerberosTicket tgt = getTGT();
+ long now = System.currentTimeMillis();
+ long nextRefresh;
+ Date nextRefreshDate;
+ if (tgt == null) {
+ nextRefresh = now + MIN_TIME_BEFORE_RELOGIN;
+ nextRefreshDate = new Date(nextRefresh);
+ log.warn("No TGT found: will try again at {}", nextRefreshDate);
+ } else {
+ nextRefresh = getRefreshTime(tgt);
+ long expiry = tgt.getEndTime().getTime();
+ Date expiryDate = new Date(expiry);
+ if ((container.isUsingTicketCache()) && (tgt.getEndTime().equals(tgt.getRenewTill()))) {
+ Object[] logPayload = {expiryDate, container.getPrincipal(), container.getPrincipal()};
+ log.error("The TGT cannot be renewed beyond the next expiry date: {}."
+ + "This process will not be able to authenticate new SASL connections after that "
+ + "time (for example, it will not be authenticate a new connection with a Broker "
+ + "). Ask your system administrator to either increase the "
+ + "'renew until' time by doing : 'modprinc -maxrenewlife {}' within "
+ + "kadmin, or instead, to generate a keytab for {}. Because the TGT's "
+ + "expiry cannot be further extended by refreshing, exiting refresh thread now.", logPayload);
+ return;
+ }
+ // determine how long to sleep from looking at ticket's expiry.
+ // We should not allow the ticket to expire, but we should take into consideration
+ // MIN_TIME_BEFORE_RELOGIN. Will not sleep less than MIN_TIME_BEFORE_RELOGIN, unless doing so
+ // would cause ticket expiration.
+ if ((nextRefresh > expiry) || ((now + MIN_TIME_BEFORE_RELOGIN) > expiry)) {
+ // expiry is before next scheduled refresh).
+ nextRefresh = now;
+ } else {
+ if (nextRefresh < (now + MIN_TIME_BEFORE_RELOGIN)) {
+ // next scheduled refresh is sooner than (now + MIN_TIME_BEFORE_LOGIN).
+ Date until = new Date(nextRefresh);
+ Date newuntil = new Date(now + MIN_TIME_BEFORE_RELOGIN);
+ Object[] logPayload = {until, newuntil, MIN_TIME_BEFORE_RELOGIN / 1000};
+ log.warn("TGT refresh thread time adjusted from : {} to : {} since "
+ + "the former is sooner than the minimum refresh interval ("
+ + "{} seconds) from now.", logPayload);
+ }
+ nextRefresh = Math.max(nextRefresh, now + MIN_TIME_BEFORE_RELOGIN);
+ }
+ nextRefreshDate = new Date(nextRefresh);
+ if (nextRefresh > expiry) {
+ Object[] logPayload = {nextRefreshDate, expiryDate};
+ log.error("next refresh: {} is later than expiry {}." + " This may indicate a clock skew problem."
+ + "Check that this host and the KDC's " + "hosts' clocks are in sync. Exiting refresh thread.",
+ logPayload);
+ return;
+ }
+ }
+ if (now == nextRefresh) {
+ log.info("refreshing now because expiry is before next scheduled refresh time.");
+ } else if (now < nextRefresh) {
+ Date until = new Date(nextRefresh);
+ log.info("TGT refresh sleeping until: {}", until.toString());
+ try {
+ Thread.sleep(nextRefresh - now);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ log.warn("TGT renewal thread has been interrupted and will exit.");
+ break;
+ }
+ } else {
+ log.error("nextRefresh:{} is in the past: exiting refresh thread. Check"
+ + " clock sync between this host and KDC - (KDC's clock is likely ahead of this host)."
+ + " Manual intervention will be required for this client to successfully authenticate."
+ + " Exiting refresh thread.", nextRefreshDate);
+ break;
+ }
+ if (container.isUsingTicketCache()) {
+ String cmd = container.getConfiguration().getOrDefault(SaslConstants.KINIT_COMMAND,
+ SaslConstants.KINIT_COMMAND_DEFAULT);
+ String kinitArgs = "-R";
+ int retry = 1;
+ while (retry >= 0) {
+ try {
+ log.info("running ticket cache refresh command: {} {}", cmd, kinitArgs);
+
+ ProcessBuilder processBuilder = new ProcessBuilder();
+ processBuilder.command("bash", "-c", cmd, kinitArgs);
+ break;
+ } catch (Exception e) {
+ if (retry > 0) {
+ --retry;
+ // sleep for 10 seconds
+ try {
+ Thread.sleep(10 * 1000);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ log.error("Interrupted while renewing TGT, exiting Login thread");
+ return;
+ }
+ } else {
+ Object[] logPayload = {cmd, kinitArgs, e.toString(), e};
+ log.warn("Could not renew TGT due to problem running shell command: '{}"
+ + " {}'; exception was:{}. Exiting refresh thread.", logPayload);
+ return;
+ }
+ }
+ }
+ }
+ try {
+ int retry = 1;
+ while (retry >= 0) {
+ try {
+ reLogin();
+ break;
+ } catch (LoginException le) {
+ if (retry > 0) {
+ --retry;
+ // sleep for 10 seconds.
+ try {
+ Thread.sleep(10 * 1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error("Interrupted during login retry after LoginException:", le);
+ throw le;
+ }
+ } else {
+ log.error("Could not refresh TGT for principal: {}.", container.getPrincipal(), le);
+ }
+ }
+ }
+ } catch (LoginException le) {
+ log.error("Failed to refresh TGT: refresh thread exiting now.", le);
+ break;
+ }
+ }
+ }
+
+ /**
+ * Re-login a principal. This method assumes that {@link #login(String)} has happened already.
+ * c.f. HADOOP-6559
+ * @throws LoginException on a failure
+ */
+ private synchronized void reLogin() throws LoginException {
+ LoginContext login = container.getLoginContext();
+ if (login == null) {
+ throw new LoginException("login must be done first");
+ }
+ if (!hasSufficientTimeElapsed()) {
+ return;
+ }
+ log.info("Initiating logout for {}", container.getPrincipal());
+ synchronized (this) {
+ //clear up the kerberos state. But the tokens are not cleared! As per
+ //the Java kerberos login module code, only the kerberos credentials
+ //are cleared
+ login.logout();
+ //login and also update the subject field of this instance to
+ //have the new credentials (pass it to the LoginContext constructor)
+ login = new LoginContext(container.getLoginContextName(), container.getSubject());
+ log.info("Initiating re-login for {}", container.getPrincipal());
+ login.login();
+ container.setLoginContext(login);
+ }
+ }
+
+ private boolean hasSufficientTimeElapsed() {
+ long now = System.currentTimeMillis();
+ if (now - getLastLogin() < MIN_TIME_BEFORE_RELOGIN) {
+ log.warn("Not attempting to re-login since the last re-login was "
+ + "attempted less than {} seconds before.", MIN_TIME_BEFORE_RELOGIN / 1000);
+ return false;
+ }
+ // register most recent relogin attempt
+ setLastLogin(now);
+ return true;
+ }
+
+}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index b984f81687cd5..edc0c1e55550c 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -245,7 +245,10 @@ public ScheduledExecutorService getExecutor() {
public boolean isAuthenticationEnabled() {
if (this.config == null)
return false;
- return this.config.isAuthenticationEnabled();
+ // TODO: isSaslAuthentication used to bypass web resource check.
+ // will remove it after implementation the support.
+ // github issue #3653 {@link: https://github.com/apache/pulsar/issues/3653}
+ return this.config.isAuthenticationEnabled() && !this.config.isSaslAuthentication();
}
public boolean isAuthorizationEnabled() {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java
index 10e3664edf65d..e6eb15849a130 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java
@@ -96,7 +96,7 @@ public AuthenticationDataHttps authData() {
* if not authorized
*/
protected void validateSuperUserAccess() {
- if (service().getConfig().isAuthenticationEnabled()) {
+ if (service().getConfig().isAuthenticationEnabled() && !service().getConfig().isSaslAuthentication()) {
String appId = clientAppId();
if (log.isDebugEnabled()) {
log.debug("[{}] Check super user access: Authenticated: {} -- Role: {}", uri.getRequestUri(),