diff --git a/hadoop-unit-confluent-rest/pom.xml b/hadoop-unit-confluent-rest/pom.xml index 93e466ce..7481c0bd 100644 --- a/hadoop-unit-confluent-rest/pom.xml +++ b/hadoop-unit-confluent-rest/pom.xml @@ -41,13 +41,13 @@ org.apache.kafka kafka_2.12 - 1.1.0 + ${confluent_kafka.version} org.apache.kafka kafka-clients - 1.1.0 + ${confluent_kafka.version} @@ -70,7 +70,7 @@ org.eclipse.jetty jetty-util - 9.4.14.v20181114 + 9.4.18.v20190429 diff --git a/hadoop-unit-confluent/pom.xml b/hadoop-unit-confluent/pom.xml index 2048e29e..e8130376 100644 --- a/hadoop-unit-confluent/pom.xml +++ b/hadoop-unit-confluent/pom.xml @@ -77,7 +77,7 @@ org.eclipse.jetty jetty-util - 9.4.14.v20181114 + 9.4.18.v20190429 diff --git a/hadoop-unit-confluent/src/main/java/fr/jetoile/hadoopunit/component/ConfluentKsqlRestBootstrap.java b/hadoop-unit-confluent/src/main/java/fr/jetoile/hadoopunit/component/ConfluentKsqlRestBootstrap.java index 5e4fc692..205d2be4 100644 --- a/hadoop-unit-confluent/src/main/java/fr/jetoile/hadoopunit/component/ConfluentKsqlRestBootstrap.java +++ b/hadoop-unit-confluent/src/main/java/fr/jetoile/hadoopunit/component/ConfluentKsqlRestBootstrap.java @@ -100,8 +100,6 @@ private void build() { KsqlRestConfig config = new KsqlRestConfig(ksqlConfig); restServer = KsqlRestApplication.buildApplication(config, KsqlVersionCheckerAgent::new, Integer.MAX_VALUE); restServer.createServer(); - } catch (RestConfigException e) { - LOGGER.error("Server configuration failed: ", e); } catch (Exception e) { LOGGER.error("Server died unexpectedly: ", e); } diff --git a/hadoop-unit-confluent/src/main/java/io/confluent/rest/Application.java b/hadoop-unit-confluent/src/main/java/io/confluent/rest/Application.java index 667b0b05..318c8c47 100644 --- a/hadoop-unit-confluent/src/main/java/io/confluent/rest/Application.java +++ b/hadoop-unit-confluent/src/main/java/io/confluent/rest/Application.java @@ -19,12 +19,18 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.jaxrs.base.JsonParseExceptionMapper; +import io.confluent.rest.auth.AuthUtil; + +import org.apache.kafka.common.config.ConfigException; import org.eclipse.jetty.jaas.JAASLoginService; +import org.eclipse.jetty.jmx.MBeanContainer; import org.eclipse.jetty.security.ConstraintMapping; import org.eclipse.jetty.security.ConstraintSecurityHandler; import org.eclipse.jetty.security.DefaultIdentityService; -import org.eclipse.jetty.security.SecurityHandler; +import org.eclipse.jetty.security.IdentityService; +import org.eclipse.jetty.security.LoginService; import org.eclipse.jetty.security.authentication.BasicAuthenticator; +import org.eclipse.jetty.security.authentication.LoginAuthenticator; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.NetworkTrafficServerConnector; import org.eclipse.jetty.server.Server; @@ -41,7 +47,6 @@ import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.servlets.CrossOriginFilter; import org.eclipse.jetty.util.resource.ResourceCollection; -import org.eclipse.jetty.util.security.Constraint; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.websocket.jsr356.server.ServerContainer; import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer; @@ -52,6 +57,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.lang.management.ManagementFactory; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -63,12 +70,12 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import javax.servlet.DispatcherType; import javax.servlet.ServletException; import javax.ws.rs.core.Configurable; -import io.confluent.common.config.ConfigException; import io.confluent.common.metrics.JmxReporter; import io.confluent.common.metrics.MetricConfig; import io.confluent.common.metrics.Metrics; @@ -76,20 +83,28 @@ import io.confluent.rest.exceptions.ConstraintViolationExceptionMapper; import io.confluent.rest.exceptions.GenericExceptionMapper; import io.confluent.rest.exceptions.WebApplicationExceptionMapper; +import io.confluent.rest.extension.ResourceExtension; import io.confluent.rest.metrics.MetricsResourceMethodApplicationListener; import io.confluent.rest.validation.JacksonMessageBodyProvider; +import static io.confluent.rest.RestConfig.REST_SERVLET_INITIALIZERS_CLASSES_CONFIG; +import static io.confluent.rest.RestConfig.WEBSOCKET_SERVLET_INITIALIZERS_CLASSES_CONFIG; + /** * A REST application. Extend this class and implement setupResources() to register REST * resources with the JAX-RS server. Use createServer() to get a fully-configured, ready to run * Jetty server. */ +// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling public abstract class Application { + // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling protected T config; protected Server server = null; protected CountDownLatch shutdownLatch = new CountDownLatch(1); protected Metrics metrics; protected final Slf4jRequestLog requestLog; + protected final List resourceExtensions = new ArrayList<>(); + protected SslContextFactory sslContextFactory; private static final Logger log = LoggerFactory.getLogger(Application.class); @@ -107,30 +122,88 @@ public Application(T config) { this.requestLog = new Slf4jRequestLog(); this.requestLog.setLoggerName(config.getString(RestConfig.REQUEST_LOGGER_NAME_CONFIG)); this.requestLog.setLogLatency(true); + this.sslContextFactory = createSslContextFactory(); } + /** + * Register resources or additional Providers, ExceptionMappers, and other JAX-RS components with + * the Jersey application. This, combined with your Configuration class, is where you can + * customize the behavior of the application. + */ public abstract void setupResources(Configurable config, T appConfig); + /** + * Returns a list of static resources to serve using the default servlet. + * + *

For example, static files can be served from class loader resources by returning + * {@code + * new ResourceCollection(Resource.newClassPathResource("static")); + * } + * + *

For those resources to get served, it is necessary to add a static resources property to the + * config in @link{{@link #setupResources(Configurable, RestConfig)}}, e.g. using something like + * {@code + * config.property(ServletProperties.FILTER_STATIC_CONTENT_REGEX, "/(static/.*|.*\\.html|)"); + * } + * + * @return static resource collection + */ protected ResourceCollection getStaticResources() { return null; } + /** + * add any servlet filters that should be called before resource handling + */ + protected void configurePreResourceHandling(ServletContextHandler context) {} + + /** + * expose SslContextFactory + */ + protected SslContextFactory getSslContextFactory() { + return this.sslContextFactory; + } + + /** + * add any servlet filters that should be called after resource + * handling but before falling back to the default servlet + */ protected void configurePostResourceHandling(ServletContextHandler context) {} + /** + * add any servlet filters that should be called after resource + * handling but before falling back to the default servlet + */ + protected void configureWebSocketPostResourceHandling(ServletContextHandler context) {} + + /** + * Returns a map of tag names to tag values to apply to metrics for this application. + * + * @return a Map of tags and values + */ public Map getMetricsTags() { return new LinkedHashMap(); } - public Server createServer() throws RestConfigException, ServletException { + /** + * Configure and create the server. + */ + // CHECKSTYLE_RULES.OFF: MethodLength|CyclomaticComplexity|JavaNCSS|NPathComplexity + public Server createServer() throws ServletException { + // CHECKSTYLE_RULES.ON: MethodLength|CyclomaticComplexity|JavaNCSS|NPathComplexity + // The configuration for the JAX-RS REST service ResourceConfig resourceConfig = new ResourceConfig(); - Map configuredTags = getConfiguration().getMap(RestConfig.METRICS_TAGS_CONFIG); + Map configuredTags = parseListToMap( + getConfiguration().getList(RestConfig.METRICS_TAGS_CONFIG) + ); Map combinedMetricsTags = new HashMap<>(getMetricsTags()); combinedMetricsTags.putAll(configuredTags); configureBaseApplication(resourceConfig, combinedMetricsTags); + configureResourceExtensions(resourceConfig); setupResources(resourceConfig, getConfiguration()); // Configure the servlet container @@ -142,7 +215,7 @@ public Server createServer() throws RestConfigException, ServletException { protected void doStop() throws Exception { super.doStop(); Application.this.metrics.close(); - Application.this.onShutdown(); + Application.this.doShutdown(); Application.this.shutdownLatch.countDown(); } }; @@ -162,81 +235,22 @@ protected void doStop() throws Exception { if (listener.getScheme().equals("http")) { connector = new NetworkTrafficServerConnector(server); } else { - SslContextFactory sslContextFactory = new SslContextFactory(); - // IMPORTANT: the key's CN, stored in the keystore, must match the FQDN. - // TODO: investigate this further. Would be better to use SubjectAltNames. - if (!config.getString(RestConfig.SSL_KEYSTORE_LOCATION_CONFIG).isEmpty()) { - sslContextFactory.setKeyStorePath( - config.getString(RestConfig.SSL_KEYSTORE_LOCATION_CONFIG) - ); - sslContextFactory.setKeyStorePassword( - config.getPassword(RestConfig.SSL_KEYSTORE_PASSWORD_CONFIG).value() - ); - sslContextFactory.setKeyManagerPassword( - config.getPassword(RestConfig.SSL_KEY_PASSWORD_CONFIG).value() - ); - sslContextFactory.setKeyStoreType( - config.getString(RestConfig.SSL_KEYSTORE_TYPE_CONFIG) - ); - - if (!config.getString(RestConfig.SSL_KEYMANAGER_ALGORITHM_CONFIG).isEmpty()) { - sslContextFactory.setKeyManagerFactoryAlgorithm( - config.getString(RestConfig.SSL_KEYMANAGER_ALGORITHM_CONFIG)); - } - } - - sslContextFactory.setNeedClientAuth(config.getBoolean(RestConfig.SSL_CLIENT_AUTH_CONFIG)); - - List enabledProtocols = config.getList(RestConfig.SSL_ENABLED_PROTOCOLS_CONFIG); - if (!enabledProtocols.isEmpty()) { - sslContextFactory.setIncludeProtocols(enabledProtocols.toArray(new String[0])); - } - - List cipherSuites = config.getList(RestConfig.SSL_CIPHER_SUITES_CONFIG); - if (!cipherSuites.isEmpty()) { - sslContextFactory.setIncludeCipherSuites(cipherSuites.toArray(new String[0])); - } - - if (!config.getString(RestConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG).isEmpty()) { - sslContextFactory.setEndpointIdentificationAlgorithm( - config.getString(RestConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)); - } - - if (!config.getString(RestConfig.SSL_TRUSTSTORE_LOCATION_CONFIG).isEmpty()) { - sslContextFactory.setTrustStorePath( - config.getString(RestConfig.SSL_TRUSTSTORE_LOCATION_CONFIG) - ); - sslContextFactory.setTrustStorePassword( - config.getPassword(RestConfig.SSL_TRUSTSTORE_PASSWORD_CONFIG).value() - ); - sslContextFactory.setTrustStoreType( - config.getString(RestConfig.SSL_TRUSTSTORE_TYPE_CONFIG) - ); - - if (!config.getString(RestConfig.SSL_TRUSTMANAGER_ALGORITHM_CONFIG).isEmpty()) { - sslContextFactory.setTrustManagerFactoryAlgorithm( - config.getString(RestConfig.SSL_TRUSTMANAGER_ALGORITHM_CONFIG) - ); - } - } - - sslContextFactory.setProtocol(config.getString(RestConfig.SSL_PROTOCOL_CONFIG)); - if (!config.getString(RestConfig.SSL_PROVIDER_CONFIG).isEmpty()) { - sslContextFactory.setProtocol(config.getString(RestConfig.SSL_PROVIDER_CONFIG)); - } - connector = new NetworkTrafficServerConnector(server, sslContextFactory); } connector.addNetworkTrafficListener(metricsListener); connector.setPort(listener.getPort()); connector.setHost(listener.getHost()); + + connector.setIdleTimeout(config.getLong(RestConfig.IDLE_TIMEOUT_MS_CONFIG)); + server.addConnector(connector); } ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); context.setContextPath("/"); + ServletHolder defaultHolder = new ServletHolder("default", DefaultServlet.class); defaultHolder.setInitParameter("dirAllowed", "false"); @@ -245,37 +259,35 @@ protected void doStop() throws Exception { context.setBaseResource(staticResources); } - String authMethod = config.getString(RestConfig.AUTHENTICATION_METHOD_CONFIG); - if (enableBasicAuth(authMethod)) { - String realm = getConfiguration().getString(RestConfig.AUTHENTICATION_REALM_CONFIG); - List roles = getConfiguration().getList(RestConfig.AUTHENTICATION_ROLES_CONFIG); - final SecurityHandler securityHandler = createSecurityHandler(realm, roles); - context.setSecurityHandler(securityHandler); - } - - List unsecurePaths = config.getList(RestConfig.AUTHENTICATION_SKIP_PATHS); - setUnsecurePathConstraints(context, unsecurePaths); + configureSecurityHandler(context); - String allowedOrigins = getConfiguration().getString( - RestConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG - ); - if (allowedOrigins != null && !allowedOrigins.trim().isEmpty()) { + if (isCorsEnabled()) { + String allowedOrigins = config.getString(RestConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG); FilterHolder filterHolder = new FilterHolder(CrossOriginFilter.class); filterHolder.setName("cross-origin"); - filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_ORIGINS_PARAM, allowedOrigins); - String allowedMethods = getConfiguration().getString( - RestConfig.ACCESS_CONTROL_ALLOW_METHODS + filterHolder.setInitParameter( + CrossOriginFilter.ALLOWED_ORIGINS_PARAM, allowedOrigins + ); - if (allowedMethods != null && !allowedOrigins.trim().isEmpty()) { + String allowedMethods = config.getString(RestConfig.ACCESS_CONTROL_ALLOW_METHODS); + String allowedHeaders = config.getString(RestConfig.ACCESS_CONTROL_ALLOW_HEADERS); + if (allowedMethods != null && !allowedMethods.trim().isEmpty()) { filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_METHODS_PARAM, allowedMethods); } + if (allowedHeaders != null && !allowedHeaders.trim().isEmpty()) { + filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_HEADERS_PARAM, allowedHeaders); + } + // handle preflight cors requests at the filter level, do not forward down the filter chain + filterHolder.setInitParameter(CrossOriginFilter.CHAIN_PREFLIGHT_PARAM, "false"); context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST)); } - + configurePreResourceHandling(context); context.addFilter(servletHolder, "/*", null); configurePostResourceHandling(context); context.addServlet(defaultHolder, "/*"); + applyCustomConfiguration(context, REST_SERVLET_INITIALIZERS_CLASSES_CONFIG); + RequestLogHandler requestLogHandler = new RequestLogHandler(); requestLogHandler.setRequestLog(requestLog); @@ -286,22 +298,30 @@ protected void doStop() throws Exception { StatisticsHandler statsHandler = new StatisticsHandler(); statsHandler.setHandler(handlers); - final ServletContextHandler webSocketServletContext = + final ServletContextHandler webSocketContext = new ServletContextHandler(ServletContextHandler.SESSIONS); - webSocketServletContext.setContextPath( + webSocketContext.setContextPath( config.getString(RestConfig.WEBSOCKET_PATH_PREFIX_CONFIG) ); + + configureSecurityHandler(webSocketContext); + final ContextHandlerCollection contexts = new ContextHandlerCollection(); contexts.setHandlers(new Handler[] { statsHandler, - webSocketServletContext + webSocketContext }); server.setHandler(wrapWithGzipHandler(contexts)); ServerContainer container = - WebSocketServerContainerInitializer.configureContext(webSocketServletContext); + WebSocketServerContainerInitializer.configureContext(webSocketContext); registerWebSocketEndpoints(container); + + configureWebSocketPostResourceHandling(webSocketContext); + + applyCustomConfiguration(webSocketContext, WEBSOCKET_SERVLET_INITIALIZERS_CLASSES_CONFIG); + int gracefulShutdownMs = getConfiguration().getInt(RestConfig.SHUTDOWN_GRACEFUL_MS_CONFIG); if (gracefulShutdownMs > 0) { server.setStopTimeout(gracefulShutdownMs); @@ -311,6 +331,169 @@ protected void doStop() throws Exception { return server; } + // This is copied from the old MAP implementation from cp ConfigDef.Type.MAP + public static Map parseListToMap(List list) { + Map configuredTags = new HashMap<>(); + for (String entry : list) { + String[] keyValue = entry.split("\\s*:\\s*", -1); + if (keyValue.length != 2) { + throw new ConfigException("Map entry should be of form : { + try { + ext.register(resourceConfig, this); + } catch (Exception e) { + throw new RuntimeException("Exception throw by resource extension. ext:" + ext, e); + } + }); + } + + @SuppressWarnings("unchecked") + private void applyCustomConfiguration( + ServletContextHandler context, + String initializerConfigName) { + getConfiguration() + .getConfiguredInstances(initializerConfigName, Consumer.class) + .forEach(initializer -> { + try { + initializer.accept(context); + } catch (final Exception e) { + throw new RuntimeException("Exception from custom initializer. " + + "config:" + initializerConfigName + ", initializer" + initializer, e); + } + }); + } + + protected void configureSecurityHandler(ServletContextHandler context) { + String authMethod = config.getString(RestConfig.AUTHENTICATION_METHOD_CONFIG); + if (enableBasicAuth(authMethod)) { + context.setSecurityHandler(createBasicSecurityHandler()); + } else if (enableBearerAuth(authMethod)) { + context.setSecurityHandler(createBearerSecurityHandler()); + } + } + + private SslContextFactory createSslContextFactory() { + SslContextFactory sslContextFactory = new SslContextFactory.Server(); + if (!config.getString(RestConfig.SSL_KEYSTORE_LOCATION_CONFIG).isEmpty()) { + sslContextFactory.setKeyStorePath( + config.getString(RestConfig.SSL_KEYSTORE_LOCATION_CONFIG) + ); + sslContextFactory.setKeyStorePassword( + config.getPassword(RestConfig.SSL_KEYSTORE_PASSWORD_CONFIG).value() + ); + sslContextFactory.setKeyManagerPassword( + config.getPassword(RestConfig.SSL_KEY_PASSWORD_CONFIG).value() + ); + sslContextFactory.setKeyStoreType( + config.getString(RestConfig.SSL_KEYSTORE_TYPE_CONFIG) + ); + + if (!config.getString(RestConfig.SSL_KEYMANAGER_ALGORITHM_CONFIG).isEmpty()) { + sslContextFactory.setKeyManagerFactoryAlgorithm( + config.getString(RestConfig.SSL_KEYMANAGER_ALGORITHM_CONFIG)); + } + } + + configureClientAuth(sslContextFactory); + + List enabledProtocols = config.getList(RestConfig.SSL_ENABLED_PROTOCOLS_CONFIG); + if (!enabledProtocols.isEmpty()) { + sslContextFactory.setIncludeProtocols(enabledProtocols.toArray(new String[0])); + } + + List cipherSuites = config.getList(RestConfig.SSL_CIPHER_SUITES_CONFIG); + if (!cipherSuites.isEmpty()) { + sslContextFactory.setIncludeCipherSuites(cipherSuites.toArray(new String[0])); + } + + sslContextFactory.setEndpointIdentificationAlgorithm( + config.getString(RestConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)); + + if (!config.getString(RestConfig.SSL_TRUSTSTORE_LOCATION_CONFIG).isEmpty()) { + sslContextFactory.setTrustStorePath( + config.getString(RestConfig.SSL_TRUSTSTORE_LOCATION_CONFIG) + ); + sslContextFactory.setTrustStorePassword( + config.getPassword(RestConfig.SSL_TRUSTSTORE_PASSWORD_CONFIG).value() + ); + sslContextFactory.setTrustStoreType( + config.getString(RestConfig.SSL_TRUSTSTORE_TYPE_CONFIG) + ); + + if (!config.getString(RestConfig.SSL_TRUSTMANAGER_ALGORITHM_CONFIG).isEmpty()) { + sslContextFactory.setTrustManagerFactoryAlgorithm( + config.getString(RestConfig.SSL_TRUSTMANAGER_ALGORITHM_CONFIG) + ); + } + } + + sslContextFactory.setProtocol(config.getString(RestConfig.SSL_PROTOCOL_CONFIG)); + if (!config.getString(RestConfig.SSL_PROVIDER_CONFIG).isEmpty()) { + sslContextFactory.setProtocol(config.getString(RestConfig.SSL_PROVIDER_CONFIG)); + } + + sslContextFactory.setRenegotiationAllowed(false); + + return sslContextFactory; + } + + @SuppressWarnings("deprecation") + private void configureClientAuth(SslContextFactory sslContextFactory) { + String clientAuthentication = config.getString(RestConfig.SSL_CLIENT_AUTHENTICATION_CONFIG); + + if (config.originals().containsKey(RestConfig.SSL_CLIENT_AUTH_CONFIG)) { + if (config.originals().containsKey(RestConfig.SSL_CLIENT_AUTHENTICATION_CONFIG)) { + log.warn( + "The {} configuration is deprecated. Since a value has been supplied for the {} " + + "configuration, that will be used instead", + RestConfig.SSL_CLIENT_AUTH_CONFIG, + RestConfig.SSL_CLIENT_AUTHENTICATION_CONFIG + ); + } else { + log.warn( + "The configuration {} is deprecated and should be replaced with {}", + RestConfig.SSL_CLIENT_AUTH_CONFIG, + RestConfig.SSL_CLIENT_AUTHENTICATION_CONFIG + ); + clientAuthentication = config.getBoolean(RestConfig.SSL_CLIENT_AUTH_CONFIG) + ? RestConfig.SSL_CLIENT_AUTHENTICATION_REQUIRED + : RestConfig.SSL_CLIENT_AUTHENTICATION_NONE; + } + } + + switch (clientAuthentication) { + case RestConfig.SSL_CLIENT_AUTHENTICATION_REQUIRED: + sslContextFactory.setNeedClientAuth(true); + break; + case RestConfig.SSL_CLIENT_AUTHENTICATION_REQUESTED: + sslContextFactory.setWantClientAuth(true); + break; + case RestConfig.SSL_CLIENT_AUTHENTICATION_NONE: + break; + default: + throw new ConfigException( + "Unexpected value for {} configuration: {}", + RestConfig.SSL_CLIENT_AUTHENTICATION_CONFIG, + clientAuthentication + ); + } + } + public Handler wrapWithGzipHandler(Handler handler) { if (config.getBoolean(RestConfig.ENABLE_GZIP_COMPRESSION_CONFIG)) { GzipHandler gzip = new GzipHandler(); @@ -321,52 +504,89 @@ public Handler wrapWithGzipHandler(Handler handler) { return handler; } + /** + * Used to register any websocket endpoints that will live under the path configured via + * {@link io.confluent.rest.RestConfig#WEBSOCKET_PATH_PREFIX_CONFIG} + */ protected void registerWebSocketEndpoints(ServerContainer container) { } - static void setUnsecurePathConstraints( - ServletContextHandler context, - List unsecurePaths - ) { - //we need to set unsecure path only if there is an existing security handler. Otherwise all - // paths are by default unsecure - if (context.getSecurityHandler() != null && !unsecurePaths.isEmpty()) { - for (String path : unsecurePaths) { - Constraint constraint = new Constraint(); - constraint.setAuthenticate(false); - ConstraintMapping constraintMapping = new ConstraintMapping(); - constraintMapping.setConstraint(constraint); - constraintMapping.setMethod("*"); - constraintMapping.setPathSpec(path); - ((ConstraintSecurityHandler) context.getSecurityHandler()) - .addConstraintMapping(constraintMapping); - } + static boolean enableBasicAuth(String authMethod) { + return RestConfig.AUTHENTICATION_METHOD_BASIC.equals(authMethod); + } + + static boolean enableBearerAuth(String authMethod) { + return RestConfig.AUTHENTICATION_METHOD_BEARER.equals(authMethod); + } + + + protected LoginAuthenticator createAuthenticator() { + final String realm = getConfiguration().getString(RestConfig.AUTHENTICATION_REALM_CONFIG); + final String method = getConfiguration().getString(RestConfig.AUTHENTICATION_METHOD_CONFIG); + if (enableBasicAuth(method)) { + return new BasicAuthenticator(); + } else if (enableBearerAuth(method)) { + throw new UnsupportedOperationException( + "Must implement Application.createAuthenticator() when using '" + + RestConfig.AUTHENTICATION_METHOD_CONFIG + "=" + + RestConfig.AUTHENTICATION_METHOD_BEARER + "'." + ); } + return null; } + protected LoginService createLoginService() { + final String realm = getConfiguration().getString(RestConfig.AUTHENTICATION_REALM_CONFIG); + final String method = getConfiguration().getString(RestConfig.AUTHENTICATION_METHOD_CONFIG); + if (enableBasicAuth(method)) { + return new JAASLoginService(realm); + } else if (enableBearerAuth(method)) { + throw new UnsupportedOperationException( + "Must implement Application.createLoginService() when using '" + + RestConfig.AUTHENTICATION_METHOD_CONFIG + "=" + + RestConfig.AUTHENTICATION_METHOD_BEARER + "'." + ); + } + return null; + } - static boolean enableBasicAuth(String authMethod) { - return RestConfig.AUTHENTICATION_METHOD_BASIC.equals(authMethod); + protected IdentityService createIdentityService() { + final String method = getConfiguration().getString(RestConfig.AUTHENTICATION_METHOD_CONFIG); + if (enableBasicAuth(method) || enableBearerAuth(method)) { + return new DefaultIdentityService(); + } + return null; } - static ConstraintSecurityHandler createSecurityHandler(String realm, List roles) { + protected ConstraintSecurityHandler createBasicSecurityHandler() { + return createSecurityHandler(); + } + + protected ConstraintSecurityHandler createBearerSecurityHandler() { + return createSecurityHandler(); + } + + protected ConstraintSecurityHandler createSecurityHandler() { + final String realm = getConfiguration().getString(RestConfig.AUTHENTICATION_REALM_CONFIG); + final ConstraintSecurityHandler securityHandler = new ConstraintSecurityHandler(); - Constraint constraint = new Constraint(); - constraint.setAuthenticate(true); - constraint.setRoles(roles.toArray(new String[0])); - ConstraintMapping constraintMapping = new ConstraintMapping(); - constraintMapping.setConstraint(constraint); - constraintMapping.setMethod("*"); - constraintMapping.setPathSpec("/*"); - securityHandler.addConstraintMapping(constraintMapping); - securityHandler.setAuthenticator(new BasicAuthenticator()); - securityHandler.setLoginService(new JAASLoginService(realm)); - securityHandler.setIdentityService(new DefaultIdentityService()); + securityHandler.addConstraintMapping(createGlobalAuthConstraint()); + securityHandler.setAuthenticator(createAuthenticator()); + securityHandler.setLoginService(createLoginService()); + securityHandler.setIdentityService(createIdentityService()); securityHandler.setRealmName(realm); + + AuthUtil.createUnsecuredConstraints(config) + .forEach(securityHandler::addConstraintMapping); + return securityHandler; } + protected ConstraintMapping createGlobalAuthConstraint() { + return AuthUtil.createGlobalAuthConstraint(config); + } + // TODO: delete deprecatedPort parameter when `PORT_CONFIG` is deprecated. // It's only used to support the deprecated configuration. public static List parseListeners( @@ -432,6 +652,11 @@ public void configureBaseApplication(Configurable config) { configureBaseApplication(config, null); } + /** + * Register standard components for a JSON REST application on the given JAX-RS configurable, + * which can be either an ResourceConfig for a server or a ClientConfig for a Jersey-based REST + * client. + */ public void configureBaseApplication(Configurable config, Map metricTags) { T restConfig = getConfiguration(); @@ -443,8 +668,17 @@ public void configureBaseApplication(Configurable config, Map metricTags, restConfig.getTime())); config.property(ServerProperties.BV_SEND_ERROR_IN_RESPONSE, true); + config.property(ServerProperties.WADL_FEATURE_DISABLE, true); } + /** + * Register a body provider and optional exception mapper for (de)serializing JSON in + * request/response entities. + * @param config The config to register the provider with + * @param restConfig The application's configuration + * @param registerExceptionMapper Whether or not to register an additional exception mapper for + * handling errors in (de)serialization + */ protected void registerJsonProvider( Configurable config, T restConfig, @@ -458,10 +692,20 @@ protected void registerJsonProvider( } } + /** + * Register server features + * @param config The config to register the features with + * @param restConfig The application's configuration + */ protected void registerFeatures(Configurable config, T restConfig) { config.register(ValidationFeature.class); } + /** + * Register handlers for translating exceptions into responses. + * @param config The config to register the mappers with + * @param restConfig The application's configuration + */ protected void registerExceptionMappers(Configurable config, T restConfig) { config.register(ConstraintViolationExceptionMapper.class); config.register(new WebApplicationExceptionMapper(restConfig)); @@ -472,10 +716,20 @@ public T getConfiguration() { return this.config; } + /** + * Gets a JSON ObjectMapper to use for (de)serialization of request/response entities. Override + * this to configure the behavior of the serializer. One simple example of customization is to + * set the INDENT_OUTPUT flag to make the output more readable. The default is a default + * Jackson ObjectMapper. + */ protected ObjectMapper getJsonMapper() { return new ObjectMapper(); } + /** + * Start the server (creating it if necessary). + * @throws Exception If the application fails to start + */ public void start() throws Exception { if (server == null) { createServer(); @@ -483,15 +737,41 @@ public void start() throws Exception { server.start(); } + /** + * Wait for the server to exit, allowing existing requests to complete if graceful shutdown is + * enabled and invoking the shutdown hook before returning. + * @throws InterruptedException If the internal threadpool fails to stop + */ public void join() throws InterruptedException { server.join(); shutdownLatch.await(); } + /** + * Request that the server shutdown. + * @throws Exception If the application fails to stop + */ public void stop() throws Exception { server.stop(); } + private void doShutdown() { + resourceExtensions.forEach(ext -> { + try { + ext.close(); + } catch (final IOException e) { + log.error("Error closing the extension resource. ext:" + ext, e); + } + }); + + onShutdown(); + } + + /** + * Shutdown hook that is invoked after the Jetty server has processed the shutdown request, + * stopped accepting new connections, and tried to gracefully finish existing requests. At this + * point it should be safe to clean up any resources used while processing requests. + */ public void onShutdown() { } } diff --git a/hadoop-unit-confluent/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java b/hadoop-unit-confluent/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java index 7b91cff6..589e0470 100644 --- a/hadoop-unit-confluent/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java +++ b/hadoop-unit-confluent/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java @@ -17,9 +17,11 @@ package org.apache.kafka.common.utils; import java.io.InputStream; +import java.lang.management.ManagementFactory; import java.util.Properties; import javax.management.JMException; +import javax.management.MBeanServer; import javax.management.ObjectName; import org.apache.kafka.common.MetricName; @@ -34,15 +36,17 @@ public class AppInfoParser { private static final String VERSION; private static final String COMMIT_ID; + protected static final String DEFAULT_VALUE = "unknown"; + static { Properties props = new Properties(); try (InputStream resourceStream = AppInfoParser.class.getResourceAsStream("/kafka/kafka-version.properties")) { props.load(resourceStream); } catch (Exception e) { - log.warn("Error while loading kafka-version.properties :" + e.getMessage()); + log.warn("Error while loading kafka-version.properties: {}", e.getMessage()); } - VERSION = props.getProperty("version", "unknown").trim(); - COMMIT_ID = props.getProperty("commitId", "unknown").trim(); + VERSION = props.getProperty("version", DEFAULT_VALUE).trim(); + COMMIT_ID = props.getProperty("commitId", DEFAULT_VALUE).trim(); } public static String getVersion() { @@ -53,13 +57,13 @@ public static String getCommitId() { return COMMIT_ID; } - public static synchronized void registerAppInfo(String prefix, String id, Metrics metrics) { + public static synchronized void registerAppInfo(String prefix, String id, Metrics metrics, long nowMs) { try { ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + Sanitizer.jmxSanitize(id)); - AppInfo mBean = new AppInfo(); + AppInfo mBean = new AppInfo(nowMs); // ManagementFactory.getPlatformMBeanServer().registerMBean(mBean, name); - registerMetrics(metrics); // prefix will be added later by JmxReporter + registerMetrics(metrics, mBean); // prefix will be added later by JmxReporter } catch (JMException e) { log.warn("Error registering AppInfo mbean", e); } @@ -82,10 +86,11 @@ private static MetricName metricName(Metrics metrics, String name) { return metrics.metricName(name, "app-info", "Metric indicating " + name); } - private static void registerMetrics(Metrics metrics) { + private static void registerMetrics(Metrics metrics, AppInfo appInfo) { if (metrics != null) { - metrics.addMetric(metricName(metrics, "version"), new ImmutableValue<>(VERSION)); - metrics.addMetric(metricName(metrics, "commit-id"), new ImmutableValue<>(COMMIT_ID)); + metrics.addMetric(metricName(metrics, "version"), new ImmutableValue<>(appInfo.getVersion())); + metrics.addMetric(metricName(metrics, "commit-id"), new ImmutableValue<>(appInfo.getCommitId())); + metrics.addMetric(metricName(metrics, "start-time-ms"), new ImmutableValue<>(appInfo.getStartTimeMs())); } } @@ -93,19 +98,25 @@ private static void unregisterMetrics(Metrics metrics) { if (metrics != null) { metrics.removeMetric(metricName(metrics, "version")); metrics.removeMetric(metricName(metrics, "commit-id")); + metrics.removeMetric(metricName(metrics, "start-time-ms")); } } public interface AppInfoMBean { - public String getVersion(); - public String getCommitId(); + String getVersion(); + String getCommitId(); + Long getStartTimeMs(); } public static class AppInfo implements AppInfoMBean { - public AppInfo() { - log.info("Kafka version : " + AppInfoParser.getVersion()); - log.info("Kafka commitId : " + AppInfoParser.getCommitId()); + private final Long startTimeMs; + + public AppInfo(long startTimeMs) { + this.startTimeMs = startTimeMs; + log.info("Kafka version: {}", AppInfoParser.getVersion()); + log.info("Kafka commitId: {}", AppInfoParser.getCommitId()); + log.info("Kafka startTimeMs: {}", startTimeMs); } @Override @@ -118,6 +129,11 @@ public String getCommitId() { return AppInfoParser.getCommitId(); } + @Override + public Long getStartTimeMs() { + return startTimeMs; + } + } static class ImmutableValue implements Gauge { diff --git a/pom.xml b/pom.xml index 87009a3b..35ba648b 100755 --- a/pom.xml +++ b/pom.xml @@ -150,8 +150,8 @@ 0.12.0.2.6.5.0-292 1.0.0.2.6.5.0-292 - 5.2.1 - 2.2.0-cp1 + 5.3.0 + 5.3.0-ccs 2.2.0 3.2.1