Skip to content

Commit

Permalink
Support returning user subject with variables for AuthenticationProvi…
Browse files Browse the repository at this point in the history
…derMTls (#1470)
  • Loading branch information
coderzc authored and Technoboy- committed Sep 26, 2024
1 parent 34f0461 commit 168aeec
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import static io.streamnative.pulsar.handlers.mqtt.Constants.AUTH_TOKEN;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.streamnative.pulsar.handlers.mqtt.identitypool.AuthenticationProviderMTls;
import io.streamnative.pulsar.handlers.mqtt.authentication.AuthenticationProviderMTls;
import io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils;
import java.util.HashMap;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.authentication;

import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public final class AuthRequest {
private String subject;
private Map<String, String> variables;
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.identitypool;

import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.DN;
import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.DN_KEYS;
import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.SAN;
import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.SHA1;
import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.SNID;
package io.streamnative.pulsar.handlers.mqtt.authentication;

import static io.streamnative.pulsar.handlers.mqtt.authentication.ExpressionCompiler.DN;
import static io.streamnative.pulsar.handlers.mqtt.authentication.ExpressionCompiler.SAN;
import static io.streamnative.pulsar.handlers.mqtt.authentication.ExpressionCompiler.SHA1;
import static io.streamnative.pulsar.handlers.mqtt.authentication.ExpressionCompiler.SNID;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import io.streamnative.oidc.broker.common.OIDCPoolResources;
Expand All @@ -43,10 +44,11 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.text.StrBuilder;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
Expand All @@ -64,6 +66,8 @@ public class AuthenticationProviderMTls implements AuthenticationProvider {
@VisibleForTesting
private OIDCPoolResources poolResources;

private final ObjectMapper objectMapper = ObjectMapperFactory.create();

@Getter
@VisibleForTesting
private final ConcurrentHashMap<String, ExpressionCompiler> poolMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -197,7 +201,7 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
final X509Certificate certificate = (X509Certificate) certs[0];

// parse DN
Map<String, Object> params;
Map<String, String> params;
try {
String subject = certificate.getSubjectX500Principal().getName();
params = parseDN(subject);
Expand All @@ -213,20 +217,24 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
// parse SHA1
params.put(SHA1, parseSHA1FingerPrint(certificate));

String principal = matchPool(params);
if (principal.isEmpty()) {
String poolName = matchPool(params);
if (poolName.isEmpty()) {
errorCode = ErrorCode.NO_MATCH_POOL;
throw new AuthenticationException("No matched identity pool from the client certificate");
}
AuthenticationMetrics.authenticateSuccess(this.getClass().getSimpleName(), this.getAuthMethodName());
return principal;
AuthRequest authRequest = new AuthRequest(poolName, params);
String authRequestJson = objectMapper.writeValueAsString(authRequest);
return authRequestJson;
} catch (AuthenticationException e) {
this.incrementFailureMetric(errorCode);
throw e;
} catch (JsonProcessingException e) {
log.error("Failed to serialize the auth request", e);
throw new AuthenticationException(e.getMessage());
}
}

public String matchPool(Map<String, Object> params) throws AuthenticationException {
public String matchPool(Map<String, String> params) throws AuthenticationException {
List<String> principals = new ArrayList<>();
poolMap.forEach((poolName, compiler) -> {
Boolean matched = false;
Expand Down Expand Up @@ -269,32 +277,38 @@ static String parseSHA1FingerPrint(X509Certificate certificate) {
}
}

static Map<String, Object> parseDN(String dn) throws InvalidNameException {
Map<String, Object> params = new HashMap<>();
static Map<String, String> parseDN(String dn) throws InvalidNameException {
Map<String, String> params = new HashMap<>();
if (StringUtils.isEmpty(dn)) {
return params;
}
params.put(DN, dn);
LdapName ldapName = new LdapName(dn);
for (Rdn rdn : ldapName.getRdns()) {
String rdnType = rdn.getType().toUpperCase();
if (DN_KEYS.contains(rdnType)) {
String value = Rdn.escapeValue(rdn.getValue());
value = value.replace("\r", "\\0D");
value = value.replace("\n", "\\0A");
params.put(rdnType, value);
}
String value = Rdn.escapeValue(rdn.getValue());
value = value.replace("\r", "\\0D");
value = value.replace("\n", "\\0A");
params.put(rdnType, value);
}

return params;
}

static void parseSAN(X509Certificate certificate, @NotNull Map<String, Object> map) {
static void parseSAN(X509Certificate certificate, @NotNull Map<String, String> map) {
try {
// byte[] extensionValue = certificate.getExtensionValue("2.5.29.17");
// TODO How to get the original extension name
Collection<List<?>> subjectAlternativeNames = certificate.getSubjectAlternativeNames();
if (subjectAlternativeNames != null) {
List<String> formattedSANList = subjectAlternativeNames.stream()
.map(list -> getSanName((int) list.get(0)) + ":" + list.get(1))
.map(list -> {
String sanName = getSanName((int) list.get(0));
String sanValue = (String) list.get(1);
map.put(sanName, sanValue);
sanName = mapSANNames(sanName, sanValue, map);
return sanName + ":" + sanValue;
})
.collect(Collectors.toList());
String formattedSAN = String.join(",", formattedSANList);
map.put(SAN, formattedSAN);
Expand All @@ -304,10 +318,27 @@ static void parseSAN(X509Certificate certificate, @NotNull Map<String, Object> m
}
}

static String mapSANNames(String sanName, String sanValue, @NotNull Map<String, String> map) {
String newSanName = sanName;
// "RFC822NAME:aaa" -> "EMAIL:aaa,DEVICE_ID:aaa,RFC822NAME:aaa"
if (sanName.equals("DNS")) {
StrBuilder strBuilder = new StrBuilder();
strBuilder.append("EMAIL:").append(sanValue).append(",");
map.put("EMAIL", sanValue);

// strBuilder.append("DEVICE_ID:").append(sanValue).append(",");
// map.put("DEVICE_ID", sanValue);

strBuilder.append(sanName);
newSanName = strBuilder.toString();
}
return newSanName;
}

private static String getSanName(int type) {
return switch (type) {
case 0 -> "OTHERNAME";
case 1 -> "EMAIL";
case 1 -> "RFC822NAME";
case 2 -> "DNS";
case 3 -> "X400";
case 4 -> "DIR";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.identitypool;
package io.streamnative.pulsar.handlers.mqtt.authentication;


import dev.cel.common.CelAbstractSyntaxTree;
Expand Down Expand Up @@ -73,7 +73,7 @@ private void compile() throws CelValidationException, CelEvaluationException {
this.program = runtime.createProgram(ast);
}

public Boolean eval(Map<String, Object> mapValue) throws Exception {
public Boolean eval(Map<String, String> mapValue) throws Exception {
final Object eval = program.eval(mapValue);
if (eval instanceof Boolean) {
return (Boolean) eval;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.identitypool;
package io.streamnative.pulsar.handlers.mqtt.authentication;
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.identitypool;
package io.streamnative.pulsar.handlers.mqtt.authentication;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.local.LocalAddress;
import io.streamnative.oidc.broker.common.pojo.Pool;
import java.io.File;
Expand All @@ -25,9 +26,11 @@
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLSessionContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore;
Expand All @@ -38,13 +41,15 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
public class AuthenticationProviderMTlsTest {

private static final String SUPER_USER = "superUser";
private static final String CLUSTER = "mtls-test";

private ServiceConfiguration serviceConfiguration;
private LocalMemoryMetadataStore metadataStore;
private final ObjectMapper objectMapper = ObjectMapperFactory.create();

@SuppressWarnings("UnstableApiUsage")
@BeforeClass
Expand Down Expand Up @@ -73,7 +78,8 @@ public Object[][] reuseMetadata() {
public void testExpression() throws Exception {
String dn = FileUtils.readFileToString(new File(getResourcePath("mtls/cel-test.txt")), "UTF-8");

Map<String, Object> params = AuthenticationProviderMTls.parseDN(dn);
Map<String, String> params = AuthenticationProviderMTls.parseDN(dn);
params.put("O2", "StreamNative, Inc.");

ExpressionCompiler compiler = new ExpressionCompiler("DN.contains(\"CN=streamnative.io\")");
Boolean eval = compiler.eval(params);
Expand Down Expand Up @@ -121,7 +127,9 @@ public void testAuthenticationProviderMTls(boolean reuseMetadata) throws Excepti
SSLSession sslSession = new MockSSLSession(x509Certificates);
AuthenticationDataCommand authData = new AuthenticationDataCommand("", LocalAddress.ANY, sslSession);
String principal = authenticationProvider.authenticate(authData);
Assert.assertEquals(principal, poolName);
log.info("Principal: {}", principal);
AuthRequest authRequest = objectMapper.readValue(principal, AuthRequest.class);
Assert.assertEquals(authRequest.getSubject(), poolName);
authenticationProvider.close();
}

Expand Down

0 comments on commit 168aeec

Please sign in to comment.