Skip to content

Commit

Permalink
KAFKA-10028: Implement write path for feature versioning system (KIP-…
Browse files Browse the repository at this point in the history
…584) (#9001)

Summary:
In this PR, I have implemented the write path of the feature versioning system (KIP-584). Here is a summary of what's in this PR:

New APIs in org.apache.kafka.clients.admin.Admin interface, and their client and server implementations. These APIs can be used to describe features and update finalized features. These APIs are: Admin#describeFeatures and Admin#updateFeatures.
The write path is provided by the Admin#updateFeatures API. The corresponding server-side implementation is provided in KafkaApis and KafkaController classes. This can be a good place to start the code review.
The write path is supplemented by Admin#describeFeatures client API. This does not translate 1:1 to a server-side API. Instead, under the hood the API makes an explicit ApiVersionsRequest to the Broker to fetch the supported and finalized features.
Implemented a suite of integration tests in UpdateFeaturesTest.scala that thoroughly exercises the various cases in the write path.

Other changes:

The data type of the FinalizedFeaturesEpoch field in ApiVersionsResponse has been modified from int32 to int64. This change is to conform with the latest changes to the KIP explained in the voting thread.
Along the way, the class SupportedFeatures has been renamed to be called BrokerFeatures, and, it now holds both supported features as well as default minimum version levels.
For the purpose of testing, both the BrokerFeatures and FinalizedFeatureCache classes have been changed to be no longer singleton in implementation. Instead, these are now instantiated once and maintained in KafkaServer. The singleton instances are passed around to various classes, as needed.

Reviewers: Boyang Chen <boyang@confluent.io>, Jun Rao <junrao@gmail.com>
  • Loading branch information
kowshik authored Oct 7, 2020
1 parent 4e65030 commit fb4f297
Show file tree
Hide file tree
Showing 46 changed files with 3,005 additions and 359 deletions.
68 changes: 68 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.FeatureUpdateFailedException;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.requests.LeaveGroupResponse;
Expand Down Expand Up @@ -1306,6 +1307,73 @@ default AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScram
AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
AlterUserScramCredentialsOptions options);

/**
* Describes finalized as well as supported features. By default, the request is issued to any
* broker. It can be optionally directed only to the controller via DescribeFeaturesOptions
* parameter. This is particularly useful if the user requires strongly consistent reads of
* finalized features.
* <p>
* The following exceptions can be anticipated when calling {@code get()} on the future from the
* returned {@link DescribeFeaturesResult}:
* <ul>
* <li>{@link org.apache.kafka.common.errors.TimeoutException}
* If the request timed out before the describe operation could finish.</li>
* </ul>
* <p>
* @param options the options to use
*
* @return the {@link DescribeFeaturesResult} containing the result
*/
DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

/**
* Applies specified updates to finalized features. This operation is not transactional so some
* updates may succeed while the rest may fail.
* <p>
* The API takes in a map of finalized feature names to {@link FeatureUpdate} that needs to be
* applied. Each entry in the map specifies the finalized feature to be added or updated or
* deleted, along with the new max feature version level value. This request is issued only to
* the controller since the API is only served by the controller. The return value contains an
* error code for each supplied {@link FeatureUpdate}, and the code indicates if the update
* succeeded or failed in the controller.
* <ul>
* <li>Downgrade of feature version level is not a regular operation/intent. It is only allowed
* in the controller if the {@link FeatureUpdate} has the allowDowngrade flag set. Setting this
* flag conveys user intent to attempt downgrade of a feature max version level. Note that
* despite the allowDowngrade flag being set, certain downgrades may be rejected by the
* controller if it is deemed impossible.</li>
* <li>Deletion of a finalized feature version is not a regular operation/intent. It could be
* done by setting the allowDowngrade flag to true in the {@link FeatureUpdate}, and, setting
* the max version level to a value less than 1.</li>
* </ul>
* <p>
* The following exceptions can be anticipated when calling {@code get()} on the futures
* obtained from the returned {@link UpdateFeaturesResult}:
* <ul>
* <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
* If the authenticated user didn't have alter access to the cluster.</li>
* <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
* If the request details are invalid. e.g., a non-existing finalized feature is attempted
* to be deleted or downgraded.</li>
* <li>{@link org.apache.kafka.common.errors.TimeoutException}
* If the request timed out before the updates could finish. It cannot be guaranteed whether
* the updates succeeded or not.</li>
* <li>{@link FeatureUpdateFailedException}
* This means there was an unexpected error encountered when the update was applied on
* the controller. There is no guarantee on whether the update succeeded or failed. The best
* way to find out is to issue a {@link Admin#describeFeatures(DescribeFeaturesOptions)}
* request to the controller to get the latest features.</li>
* </ul>
* <p>
* This operation is supported by brokers with version 2.7.0 or higher.
* @param featureUpdates the map of finalized feature name to {@link FeatureUpdate}
* @param options the options to use
*
* @return the {@link UpdateFeaturesResult} containing the result
*/
UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);

/**
* Get the metrics kept by the adminClient
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
* Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)}.
*
* The API of this class is evolving. See {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DescribeFeaturesOptions extends AbstractOptions<DescribeFeaturesOptions> {

/**
* - True means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request must be
* issued only to the controller.
* - False means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request can be
* issued to any random broker.
*/
private boolean sendRequestToController = false;

/**
* Sets a flag indicating that the describe features request must be issued only to the controller.
*/
public DescribeFeaturesOptions sendRequestToController(boolean sendRequestToController) {
this.sendRequestToController = sendRequestToController;
return this;
}

public boolean sendRequestToController() {
return sendRequestToController;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;

/**
* The result of the {@link Admin#describeFeatures(DescribeFeaturesOptions)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
public class DescribeFeaturesResult {

private final KafkaFuture<FeatureMetadata> future;

DescribeFeaturesResult(KafkaFuture<FeatureMetadata> future) {
this.future = future;
}

public KafkaFuture<FeatureMetadata> featureMetadata() {
return future;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import static java.util.stream.Collectors.joining;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
* Encapsulates details about finalized as well as supported features. This is particularly useful
* to hold the result returned by the {@link Admin#describeFeatures(DescribeFeaturesOptions)} API.
*/
public class FeatureMetadata {

private final Map<String, FinalizedVersionRange> finalizedFeatures;

private final Optional<Long> finalizedFeaturesEpoch;

private final Map<String, SupportedVersionRange> supportedFeatures;

FeatureMetadata(final Map<String, FinalizedVersionRange> finalizedFeatures,
final Optional<Long> finalizedFeaturesEpoch,
final Map<String, SupportedVersionRange> supportedFeatures) {
this.finalizedFeatures = new HashMap<>(finalizedFeatures);
this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
this.supportedFeatures = new HashMap<>(supportedFeatures);
}

/**
* Returns a map of finalized feature versions. Each entry in the map contains a key being a
* feature name and the value being a range of version levels supported by every broker in the
* cluster.
*/
public Map<String, FinalizedVersionRange> finalizedFeatures() {
return new HashMap<>(finalizedFeatures);
}

/**
* The epoch for the finalized features.
* If the returned value is empty, it means the finalized features are absent/unavailable.
*/
public Optional<Long> finalizedFeaturesEpoch() {
return finalizedFeaturesEpoch;
}

/**
* Returns a map of supported feature versions. Each entry in the map contains a key being a
* feature name and the value being a range of versions supported by a particular broker in the
* cluster.
*/
public Map<String, SupportedVersionRange> supportedFeatures() {
return new HashMap<>(supportedFeatures);
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof FeatureMetadata)) {
return false;
}

final FeatureMetadata that = (FeatureMetadata) other;
return Objects.equals(this.finalizedFeatures, that.finalizedFeatures) &&
Objects.equals(this.finalizedFeaturesEpoch, that.finalizedFeaturesEpoch) &&
Objects.equals(this.supportedFeatures, that.supportedFeatures);
}

@Override
public int hashCode() {
return Objects.hash(finalizedFeatures, finalizedFeaturesEpoch, supportedFeatures);
}

private static <ValueType> String mapToString(final Map<String, ValueType> featureVersionsMap) {
return String.format(
"{%s}",
featureVersionsMap
.entrySet()
.stream()
.map(entry -> String.format("(%s -> %s)", entry.getKey(), entry.getValue()))
.collect(joining(", "))
);
}

@Override
public String toString() {
return String.format(
"FeatureMetadata{finalizedFeatures:%s, finalizedFeaturesEpoch:%s, supportedFeatures:%s}",
mapToString(finalizedFeatures),
finalizedFeaturesEpoch.map(Object::toString).orElse("<none>"),
mapToString(supportedFeatures));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import java.util.Objects;

/**
* Encapsulates details about an update to a finalized feature.
*/
public class FeatureUpdate {
private final short maxVersionLevel;
private final boolean allowDowngrade;

/**
* @param maxVersionLevel the new maximum version level for the finalized feature.
* a value < 1 is special and indicates that the update is intended to
* delete the finalized feature, and should be accompanied by setting
* the allowDowngrade flag to true.
* @param allowDowngrade - true, if this feature update was meant to downgrade the existing
* maximum version level of the finalized feature.
* - false, otherwise.
*/
public FeatureUpdate(final short maxVersionLevel, final boolean allowDowngrade) {
if (maxVersionLevel < 1 && !allowDowngrade) {
throw new IllegalArgumentException(String.format(
"The allowDowngrade flag should be set when the provided maxVersionLevel:%d is < 1.",
maxVersionLevel));
}
this.maxVersionLevel = maxVersionLevel;
this.allowDowngrade = allowDowngrade;
}

public short maxVersionLevel() {
return maxVersionLevel;
}

public boolean allowDowngrade() {
return allowDowngrade;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}

if (!(other instanceof FeatureUpdate)) {
return false;
}

final FeatureUpdate that = (FeatureUpdate) other;
return this.maxVersionLevel == that.maxVersionLevel && this.allowDowngrade == that.allowDowngrade;
}

@Override
public int hashCode() {
return Objects.hash(maxVersionLevel, allowDowngrade);
}

@Override
public String toString() {
return String.format("FeatureUpdate{maxVersionLevel:%d, allowDowngrade:%s}", maxVersionLevel, allowDowngrade);
}
}
Loading

0 comments on commit fb4f297

Please sign in to comment.