---
.../org/apache/kafka/streams/processor/ProcessorSupplier.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
index b81b8b93e2f36..a3e5f30c2c464 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
@@ -26,7 +26,7 @@
* It is used in {@link Topology} for adding new processor operators, whose generated
* topology can then be replicated (and thus creating one or more {@link Processor} instances)
* and distributed to multiple stream threads.
- *
+ *
* The supplier should always generate a new instance each time {@link ProcessorSupplier#get()} gets called. Creating
* a single {@link Processor} object and returning the same object reference in {@link ProcessorSupplier#get()} would be
* a violation of the supplier pattern and leads to runtime exceptions.
From 698319b8e2c1f6cb574f339eede6f2a5b1919b55 Mon Sep 17 00:00:00 2001
From: Jason Gustafson
Date: Thu, 4 Feb 2021 10:04:17 -0800
Subject: [PATCH 016/243] KAFKA-12278; Ensure exposed api versions are
consistent within listener (#10666)
Previously all APIs were accessible on every listener exposed by the broker, but
with KIP-500, that is no longer true. We now have more complex requirements for
API accessibility.
For example, the KIP-500 controller exposes some APIs which are not exposed by
brokers, such as BrokerHeartbeatRequest, and does not expose most client APIs,
such as JoinGroupRequest, etc. Similarly, the KIP-500 broker does not implement
some APIs that the ZK-based broker does, such as LeaderAndIsrRequest and
UpdateFeaturesRequest.
All of this means that we need more sophistication in how we expose APIs and
keep them consistent with the ApiVersions API. Up until now, we have been
working around this using the controllerOnly flag inside ApiKeys, but this is
not rich enough to support all of the cases listed above. This PR introduces a
new "listeners" field to the request schema definitions. This field is an array
of strings which indicate the listener types in which the API should be exposed.
We currently support "zkBroker", "broker", and "controller". ("broker"
indicates the KIP-500 broker, whereas zkBroker indicates the old broker).
This PR also creates ApiVersionManager to encapsulate the creation of the
ApiVersionsResponse based on the listener type. Additionally, it modifies
SocketServer to check the listener type of received requests before forwarding
them to the request handler.
Finally, this PR also fixes a bug in the handling of the ApiVersionsResponse
prior to authentication. Previously a static response was sent, which means that
changes to features would not get reflected. This also meant that the logic to
ensure that only the intersection of version ranges supported by the controller
would get exposed did not work. I think this is important because some clients
rely on the initial pre-authenticated ApiVersions response rather than doing a
second round after authentication as the Java client does.
One final cleanup note: I have removed the expectation that envelope requests
are only allowed on "privileged" listeners. This made sense initially because
we expected to use forwarding before the KIP-500 controller was available. That
is not the case anymore and we expect the Envelope API to only be exposed on the
controller listener. I have nevertheless preserved the existing workarounds to
allow verification of the forwarding behavior in integration testing.
Reviewers: Colin P. McCabe , Ismael Juma
---
checkstyle/import-control.xml | 2 +
.../apache/kafka/clients/NodeApiVersions.java | 10 +-
.../kafka/common/network/ChannelBuilders.java | 16 +-
.../common/network/SaslChannelBuilder.java | 12 +-
.../apache/kafka/common/protocol/ApiKeys.java | 71 ++++---
.../kafka/common/protocol/Protocol.java | 2 +-
.../common/requests/ApiVersionsResponse.java | 89 ++++++---
.../SaslServerAuthenticator.java | 13 +-
.../message/AddOffsetsToTxnRequest.json | 1 +
.../message/AddPartitionsToTxnRequest.json | 1 +
.../message/AlterClientQuotasRequest.json | 1 +
.../common/message/AlterConfigsRequest.json | 1 +
.../common/message/AlterIsrRequest.json | 1 +
.../AlterPartitionReassignmentsRequest.json | 1 +
.../message/AlterReplicaLogDirsRequest.json | 1 +
.../AlterUserScramCredentialsRequest.json | 1 +
.../common/message/ApiVersionsRequest.json | 1 +
.../message/BeginQuorumEpochRequest.json | 1 +
.../message/BrokerHeartbeatRequest.json | 1 +
.../message/BrokerRegistrationRequest.json | 1 +
.../message/ControlledShutdownRequest.json | 1 +
.../common/message/CreateAclsRequest.json | 1 +
.../message/CreateDelegationTokenRequest.json | 1 +
.../message/CreatePartitionsRequest.json | 1 +
.../common/message/CreateTopicsRequest.json | 1 +
.../common/message/DeleteAclsRequest.json | 1 +
.../common/message/DeleteGroupsRequest.json | 1 +
.../common/message/DeleteRecordsRequest.json | 1 +
.../common/message/DeleteTopicsRequest.json | 1 +
.../common/message/DescribeAclsRequest.json | 1 +
.../message/DescribeClientQuotasRequest.json | 1 +
.../message/DescribeClusterRequest.json | 1 +
.../message/DescribeConfigsRequest.json | 1 +
.../DescribeDelegationTokenRequest.json | 1 +
.../common/message/DescribeGroupsRequest.json | 1 +
.../message/DescribeLogDirsRequest.json | 1 +
.../message/DescribeProducersRequest.json | 1 +
.../common/message/DescribeQuorumRequest.json | 1 +
.../DescribeUserScramCredentialsRequest.json | 1 +
.../common/message/ElectLeadersRequest.json | 1 +
.../common/message/EndQuorumEpochRequest.json | 1 +
.../common/message/EndTxnRequest.json | 1 +
.../common/message/EnvelopeRequest.json | 1 +
.../message/ExpireDelegationTokenRequest.json | 1 +
.../common/message/FetchRequest.json | 1 +
.../common/message/FetchSnapshotRequest.json | 1 +
.../message/FindCoordinatorRequest.json | 1 +
.../common/message/HeartbeatRequest.json | 1 +
.../IncrementalAlterConfigsRequest.json | 1 +
.../common/message/InitProducerIdRequest.json | 1 +
.../common/message/JoinGroupRequest.json | 1 +
.../common/message/LeaderAndIsrRequest.json | 1 +
.../common/message/LeaveGroupRequest.json | 1 +
.../common/message/ListGroupsRequest.json | 1 +
.../common/message/ListOffsetsRequest.json | 1 +
.../ListPartitionReassignmentsRequest.json | 1 +
.../common/message/MetadataRequest.json | 1 +
.../common/message/OffsetCommitRequest.json | 1 +
.../common/message/OffsetDeleteRequest.json | 1 +
.../common/message/OffsetFetchRequest.json | 1 +
.../message/OffsetForLeaderEpochRequest.json | 1 +
.../common/message/ProduceRequest.json | 1 +
.../message/RenewDelegationTokenRequest.json | 1 +
.../message/SaslAuthenticateRequest.json | 1 +
.../common/message/SaslHandshakeRequest.json | 1 +
.../common/message/StopReplicaRequest.json | 1 +
.../common/message/SyncGroupRequest.json | 1 +
.../message/TxnOffsetCommitRequest.json | 1 +
.../message/UnregisterBrokerRequest.json | 1 +
.../common/message/UpdateFeaturesRequest.json | 1 +
.../common/message/UpdateMetadataRequest.json | 1 +
.../resources/common/message/VoteRequest.json | 1 +
.../message/WriteTxnMarkersRequest.json | 1 +
.../kafka/clients/NetworkClientTest.java | 17 +-
.../kafka/clients/NodeApiVersionsTest.java | 36 ++--
.../clients/admin/KafkaAdminClientTest.java | 14 +-
.../consumer/internals/FetcherTest.java | 6 +-
.../producer/internals/SenderTest.java | 27 +--
.../kafka/common/network/NioEchoServer.java | 5 +-
.../network/SaslChannelBuilderTest.java | 14 +-
.../common/network/SslTransportLayerTest.java | 24 ++-
.../kafka/common/protocol/ApiKeysTest.java | 22 ++-
.../requests/ApiVersionsResponseTest.java | 51 ++---
.../common/requests/RequestResponseTest.java | 100 +++++-----
.../authenticator/SaslAuthenticatorTest.java | 177 ++++++++----------
.../SaslServerAuthenticatorTest.java | 8 +-
.../src/main/scala/kafka/api/ApiVersion.scala | 71 ++++---
.../scala/kafka/network/RequestChannel.scala | 17 +-
.../scala/kafka/network/SocketServer.scala | 36 ++--
.../kafka/server/ApiVersionManager.scala | 126 +++++++++++++
.../scala/kafka/server/BrokerServer.scala | 51 +++--
.../scala/kafka/server/ControllerServer.scala | 8 +-
.../main/scala/kafka/server/KafkaApis.scala | 58 ++----
.../scala/kafka/server/KafkaRaftServer.scala | 15 +-
.../main/scala/kafka/server/KafkaServer.scala | 40 ++--
.../kafka/tools/TestRaftRequestHandler.scala | 8 +-
.../scala/kafka/tools/TestRaftServer.scala | 9 +-
.../admin/BrokerApiVersionsCommandTest.scala | 2 +-
.../server/GssapiAuthenticationTest.scala | 5 +-
.../scala/unit/kafka/api/ApiVersionTest.scala | 23 ++-
.../unit/kafka/network/SocketServerTest.scala | 61 +++---
.../AbstractApiVersionsRequestTest.scala | 15 +-
.../kafka/server/ApiVersionManagerTest.scala | 115 ++++++++++++
.../kafka/server/ApiVersionsRequestTest.scala | 8 +-
.../kafka/server/ForwardingManagerTest.scala | 3 +-
.../unit/kafka/server/KafkaApisTest.scala | 85 ++-------
.../unit/kafka/server/RequestQuotaTest.scala | 6 +-
.../server/SaslApiVersionsRequestTest.scala | 4 +-
.../message/ApiMessageTypeGenerator.java | 69 ++++++-
.../kafka/message/MessageGenerator.java | 6 +-
.../org/apache/kafka/message/MessageSpec.java | 16 +-
.../kafka/message/RequestListenerType.java | 30 +++
.../metadata/MetadataRequestBenchmark.java | 8 +-
113 files changed, 1091 insertions(+), 585 deletions(-)
create mode 100644 core/src/main/scala/kafka/server/ApiVersionManager.scala
create mode 100644 core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
create mode 100644 generator/src/main/java/org/apache/kafka/message/RequestListenerType.java
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 9ec16b9b7c096..aad58b0ddd8cd 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -50,6 +50,7 @@
+
@@ -108,6 +109,7 @@
+
diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
index 658d481308ead..3c09f0eb4e781 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
@@ -16,9 +16,6 @@
*/
package org.apache.kafka.clients;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
@@ -27,7 +24,10 @@
import org.apache.kafka.common.utils.Utils;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.EnumMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -62,7 +62,7 @@ public static NodeApiVersions create() {
*/
public static NodeApiVersions create(Collection overrides) {
List apiVersions = new LinkedList<>(overrides);
- for (ApiKeys apiKey : ApiKeys.brokerApis()) {
+ for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
boolean exists = false;
for (ApiVersion apiVersion : apiVersions) {
if (apiVersion.apiKey() == apiKey.id) {
@@ -170,7 +170,7 @@ public String toString(boolean lineBreaks) {
// Also handle the case where some apiKey types are not specified at all in the given ApiVersions,
// which may happen when the remote is too old.
- for (ApiKeys apiKey : ApiKeys.brokerApis()) {
+ for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
if (!apiKeysText.containsKey(apiKey.id)) {
StringBuilder bld = new StringBuilder();
bld.append(apiKey.name).append("(").
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index ee5ed75828920..b4a1ce87cf1cd 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -21,6 +21,7 @@
import org.apache.kafka.common.config.SslClientAuth;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
@@ -40,6 +41,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.function.Supplier;
public class ChannelBuilders {
private static final Logger log = LoggerFactory.getLogger(ChannelBuilders.class);
@@ -77,7 +79,7 @@ public static ChannelBuilder clientChannelBuilder(
throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
}
return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName, false, clientSaslMechanism,
- saslHandshakeRequestEnable, null, null, time, logContext);
+ saslHandshakeRequestEnable, null, null, time, logContext, null);
}
/**
@@ -89,6 +91,7 @@ public static ChannelBuilder clientChannelBuilder(
* @param tokenCache Delegation token cache
* @param time the time instance
* @param logContext the log context instance
+ * @param apiVersionSupplier supplier for ApiVersions responses sent prior to authentication
*
* @return the configured `ChannelBuilder`
*/
@@ -99,10 +102,11 @@ public static ChannelBuilder serverChannelBuilder(ListenerName listenerName,
CredentialCache credentialCache,
DelegationTokenCache tokenCache,
Time time,
- LogContext logContext) {
+ LogContext logContext,
+ Supplier apiVersionSupplier) {
return create(securityProtocol, Mode.SERVER, JaasContext.Type.SERVER, config, listenerName,
isInterBrokerListener, null, true, credentialCache,
- tokenCache, time, logContext);
+ tokenCache, time, logContext, apiVersionSupplier);
}
private static ChannelBuilder create(SecurityProtocol securityProtocol,
@@ -116,7 +120,8 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol,
CredentialCache credentialCache,
DelegationTokenCache tokenCache,
Time time,
- LogContext logContext) {
+ LogContext logContext,
+ Supplier apiVersionSupplier) {
Map configs = channelBuilderConfigs(config, listenerName);
ChannelBuilder channelBuilder;
@@ -174,7 +179,8 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol,
tokenCache,
sslClientAuthOverride,
time,
- logContext);
+ logContext,
+ apiVersionSupplier);
break;
case PLAINTEXT:
channelBuilder = new PlaintextChannelBuilder(listenerName);
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 900162614d998..17988db87a650 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -21,6 +21,7 @@
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.Login;
@@ -85,6 +86,7 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
private final DelegationTokenCache tokenCache;
private final Map loginManagers;
private final Map subjects;
+ private final Supplier apiVersionSupplier;
private SslFactory sslFactory;
private Map configs;
@@ -108,7 +110,8 @@ public SaslChannelBuilder(Mode mode,
DelegationTokenCache tokenCache,
String sslClientAuthOverride,
Time time,
- LogContext logContext) {
+ LogContext logContext,
+ Supplier apiVersionSupplier) {
this.mode = mode;
this.jaasContexts = jaasContexts;
this.loginManagers = new HashMap<>(jaasContexts.size());
@@ -126,6 +129,11 @@ public SaslChannelBuilder(Mode mode,
this.time = time;
this.logContext = logContext;
this.log = logContext.logger(getClass());
+ this.apiVersionSupplier = apiVersionSupplier;
+
+ if (mode == Mode.SERVER && apiVersionSupplier == null) {
+ throw new IllegalArgumentException("Server channel builder must provide an ApiVersionResponse supplier");
+ }
}
@SuppressWarnings("unchecked")
@@ -266,7 +274,7 @@ protected SaslServerAuthenticator buildServerAuthenticator(Map config
ChannelMetadataRegistry metadataRegistry) {
return new SaslServerAuthenticator(configs, callbackHandlers, id, subjects,
kerberosShortNamer, listenerName, securityProtocol, transportLayer,
- connectionsMaxReauthMsByMechanism, metadataRegistry, time);
+ connectionsMaxReauthMsByMechanism, metadataRegistry, time, apiVersionSupplier);
}
// Visible to override for testing
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 49a6130a30079..475fc84c7355c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -21,7 +21,10 @@
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.record.RecordBatch;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.EnumMap;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -90,19 +93,28 @@ public enum ApiKeys {
ALTER_CLIENT_QUOTAS(ApiMessageType.ALTER_CLIENT_QUOTAS, false, true),
DESCRIBE_USER_SCRAM_CREDENTIALS(ApiMessageType.DESCRIBE_USER_SCRAM_CREDENTIALS),
ALTER_USER_SCRAM_CREDENTIALS(ApiMessageType.ALTER_USER_SCRAM_CREDENTIALS, false, true),
- VOTE(ApiMessageType.VOTE, true, RecordBatch.MAGIC_VALUE_V0, false, true),
- BEGIN_QUORUM_EPOCH(ApiMessageType.BEGIN_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false, true),
- END_QUORUM_EPOCH(ApiMessageType.END_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false, true),
- DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true, RecordBatch.MAGIC_VALUE_V0, false, true),
+ VOTE(ApiMessageType.VOTE, true, RecordBatch.MAGIC_VALUE_V0, false),
+ BEGIN_QUORUM_EPOCH(ApiMessageType.BEGIN_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false),
+ END_QUORUM_EPOCH(ApiMessageType.END_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false),
+ DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true, RecordBatch.MAGIC_VALUE_V0, false),
ALTER_ISR(ApiMessageType.ALTER_ISR, true),
UPDATE_FEATURES(ApiMessageType.UPDATE_FEATURES, false, true),
- ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false, true),
- FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, RecordBatch.MAGIC_VALUE_V0, false, true),
+ ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false),
+ FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, RecordBatch.MAGIC_VALUE_V0, false),
DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER),
DESCRIBE_PRODUCERS(ApiMessageType.DESCRIBE_PRODUCERS),
- BROKER_REGISTRATION(ApiMessageType.BROKER_REGISTRATION, true, RecordBatch.MAGIC_VALUE_V0, false, true),
- BROKER_HEARTBEAT(ApiMessageType.BROKER_HEARTBEAT, true, RecordBatch.MAGIC_VALUE_V0, false, true),
- UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true, false);
+ BROKER_REGISTRATION(ApiMessageType.BROKER_REGISTRATION, true, RecordBatch.MAGIC_VALUE_V0, false),
+ BROKER_HEARTBEAT(ApiMessageType.BROKER_HEARTBEAT, true, RecordBatch.MAGIC_VALUE_V0, false),
+ UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true);
+
+ private static final Map> APIS_BY_LISTENER =
+ new EnumMap<>(ApiMessageType.ListenerType.class);
+
+ static {
+ for (ApiMessageType.ListenerType listenerType : ApiMessageType.ListenerType.values()) {
+ APIS_BY_LISTENER.put(listenerType, filterApisForListener(listenerType));
+ }
+ }
// The generator ensures every `ApiMessageType` has a unique id
private static final Map ID_TO_TYPE = Arrays.stream(ApiKeys.values())
@@ -120,9 +132,6 @@ public enum ApiKeys {
/** indicates the minimum required inter broker magic required to support the API */
public final byte minRequiredInterBrokerMagic;
- /** indicates whether this is an API which is only exposed by the KIP-500 controller **/
- public final boolean isControllerOnlyApi;
-
/** indicates whether the API is enabled for forwarding **/
public final boolean forwardable;
@@ -142,24 +151,17 @@ public enum ApiKeys {
this(messageType, clusterAction, RecordBatch.MAGIC_VALUE_V0, forwardable);
}
- ApiKeys(ApiMessageType messageType, boolean clusterAction, byte minRequiredInterBrokerMagic, boolean forwardable) {
- this(messageType, clusterAction, minRequiredInterBrokerMagic, forwardable, false);
- }
-
ApiKeys(
ApiMessageType messageType,
boolean clusterAction,
byte minRequiredInterBrokerMagic,
- boolean forwardable,
- boolean isControllerOnlyApi
+ boolean forwardable
) {
this.messageType = messageType;
this.id = messageType.apiKey();
this.name = messageType.name;
this.clusterAction = clusterAction;
this.minRequiredInterBrokerMagic = minRequiredInterBrokerMagic;
- this.isControllerOnlyApi = isControllerOnlyApi;
-
this.requiresDelayedAllocation = forwardable || shouldRetainsBufferReference(messageType.requestSchemas());
this.forwardable = forwardable;
}
@@ -195,6 +197,14 @@ public short oldestVersion() {
return messageType.lowestSupportedVersion();
}
+ public List allVersions() {
+ List versions = new ArrayList<>(latestVersion() - oldestVersion() + 1);
+ for (short version = oldestVersion(); version < latestVersion(); version++) {
+ versions.add(version);
+ }
+ return versions;
+ }
+
public boolean isVersionSupported(short apiVersion) {
return apiVersion >= oldestVersion() && apiVersion <= latestVersion();
}
@@ -207,6 +217,10 @@ public short responseHeaderVersion(short apiVersion) {
return messageType.responseHeaderVersion(apiVersion);
}
+ public boolean inScope(ApiMessageType.ListenerType listener) {
+ return messageType.listeners().contains(listener);
+ }
+
private static String toHtml() {
final StringBuilder b = new StringBuilder();
b.append("\n");
@@ -214,7 +228,7 @@ private static String toHtml() {
b.append("Name | \n");
b.append("Key | \n");
b.append("");
- for (ApiKeys key : ApiKeys.brokerApis()) {
+ for (ApiKeys key : zkBrokerApis()) {
b.append("\n");
b.append("");
b.append("" + key.name + "");
@@ -246,10 +260,19 @@ public void visit(Type field) {
return hasBuffer.get();
}
- public static List brokerApis() {
- return Arrays.stream(values())
- .filter(api -> !api.isControllerOnlyApi)
+ public static EnumSet zkBrokerApis() {
+ return apisForListener(ApiMessageType.ListenerType.ZK_BROKER);
+ }
+
+ public static EnumSet apisForListener(ApiMessageType.ListenerType listener) {
+ return APIS_BY_LISTENER.get(listener);
+ }
+
+ private static EnumSet filterApisForListener(ApiMessageType.ListenerType listener) {
+ List controllerApis = Arrays.stream(ApiKeys.values())
+ .filter(apiKey -> apiKey.messageType.listeners().contains(listener))
.collect(Collectors.toList());
+ return EnumSet.copyOf(controllerApis);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index f31c613a8c51b..d455b26eb2d87 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -133,7 +133,7 @@ public static String toHtml() {
b.append("\n");
schemaToFieldTableHtml(ResponseHeaderData.SCHEMAS[i], b);
}
- for (ApiKeys key : ApiKeys.brokerApis()) {
+ for (ApiKeys key : ApiKeys.zkBrokerApis()) {
// Key
b.append(" |