diff --git a/bin/kafka-features.sh b/bin/kafka-features.sh new file mode 100755 index 0000000000000..9dd9f16fd1b05 --- /dev/null +++ b/bin/kafka-features.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +exec $(dirname $0)/kafka-run-class.sh kafka.admin.FeatureCommand "$@" diff --git a/core/src/main/scala/kafka/admin/FeatureCommand.scala b/core/src/main/scala/kafka/admin/FeatureCommand.scala new file mode 100644 index 0000000000000..9cc0a105ef0d2 --- /dev/null +++ b/core/src/main/scala/kafka/admin/FeatureCommand.scala @@ -0,0 +1,408 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import kafka.server.BrokerFeatures +import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit} +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions} +import org.apache.kafka.common.feature.{Features, SupportedVersionRange} +import org.apache.kafka.common.utils.Utils +import java.util.Properties + +import scala.collection.Seq +import scala.collection.immutable.ListMap +import scala.jdk.CollectionConverters._ +import joptsimple.OptionSpec + +import scala.concurrent.ExecutionException + +object FeatureCommand { + + def main(args: Array[String]): Unit = { + val opts = new FeatureCommandOptions(args) + val featureApis = new FeatureApis(opts) + var exitCode = 0 + try { + featureApis.execute() + } catch { + case e: IllegalArgumentException => + printException(e) + opts.parser.printHelpOn(System.err) + exitCode = 1 + case _: UpdateFeaturesException => + exitCode = 1 + case e: ExecutionException => + val cause = if (e.getCause == null) e else e.getCause + printException(cause) + exitCode = 1 + case e: Throwable => + printException(e) + exitCode = 1 + } finally { + featureApis.close() + Exit.exit(exitCode) + } + } + + private def printException(exception: Throwable): Unit = { + System.err.println("\nError encountered when executing command: " + Utils.stackTrace(exception)) + } +} + +class UpdateFeaturesException(message: String) extends RuntimeException(message) + +/** + * A class that provides necessary APIs to bridge feature APIs provided by the the Admin client with + * the requirements of the CLI tool. + * + * @param opts the CLI options + */ +class FeatureApis(private var opts: FeatureCommandOptions) { + private var supportedFeatures = BrokerFeatures.createDefault().supportedFeatures + private var adminClient = FeatureApis.createAdminClient(opts) + + private def pad(op: String): String = { + f"$op%11s" + } + + private val addOp = pad("[Add]") + private val upgradeOp = pad("[Upgrade]") + private val deleteOp = pad("[Delete]") + private val downgradeOp = pad("[Downgrade]") + + // For testing only. + private[admin] def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = { + supportedFeatures = newFeatures + } + + // For testing only. + private[admin] def setOptions(newOpts: FeatureCommandOptions): Unit = { + adminClient.close() + adminClient = FeatureApis.createAdminClient(newOpts) + opts = newOpts + } + + private def describeFeatures(sendRequestToController: Boolean): FeatureMetadata = { + val options = new DescribeFeaturesOptions().sendRequestToController(sendRequestToController) + adminClient.describeFeatures(options).featureMetadata().get() + } + + /** + * Describes the supported and finalized features. If the --from-controller CLI option + * is provided, then the request is issued only to the controller, otherwise the request is issued + * to any of the provided bootstrap servers. + */ + def describeFeatures(): Unit = { + val result = describeFeatures(opts.hasFromControllerOption) + val features = result.supportedFeatures.asScala.keys.toSet ++ result.finalizedFeatures.asScala.keys.toSet + + features.toList.sorted.foreach { + feature => + val output = new StringBuilder() + output.append(s"Feature: $feature") + + val (supportedMinVersion, supportedMaxVersion) = { + val supportedVersionRange = result.supportedFeatures.get(feature) + if (supportedVersionRange == null) { + ("-", "-") + } else { + (supportedVersionRange.minVersion, supportedVersionRange.maxVersion) + } + } + output.append(s"\tSupportedMinVersion: $supportedMinVersion") + output.append(s"\tSupportedMaxVersion: $supportedMaxVersion") + + val (finalizedMinVersionLevel, finalizedMaxVersionLevel) = { + val finalizedVersionRange = result.finalizedFeatures.get(feature) + if (finalizedVersionRange == null) { + ("-", "-") + } else { + (finalizedVersionRange.minVersionLevel, finalizedVersionRange.maxVersionLevel) + } + } + output.append(s"\tFinalizedMinVersionLevel: $finalizedMinVersionLevel") + output.append(s"\tFinalizedMaxVersionLevel: $finalizedMaxVersionLevel") + + val epoch = { + if (result.finalizedFeaturesEpoch.isPresent) { + result.finalizedFeaturesEpoch.get.toString + } else { + "-" + } + } + output.append(s"\tEpoch: $epoch") + + println(output) + } + } + + /** + * Upgrades all features known to this tool to their highest max version levels. The method may + * add new finalized features if they were not finalized previously, but it does not delete + * any existing finalized feature. The results of the feature updates are written to STDOUT. + * + * NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature + * updates to STDOUT, without applying them. + * + * @throws UpdateFeaturesException if at least one of the feature updates failed + */ + def upgradeAllFeatures(): Unit = { + val metadata = describeFeatures(true) + val existingFinalizedFeatures = metadata.finalizedFeatures + val updates = supportedFeatures.features.asScala.map { + case (feature, targetVersionRange) => + val existingVersionRange = existingFinalizedFeatures.get(feature) + if (existingVersionRange == null) { + val updateStr = + addOp + + s"\tFeature: $feature" + + s"\tExistingFinalizedMaxVersion: -" + + s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}" + (feature, Some((updateStr, new FeatureUpdate(targetVersionRange.max, false)))) + } else { + if (targetVersionRange.max > existingVersionRange.maxVersionLevel) { + val updateStr = + upgradeOp + + s"\tFeature: $feature" + + s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" + + s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}" + (feature, Some((updateStr, new FeatureUpdate(targetVersionRange.max, false)))) + } else { + (feature, Option.empty) + } + } + }.filter { + case(_, updateInfo) => updateInfo.isDefined + }.map { + case(feature, updateInfo) => (feature, updateInfo.get) + }.toMap + + if (updates.nonEmpty) { + maybeApplyFeatureUpdates(updates) + } + } + + /** + * Downgrades existing finalized features to the highest max version levels known to this tool. + * The method may delete existing finalized features if they are no longer seen to be supported, + * but it does not add a feature that was not finalized previously. The results of the feature + * updates are written to STDOUT. + * + * NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature + * updates to STDOUT, without applying them. + * + * @throws UpdateFeaturesException if at least one of the feature updates failed + */ + def downgradeAllFeatures(): Unit = { + val metadata = describeFeatures(true) + val existingFinalizedFeatures = metadata.finalizedFeatures + val supportedFeaturesMap = supportedFeatures.features + val updates = existingFinalizedFeatures.asScala.map { + case (feature, existingVersionRange) => + val targetVersionRange = supportedFeaturesMap.get(feature) + if (targetVersionRange == null) { + val updateStr = + deleteOp + + s"\tFeature: $feature" + + s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" + + s"\tNewFinalizedMaxVersion: -" + (feature, Some(updateStr, new FeatureUpdate(0, true))) + } else { + if (targetVersionRange.max < existingVersionRange.maxVersionLevel) { + val updateStr = + downgradeOp + + s"\tFeature: $feature" + + s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" + + s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}" + (feature, Some(updateStr, new FeatureUpdate(targetVersionRange.max, true))) + } else { + (feature, Option.empty) + } + } + }.filter { + case(_, updateInfo) => updateInfo.isDefined + }.map { + case(feature, updateInfo) => (feature, updateInfo.get) + }.toMap + + if (updates.nonEmpty) { + maybeApplyFeatureUpdates(updates) + } + } + + /** + * Applies the provided feature updates. If the --dry-run CLI option is provided, the method + * only prints the expected feature updates to STDOUT without applying them. + * + * @param updates the feature updates to be applied via the admin client + * + * @throws UpdateFeaturesException if at least one of the feature updates failed + */ + private def maybeApplyFeatureUpdates(updates: Map[String, (String, FeatureUpdate)]): Unit = { + if (opts.hasDryRunOption) { + println("Expected feature updates:" + ListMap( + updates + .toSeq + .sortBy { case(feature, _) => feature} :_*) + .map { case(_, (updateStr, _)) => updateStr} + .mkString("\n")) + } else { + val result = adminClient.updateFeatures( + updates + .map { case(feature, (_, update)) => (feature, update)} + .asJava, + new UpdateFeaturesOptions()) + val resultSortedByFeature = ListMap( + result + .values + .asScala + .toSeq + .sortBy { case(feature, _) => feature} :_*) + val failures = resultSortedByFeature.map { + case (feature, updateFuture) => + val (updateStr, _) = updates(feature) + try { + updateFuture.get + println(updateStr + "\tResult: OK") + 0 + } catch { + case e: ExecutionException => + val cause = if (e.getCause == null) e else e.getCause + println(updateStr + "\tResult: FAILED due to " + cause) + 1 + case e: Throwable => + println(updateStr + "\tResult: FAILED due to " + e) + 1 + } + }.sum + if (failures > 0) { + throw new UpdateFeaturesException(s"$failures feature updates failed!") + } + } + } + + def execute(): Unit = { + if (opts.hasDescribeOption) { + describeFeatures() + } else if (opts.hasUpgradeAllOption) { + upgradeAllFeatures() + } else if (opts.hasDowngradeAllOption) { + downgradeAllFeatures() + } else { + throw new IllegalStateException("Unexpected state: no CLI command could be executed.") + } + } + + def close(): Unit = { + adminClient.close() + } +} + +class FeatureCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) { + private val bootstrapServerOpt = parser.accepts( + "bootstrap-server", + "REQUIRED: A comma-separated list of host:port pairs to use for establishing the connection" + + " to the Kafka cluster.") + .withRequiredArg + .describedAs("server to connect to") + .ofType(classOf[String]) + private val commandConfigOpt = parser.accepts( + "command-config", + "Property file containing configs to be passed to Admin Client." + + " This is used with --bootstrap-server option when required.") + .withOptionalArg + .describedAs("command config property file") + .ofType(classOf[String]) + private val describeOpt = parser.accepts( + "describe", + "Describe supported and finalized features. By default, the features are described from a" + + " random broker. The request can be optionally directed only to the controller using the" + + " --from-controller option.") + private val fromControllerOpt = parser.accepts( + "from-controller", + "Describe supported and finalized features from the controller.") + private val upgradeAllOpt = parser.accepts( + "upgrade-all", + "Upgrades all finalized features to the maximum version levels known to the tool." + + " This command finalizes new features known to the tool that were never finalized" + + " previously in the cluster, but it is guaranteed to not delete any existing feature.") + private val downgradeAllOpt = parser.accepts( + "downgrade-all", + "Downgrades all finalized features to the maximum version levels known to the tool." + + " This command deletes unknown features from the list of finalized features in the" + + " cluster, but it is guaranteed to not add a new feature.") + private val dryRunOpt = parser.accepts( + "dry-run", + "Performs a dry-run of upgrade/downgrade mutations to finalized feature without applying them.") + + options = parser.parse(args : _*) + + checkArgs() + + def has(builder: OptionSpec[_]): Boolean = options.has(builder) + + def hasDescribeOption: Boolean = has(describeOpt) + + def hasFromControllerOption: Boolean = has(fromControllerOpt) + + def hasDryRunOption: Boolean = has(dryRunOpt) + + def hasUpgradeAllOption: Boolean = has(upgradeAllOpt) + + def hasDowngradeAllOption: Boolean = has(downgradeAllOpt) + + def commandConfig: Properties = { + if (has(commandConfigOpt)) + Utils.loadProps(options.valueOf(commandConfigOpt)) + else + new Properties() + } + + def bootstrapServers: String = options.valueOf(bootstrapServerOpt) + + def checkArgs(): Unit = { + CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool describes and updates finalized features.") + val numActions = Seq(describeOpt, upgradeAllOpt, downgradeAllOpt).count(has) + if (numActions != 1) { + CommandLineUtils.printUsageAndDie( + parser, + "Command must include exactly one action: --describe, --upgrade-all, --downgrade-all.") + } + CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt) + if (hasDryRunOption && !hasUpgradeAllOption && !hasDowngradeAllOption) { + CommandLineUtils.printUsageAndDie( + parser, + "Command can contain --dry-run option only when either --upgrade-all or --downgrade-all actions are provided.") + } + if (hasFromControllerOption && !hasDescribeOption) { + CommandLineUtils.printUsageAndDie( + parser, + "Command can contain --from-controller option only when --describe action is provided.") + } + } +} + +object FeatureApis { + private def createAdminClient(opts: FeatureCommandOptions): Admin = { + val props = new Properties() + props.putAll(opts.commandConfig) + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.bootstrapServers) + Admin.create(props) + } +} diff --git a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala new file mode 100644 index 0000000000000..0b9f80d3cc3fa --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import kafka.api.KAFKA_2_7_IV0 +import kafka.server.{BaseRequestTest, KafkaConfig, KafkaServer} +import kafka.utils.TestUtils +import kafka.utils.TestUtils.waitUntilTrue +import org.apache.kafka.common.feature.{Features, SupportedVersionRange} +import org.apache.kafka.common.utils.Utils + +import java.util.Properties + +import org.junit.Assert.{assertEquals, assertTrue} +import org.junit.Test +import org.scalatest.Assertions.intercept + +class FeatureCommandTest extends BaseRequestTest { + override def brokerCount: Int = 3 + + override def brokerPropertyOverrides(props: Properties): Unit = { + props.put(KafkaConfig.InterBrokerProtocolVersionProp, KAFKA_2_7_IV0.toString) + } + + private val defaultSupportedFeatures: Features[SupportedVersionRange] = + Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)), + Utils.mkEntry("feature_2", new SupportedVersionRange(1, 5)))) + + private def updateSupportedFeatures(features: Features[SupportedVersionRange], + targetServers: Set[KafkaServer]): Unit = { + targetServers.foreach(s => { + s.brokerFeatures.setSupportedFeatures(features) + s.zkClient.updateBrokerInfo(s.createBrokerInfo) + }) + + // Wait until updates to all BrokerZNode supported features propagate to the controller. + val brokerIds = targetServers.map(s => s.config.brokerId) + waitUntilTrue( + () => servers.exists(s => { + if (s.kafkaController.isActive) { + s.kafkaController.controllerContext.liveOrShuttingDownBrokers + .filter(b => brokerIds.contains(b.id)) + .forall(b => { + b.features.equals(features) + }) + } else { + false + } + }), + "Controller did not get broker updates") + } + + private def updateSupportedFeaturesInAllBrokers(features: Features[SupportedVersionRange]): Unit = { + updateSupportedFeatures(features, Set[KafkaServer]() ++ servers) + } + + /** + * Tests if the FeatureApis#describeFeatures API works as expected when describing features before and + * after upgrading features. + */ + @Test + def testDescribeFeaturesSuccess(): Unit = { + updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures) + val featureApis = new FeatureApis(new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--describe", "--from-controller"))) + featureApis.setSupportedFeatures(defaultSupportedFeatures) + try { + val initialDescribeOutput = TestUtils.grabConsoleOutput(featureApis.describeFeatures()) + val expectedInitialDescribeOutput = + "Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedMinVersionLevel: -\tFinalizedMaxVersionLevel: -\tEpoch: 0\n" + + "Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedMinVersionLevel: -\tFinalizedMaxVersionLevel: -\tEpoch: 0\n" + assertEquals(expectedInitialDescribeOutput, initialDescribeOutput) + featureApis.upgradeAllFeatures() + val finalDescribeOutput = TestUtils.grabConsoleOutput(featureApis.describeFeatures()) + val expectedFinalDescribeOutput = + "Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedMinVersionLevel: 1\tFinalizedMaxVersionLevel: 3\tEpoch: 1\n" + + "Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedMinVersionLevel: 1\tFinalizedMaxVersionLevel: 5\tEpoch: 1\n" + assertEquals(expectedFinalDescribeOutput, finalDescribeOutput) + } finally { + featureApis.close() + } + } + + /** + * Tests if the FeatureApis#upgradeAllFeatures API works as expected during a success case. + */ + @Test + def testUpgradeAllFeaturesSuccess(): Unit = { + val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--upgrade-all")) + val featureApis = new FeatureApis(upgradeOpts) + try { + // Step (1): + // - Update the supported features across all brokers. + // - Upgrade non-existing feature_1 to maxVersionLevel: 2. + // - Verify results. + val initialSupportedFeatures = Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 2)))) + updateSupportedFeaturesInAllBrokers(initialSupportedFeatures) + featureApis.setSupportedFeatures(initialSupportedFeatures) + var output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures()) + var expected = + " [Add]\tFeature: feature_1\tExistingFinalizedMaxVersion: -\tNewFinalizedMaxVersion: 2\tResult: OK\n" + assertEquals(expected, output) + + // Step (2): + // - Update the supported features across all brokers. + // - Upgrade existing feature_1 to maxVersionLevel: 3. + // - Upgrade non-existing feature_2 to maxVersionLevel: 5. + // - Verify results. + updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures) + featureApis.setSupportedFeatures(defaultSupportedFeatures) + output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures()) + expected = + " [Upgrade]\tFeature: feature_1\tExistingFinalizedMaxVersion: 2\tNewFinalizedMaxVersion: 3\tResult: OK\n" + + " [Add]\tFeature: feature_2\tExistingFinalizedMaxVersion: -\tNewFinalizedMaxVersion: 5\tResult: OK\n" + assertEquals(expected, output) + + // Step (3): + // - Perform an upgrade of all features again. + // - Since supported features have not changed, expect that the above action does not yield + // any results. + output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures()) + assertTrue(output.isEmpty) + featureApis.setOptions(upgradeOpts) + output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures()) + assertTrue(output.isEmpty) + } finally { + featureApis.close() + } + } + + /** + * Tests if the FeatureApis#downgradeAllFeatures API works as expected during a success case. + */ + @Test + def testDowngradeFeaturesSuccess(): Unit = { + val downgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--downgrade-all")) + val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--upgrade-all")) + val featureApis = new FeatureApis(upgradeOpts) + try { + // Step (1): + // - Update the supported features across all brokers. + // - Upgrade non-existing feature_1 to maxVersionLevel: 3. + // - Upgrade non-existing feature_2 to maxVersionLevel: 5. + updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures) + featureApis.setSupportedFeatures(defaultSupportedFeatures) + featureApis.upgradeAllFeatures() + + // Step (2): + // - Downgrade existing feature_1 to maxVersionLevel: 2. + // - Delete feature_2 since it is no longer supported by the FeatureApis object. + // - Verify results. + val downgradedFeatures = Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 2)))) + featureApis.setSupportedFeatures(downgradedFeatures) + featureApis.setOptions(downgradeOpts) + var output = TestUtils.grabConsoleOutput(featureApis.downgradeAllFeatures()) + var expected = + "[Downgrade]\tFeature: feature_1\tExistingFinalizedMaxVersion: 3\tNewFinalizedMaxVersion: 2\tResult: OK\n" + + " [Delete]\tFeature: feature_2\tExistingFinalizedMaxVersion: 5\tNewFinalizedMaxVersion: -\tResult: OK\n" + assertEquals(expected, output) + + // Step (3): + // - Perform a downgrade of all features again. + // - Since supported features have not changed, expect that the above action does not yield + // any results. + updateSupportedFeaturesInAllBrokers(downgradedFeatures) + output = TestUtils.grabConsoleOutput(featureApis.downgradeAllFeatures()) + assertTrue(output.isEmpty) + + // Step (4): + // - Delete feature_1 since it is no longer supported by the FeatureApis object. + // - Verify results. + featureApis.setSupportedFeatures(Features.emptySupportedFeatures()) + output = TestUtils.grabConsoleOutput(featureApis.downgradeAllFeatures()) + expected = + " [Delete]\tFeature: feature_1\tExistingFinalizedMaxVersion: 2\tNewFinalizedMaxVersion: -\tResult: OK\n" + assertEquals(expected, output) + } finally { + featureApis.close() + } + } + + /** + * Tests if the FeatureApis#upgradeAllFeatures API works as expected during a partial failure case. + */ + @Test + def testUpgradeFeaturesFailure(): Unit = { + val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--upgrade-all")) + val featureApis = new FeatureApis(upgradeOpts) + try { + // Step (1): Update the supported features across all brokers. + updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures) + + // Step (2): + // - Intentionally setup the FeatureApis object such that it contains incompatible target + // features (viz. feature_2 and feature_3). + // - Upgrade non-existing feature_1 to maxVersionLevel: 4. Expect the operation to fail with + // an incompatibility failure. + // - Upgrade non-existing feature_2 to maxVersionLevel: 5. Expect the operation to succeed. + // - Upgrade non-existing feature_3 to maxVersionLevel: 3. Expect the operation to fail + // since the feature is not supported. + val targetFeaturesWithIncompatibilities = + Features.supportedFeatures( + Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 4)), + Utils.mkEntry("feature_2", new SupportedVersionRange(1, 5)), + Utils.mkEntry("feature_3", new SupportedVersionRange(1, 3)))) + featureApis.setSupportedFeatures(targetFeaturesWithIncompatibilities) + val output = TestUtils.grabConsoleOutput({ + val exception = intercept[UpdateFeaturesException] { + featureApis.upgradeAllFeatures() + } + assertEquals("2 feature updates failed!", exception.getMessage) + }) + val expected = + " [Add]\tFeature: feature_1\tExistingFinalizedMaxVersion: -" + + "\tNewFinalizedMaxVersion: 4\tResult: FAILED due to" + + " org.apache.kafka.common.errors.InvalidRequestException: Could not apply finalized" + + " feature update because brokers were found to have incompatible versions for the" + + " feature.\n" + + " [Add]\tFeature: feature_2\tExistingFinalizedMaxVersion: -" + + "\tNewFinalizedMaxVersion: 5\tResult: OK\n" + + " [Add]\tFeature: feature_3\tExistingFinalizedMaxVersion: -" + + "\tNewFinalizedMaxVersion: 3\tResult: FAILED due to" + + " org.apache.kafka.common.errors.InvalidRequestException: Could not apply finalized" + + " feature update because the provided feature is not supported.\n" + assertEquals(expected, output) + } finally { + featureApis.close() + } + } +}