Skip to content

Commit

Permalink
WIP: PIP 97 Async HTTP Auth Filter
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeljmarshall committed Jan 25, 2023
1 parent 8049690 commit 605419f
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.concurrent.CompletableFuture;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import javax.servlet.AsyncContext;
import javax.servlet.ServletRequest;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand Down Expand Up @@ -135,8 +137,16 @@ default AuthenticationState newHttpAuthState(HttpServletRequest request)
*/
default CompletableFuture<Boolean> authenticateHttpRequestAsync(HttpServletRequest request,
HttpServletResponse response) {
AsyncContext ctx = request.getAsyncContext();
try {
return CompletableFuture.completedFuture(this.authenticateHttpRequest(request, response));
final AuthenticationDataSource authDataSource = newHttpAuthState(request).getAuthDataSource();
return authenticateAsync(authDataSource)
.thenApply(role -> {
ServletRequest r = ctx.getRequest();
r.setAttribute(AuthenticatedRoleAttributeName, role);
r.setAttribute(AuthenticatedDataAttributeName, authDataSource);
return true;
});
} catch (Exception e) {
return FutureUtil.failedFuture(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.servlet.http.HttpServletRequest;
Expand Down Expand Up @@ -102,64 +103,72 @@ private AuthenticationProvider getAuthProvider(String authMethodName) throws Aut
return providerToUse;
}

public boolean authenticateHttpRequest(HttpServletRequest request, HttpServletResponse response)
throws Exception {
/**
*
* @param request
* @param response
* @return
*/
public CompletableFuture<Boolean> authenticateHttpRequestAsync(HttpServletRequest request,
HttpServletResponse response) {
String authMethodName = getAuthMethodName(request);
authMethodName = authMethodName == null ? "BasicAuthentication" : authMethodName;
if (authMethodName == null
&& SaslConstants.SASL_TYPE_VALUE.equalsIgnoreCase(request.getHeader(SaslConstants.SASL_HEADER_TYPE))) {
// This edge case must be handled because the Pulsar SASL implementation does not add the
// X-Pulsar-Auth-Method-Name header.
authMethodName = SaslConstants.AUTH_METHOD_NAME;
}
if (authMethodName != null) {
AuthenticationProvider providerToUse = getAuthProvider(authMethodName);
try {
return providerToUse.authenticateHttpRequest(request, response);
} catch (AuthenticationException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Authentication failed for provider " + providerToUse.getAuthMethodName() + " : "
+ e.getMessage(), e);
}
throw e;
// if (authMethodName != null) {
AuthenticationProvider providerToUse = providers.get(authMethodName);
if (providerToUse == null) {
return CompletableFuture.failedFuture(new AuthenticationException(
String.format("Unsupported authentication method: [%s].", authMethodName)));
}
} else {
for (AuthenticationProvider provider : providers.values()) {
try {
return provider.authenticateHttpRequest(request, response);
} catch (AuthenticationException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Authentication failed for provider " + provider.getAuthMethodName() + ": "
+ e.getMessage(), e);
}
// Ignore the exception because we don't know which authentication method is expected here.
}
}
// No authentication provided
if (!providers.isEmpty()) {
if (StringUtils.isNotBlank(anonymousUserRole)) {
request.setAttribute(AuthenticatedRoleAttributeName, anonymousUserRole);
request.setAttribute(AuthenticatedDataAttributeName, new AuthenticationDataHttps(request));
return true;
}
// If at least a provider was configured, then the authentication needs to be provider
throw new AuthenticationException("Authentication required");
} else {
// No authentication required
return true;
}
}
return providerToUse.authenticateHttpRequestAsync(request, response);
// todo how to handle exceptional case?
// } else {
// for (AuthenticationProvider provider : providers.values()) {
// try {
// return provider.authenticateHttpRequest(request, response);
// } catch (AuthenticationException e) {
// if (LOG.isDebugEnabled()) {
// LOG.debug("Authentication failed for provider " + provider.getAuthMethodName() + ": "
// + e.getMessage(), e);
// }
// // Ignore the exception because we don't know which authentication method is expected here.
// }
// }
// // No authentication provided
// if (!providers.isEmpty()) {
// if (StringUtils.isNotBlank(anonymousUserRole)) {
// request.setAttribute(AuthenticatedRoleAttributeName, anonymousUserRole);
// request.setAttribute(AuthenticatedDataAttributeName, new AuthenticationDataHttps(request));
// return true;
// }
// // If at least a provider was configured, then the authentication needs to be provider
// throw new AuthenticationException("Authentication required");
// } else {
// // No authentication required
// return true;
// }
// }
}

/**
* @deprecated use {@link #authenticateHttpRequest(HttpServletRequest, HttpServletResponse)}
* @deprecated use {@link #authenticateHttpRequestAsync(HttpServletRequest, HttpServletResponse)}
*/
@Deprecated(since = "2.12.0")
public String authenticateHttpRequest(HttpServletRequest request, AuthenticationDataSource authData)
throws AuthenticationException {
String authMethodName = getAuthMethodName(request);

if (authMethodName != null) {
AuthenticationProvider providerToUse = getAuthProvider(authMethodName);
AuthenticationProvider providerToUse = providers.get(authMethodName);
if (providerToUse == null) {
throw new AuthenticationException(
String.format("Unsupported authentication method: [%s].", authMethodName));
}
try {
if (authData == null) {
AuthenticationState authenticationState = providerToUse.newHttpAuthState(request);
Expand Down Expand Up @@ -205,7 +214,7 @@ public String authenticateHttpRequest(HttpServletRequest request, Authentication
/**
* Mark this function as deprecated, it is recommended to use a method with the AuthenticationDataSource
* signature to implement it.
* @deprecated use {@link #authenticateHttpRequest(HttpServletRequest, HttpServletResponse)}.
* @deprecated use {@link #authenticateHttpRequestAsync(HttpServletRequest, HttpServletResponse)}.
*/
@Deprecated
public String authenticateHttpRequest(HttpServletRequest request) throws AuthenticationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import java.io.IOException;
import javax.naming.AuthenticationException;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
Expand Down Expand Up @@ -52,20 +55,69 @@ public AuthenticationFilter(AuthenticationService authenticationService) {
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
try {
boolean doFilter = authenticationService
.authenticateHttpRequest((HttpServletRequest) request, (HttpServletResponse) response);
if (doFilter) {
chain.doFilter(request, response);
if (request.getAttribute(AuthenticatedRoleAttributeName) != null) {
chain.doFilter(request, response);
return;
}
AsyncContext asyncContext = request.startAsync();
asyncContext.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent event) throws IOException {
try {
chain.doFilter(event.getSuppliedRequest(), event.getSuppliedResponse());
} catch (ServletException e) {
throw new RuntimeException(e);
}
}
} catch (Exception e) {
HttpServletResponse httpResponse = (HttpServletResponse) response;
httpResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Authentication required");
if (e instanceof AuthenticationException) {
LOG.warn("[{}] Failed to authenticate HTTP request: {}", request.getRemoteAddr(), e.getMessage());
} else {
LOG.error("[{}] Error performing authentication for HTTP", request.getRemoteAddr(), e);

@Override
public void onTimeout(AsyncEvent event) throws IOException {

}

@Override
public void onError(AsyncEvent event) throws IOException {

}

@Override
public void onStartAsync(AsyncEvent event) throws IOException {

}
});
authenticationService
.authenticateHttpRequestAsync((HttpServletRequest) request, (HttpServletResponse) response)
.whenComplete((doFilter, throwable) -> {
if (throwable != null) {
try {
HttpServletResponse httpResponse = (HttpServletResponse) asyncContext.getResponse();
httpResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Authentication required");
if (throwable instanceof AuthenticationException) {
LOG.warn("[{}] Failed to authenticate HTTP request: {}", request.getRemoteAddr(),
throwable.getMessage());
} else {
LOG.error("[{}] Error performing authentication for HTTP", request.getRemoteAddr(),
throwable);
}
} catch (IOException e) {
LOG.error("Error while responding to HTTP request", e);
} finally {
asyncContext.complete();
}
} else {
asyncContext.getRequest().setAttribute("do_filter", doFilter);
asyncContext.complete();
}
});
}

private void runFilter(FilterChain chain, AsyncContext asyncContext) {
try {
chain.doFilter(asyncContext.getRequest(), asyncContext.getResponse());
} catch (IOException | ServletException e) {
LOG.error("Error in HTTP filtering", e);
} finally {
asyncContext.complete();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void testAuthenticationHttpRequestResponse() throws Exception {
when(request.getRemoteAddr()).thenReturn("192.168.1.1");
when(request.getRemotePort()).thenReturn(8080);
when(request.getHeader("X-Pulsar-Auth-Method-Name")).thenReturn("auth");
boolean doFilter = service.authenticateHttpRequest(request, (HttpServletResponse) null);
boolean doFilter = service.authenticateHttpRequestAsync(request, (HttpServletResponse) null).get();
assertTrue(doFilter, "Authentication should have succeeded");
verify(request).setAttribute(AuthenticatedRoleAttributeName, s_authentication_success);
verify(request).setAttribute(eq(AuthenticatedDataAttributeName), any(AuthenticationDataHttps.class));
Expand All @@ -122,15 +122,15 @@ public void testAuthenticationHttpRequestResponseWithMultipleProviders() throws
when(requestDefaultAuthProvider.getRemoteAddr()).thenReturn("192.168.1.1");
when(requestDefaultAuthProvider.getRemotePort()).thenReturn(8080);
when(requestDefaultAuthProvider.getHeader("X-Pulsar-Auth-Method-Name")).thenReturn("auth");
doFilter = service.authenticateHttpRequest(requestDefaultAuthProvider, (HttpServletResponse) null);
doFilter = service.authenticateHttpRequestAsync(requestDefaultAuthProvider, (HttpServletResponse) null).get();
assertTrue(doFilter, "Authentication should have succeeded");
verify(requestDefaultAuthProvider).setAttribute(AuthenticatedRoleAttributeName, s_authentication_success);

HttpServletRequest requestCustomAuthProvider = mock(HttpServletRequest.class);
when(requestCustomAuthProvider.getRemoteAddr()).thenReturn("192.168.1.1");
when(requestCustomAuthProvider.getRemotePort()).thenReturn(8080);
when(requestCustomAuthProvider.getHeader("X-Pulsar-Auth-Method-Name")).thenReturn("customAuthProvider");
doFilter = service.authenticateHttpRequest(requestCustomAuthProvider, (HttpServletResponse) null);
doFilter = service.authenticateHttpRequestAsync(requestCustomAuthProvider, (HttpServletResponse) null).get();
assertTrue(doFilter, "Authentication should have succeeded");
verify(requestCustomAuthProvider).setAttribute(AuthenticatedRoleAttributeName, s_authentication_success);

Expand All @@ -139,7 +139,7 @@ public void testAuthenticationHttpRequestResponseWithMultipleProviders() throws
when(requestUnsupportedAuthProvider.getRemotePort()).thenReturn(8080);
when(requestUnsupportedAuthProvider.getHeader("X-Pulsar-Auth-Method-Name")).thenReturn("unsupportedAuthProvider");
Assert.assertThrows(() ->
service.authenticateHttpRequest(requestUnsupportedAuthProvider, (HttpServletResponse) null));
service.authenticateHttpRequestAsync(requestUnsupportedAuthProvider, (HttpServletResponse) null));

service.close();
}
Expand All @@ -159,7 +159,7 @@ public void testAuthenticationHttpRequestResponseWithAnonymousRole() throws Exce
HttpServletRequest requestCustomAuthProvider = mock(HttpServletRequest.class);
when(requestCustomAuthProvider.getRemoteAddr()).thenReturn("192.168.1.1");
when(requestCustomAuthProvider.getRemotePort()).thenReturn(8080);
doFilter = service.authenticateHttpRequest(requestCustomAuthProvider, (HttpServletResponse) null);
doFilter = service.authenticateHttpRequestAsync(requestCustomAuthProvider, (HttpServletResponse) null).get();
assertTrue(doFilter, "Authentication should have succeeded");
verify(requestCustomAuthProvider).setAttribute(AuthenticatedRoleAttributeName, anonRole);

Expand Down

0 comments on commit 605419f

Please sign in to comment.