diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index d732cd22849cb..e3f3e48891b3c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -33,7 +33,6 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.Uuid; import org.apache.kafka.common.TopicPartitionReplica; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBindingFilter; @@ -252,41 +251,6 @@ default DeleteTopicsResult deleteTopics(Collection topics) { * @return The DeleteTopicsResult. */ DeleteTopicsResult deleteTopics(Collection topics, DeleteTopicsOptions options); - - /** - * This is a convenience method for {@link #deleteTopicsWithIds(Collection, DeleteTopicsOptions)} - * with default options. See the overload for more details. - *

- * This operation is supported by brokers with version 2.8.0 or higher. - * - * @param topics The topic IDs for the topics to delete. - * @return The DeleteTopicsWithIdsResult. - */ - default DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection topics) { - return deleteTopicsWithIds(topics, new DeleteTopicsOptions()); - } - - /** - * Delete a batch of topics. - *

- * This operation is not transactional so it may succeed for some topics while fail for others. - *

- * It may take several seconds after the {@link DeleteTopicsWithIdsResult} returns - * success for all the brokers to become aware that the topics are gone. - * During this time, {@link #listTopics()} and {@link #describeTopics(Collection)} - * may continue to return information about the deleted topics. - *

- * If delete.topic.enable is false on the brokers, deleteTopicsWithIds will mark - * the topics for deletion, but not actually delete them. The futures will - * return successfully in this case. - *

- * This operation is supported by brokers with version 2.8.0 or higher. - * - * @param topics The topic IDs for the topics to delete. - * @param options The options to use when deleting the topics. - * @return The DeleteTopicsWithIdsResult. - */ - DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection topics, DeleteTopicsOptions options); /** * List the topics available in the cluster with the default options. diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsWithIdsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsWithIdsResult.java deleted file mode 100644 index eeb91194a96ff..0000000000000 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsWithIdsResult.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.clients.admin; - -import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.annotation.InterfaceStability; - -import java.util.Collection; -import java.util.Map; - -/** - * The result of the {@link Admin#deleteTopicsWithIds(Collection)} call. - * - * The API of this class is evolving, see {@link Admin} for details. - */ -@InterfaceStability.Evolving -public class DeleteTopicsWithIdsResult { - final Map> futures; - - DeleteTopicsWithIdsResult(Map> futures) { - this.futures = futures; - } - - /** - * Return a map from topic IDs to futures which can be used to check the status of - * individual deletions. - */ - public Map> values() { - return futures; - } - - /** - * Return a future which succeeds only if all the topic deletions succeed. - */ - public KafkaFuture all() { - return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])); - } -} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index ab404700980ed..b2bfac311e70c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -49,7 +49,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.TopicPartitionReplica; -import org.apache.kafka.common.Uuid; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; @@ -68,7 +67,6 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnacceptableCredentialException; import org.apache.kafka.common.errors.UnknownServerException; -import org.apache.kafka.common.errors.UnknownTopicIdException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; import org.apache.kafka.common.errors.UnsupportedVersionException; @@ -110,7 +108,6 @@ import org.apache.kafka.common.message.DeleteRecordsResponseData; import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsTopicResult; import org.apache.kafka.common.message.DeleteTopicsRequestData; -import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState; import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult; import org.apache.kafka.common.message.DescribeClusterRequestData; import org.apache.kafka.common.message.DescribeConfigsRequestData; @@ -1631,32 +1628,6 @@ public DeleteTopicsResult deleteTopics(final Collection topicNames, return new DeleteTopicsResult(new HashMap<>(topicFutures)); } - @Override - public DeleteTopicsWithIdsResult deleteTopicsWithIds(final Collection topicIds, - final DeleteTopicsOptions options) { - final Map> topicFutures = new HashMap<>(topicIds.size()); - final List validTopicIds = new ArrayList<>(topicIds.size()); - for (Uuid topicId : topicIds) { - if (topicId.equals(Uuid.ZERO_UUID)) { - KafkaFutureImpl future = new KafkaFutureImpl<>(); - future.completeExceptionally(new UnknownTopicIdException("The given topic ID '" + - topicId + "' cannot be represented in a request.")); - topicFutures.put(topicId, future); - } else if (!topicFutures.containsKey(topicId)) { - topicFutures.put(topicId, new KafkaFutureImpl<>()); - validTopicIds.add(topicId); - } - } - if (!validTopicIds.isEmpty()) { - final long now = time.milliseconds(); - final long deadline = calcDeadlineMs(now, options.timeoutMs()); - final Call call = getDeleteTopicsWithIdsCall(options, topicFutures, validTopicIds, - Collections.emptyMap(), now, deadline); - runnable.call(call, now); - } - return new DeleteTopicsWithIdsResult(new HashMap<>(topicFutures)); - } - private Call getDeleteTopicsCall(final DeleteTopicsOptions options, final Map> futures, final List topics, @@ -1728,79 +1699,6 @@ void handleFailure(Throwable throwable) { } }; } - - private Call getDeleteTopicsWithIdsCall(final DeleteTopicsOptions options, - final Map> futures, - final List topicIds, - final Map quotaExceededExceptions, - final long now, - final long deadline) { - return new Call("deleteTopics", deadline, new ControllerNodeProvider()) { - @Override - DeleteTopicsRequest.Builder createRequest(int timeoutMs) { - return new DeleteTopicsRequest.Builder( - new DeleteTopicsRequestData() - .setTopics(topicIds.stream().map( - topic -> new DeleteTopicState().setTopicId(topic)).collect(Collectors.toList())) - .setTimeoutMs(timeoutMs)); - } - - @Override - void handleResponse(AbstractResponse abstractResponse) { - // Check for controller change - handleNotControllerError(abstractResponse); - // Handle server responses for particular topics. - final DeleteTopicsResponse response = (DeleteTopicsResponse) abstractResponse; - final List retryTopics = new ArrayList<>(); - final Map retryTopicQuotaExceededExceptions = new HashMap<>(); - for (DeletableTopicResult result : response.data().responses()) { - KafkaFutureImpl future = futures.get(result.topicId()); - if (future == null) { - log.warn("Server response mentioned unknown topic ID {}", result.topicId()); - } else { - ApiError error = new ApiError(result.errorCode(), result.errorMessage()); - if (error.isFailure()) { - if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) { - ThrottlingQuotaExceededException quotaExceededException = new ThrottlingQuotaExceededException( - response.throttleTimeMs(), error.messageWithFallback()); - if (options.shouldRetryOnQuotaViolation()) { - retryTopics.add(result.topicId()); - retryTopicQuotaExceededExceptions.put(result.topicId(), quotaExceededException); - } else { - future.completeExceptionally(quotaExceededException); - } - } else { - future.completeExceptionally(error.exception()); - } - } else { - future.complete(null); - } - } - } - // If there are topics to retry, retry them; complete unrealized futures otherwise. - if (retryTopics.isEmpty()) { - // The server should send back a response for every topic. But do a sanity check anyway. - completeUnrealizedFutures(futures.entrySet().stream(), - topic -> "The controller response did not contain a result for topic " + topic); - } else { - final long now = time.milliseconds(); - final Call call = getDeleteTopicsWithIdsCall(options, futures, retryTopics, - retryTopicQuotaExceededExceptions, now, deadline); - runnable.call(call, now); - } - } - - @Override - void handleFailure(Throwable throwable) { - // If there were any topics retries due to a quota exceeded exception, we propagate - // the initial error back to the caller if the request timed out. - maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(), - throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now)); - // Fail all the other remaining futures - completeAllExceptionally(futures.values(), throwable); - } - }; - } @Override public ListTopicsResult listTopics(final ListTopicsOptions options) { diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index a5296dae04f5b..ddd01be8bca0a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -65,7 +65,6 @@ import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.errors.UnknownServerException; -import org.apache.kafka.common.errors.UnknownTopicIdException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.feature.Features; @@ -413,12 +412,6 @@ public static DeletableTopicResult deletableTopicResult(String topicName, Errors .setErrorCode(error.code()); } - public static DeletableTopicResult deletableTopicResultWithId(Uuid topicId, Errors error) { - return new DeletableTopicResult() - .setTopicId(topicId) - .setErrorCode(error.code()); - } - public static CreatePartitionsResponse prepareCreatePartitionsResponse(int throttleTimeMs, CreatePartitionsTopicResult... topics) { CreatePartitionsResponseData data = new CreatePartitionsResponseData() .setThrottleTimeMs(throttleTimeMs) @@ -445,14 +438,6 @@ private static DeleteTopicsResponse prepareDeleteTopicsResponse(String topicName return new DeleteTopicsResponse(data); } - private static DeleteTopicsResponse prepareDeleteTopicsResponseWithTopicId(Uuid id, Errors error) { - DeleteTopicsResponseData data = new DeleteTopicsResponseData(); - data.responses().add(new DeletableTopicResult() - .setTopicId(id) - .setErrorCode(error.code())); - return new DeleteTopicsResponse(data); - } - private static FindCoordinatorResponse prepareFindCoordinatorResponse(Errors error, Node node) { return FindCoordinatorResponse.prepareResponse(error, node); } @@ -886,33 +871,8 @@ public void testDeleteTopics() throws Exception { future = env.adminClient().deleteTopics(singletonList("myTopic"), new DeleteTopicsOptions()).all(); TestUtils.assertFutureError(future, UnknownTopicOrPartitionException.class); - - // With topic IDs - Uuid topicId = Uuid.randomUuid(); - - env.kafkaClient().prepareResponse( - expectDeleteTopicsRequestWithTopicIds(topicId), - prepareDeleteTopicsResponseWithTopicId(topicId, Errors.NONE)); - future = env.adminClient().deleteTopicsWithIds(singletonList(topicId), - new DeleteTopicsOptions()).all(); - assertNull(future.get()); - - env.kafkaClient().prepareResponse( - expectDeleteTopicsRequestWithTopicIds(topicId), - prepareDeleteTopicsResponseWithTopicId(topicId, Errors.TOPIC_DELETION_DISABLED)); - future = env.adminClient().deleteTopicsWithIds(singletonList(topicId), - new DeleteTopicsOptions()).all(); - TestUtils.assertFutureError(future, TopicDeletionDisabledException.class); - - env.kafkaClient().prepareResponse( - expectDeleteTopicsRequestWithTopicIds(topicId), - prepareDeleteTopicsResponseWithTopicId(topicId, Errors.UNKNOWN_TOPIC_ID)); - future = env.adminClient().deleteTopicsWithIds(singletonList(topicId), - new DeleteTopicsOptions()).all(); - TestUtils.assertFutureError(future, UnknownTopicIdException.class); } } - @Test public void testDeleteTopicsPartialResponse() throws Exception { @@ -929,20 +889,6 @@ public void testDeleteTopicsPartialResponse() throws Exception { result.values().get("myTopic").get(); TestUtils.assertFutureThrows(result.values().get("myOtherTopic"), ApiException.class); - - // With topic IDs - Uuid topicId1 = Uuid.randomUuid(); - Uuid topicId2 = Uuid.randomUuid(); - env.kafkaClient().prepareResponse( - expectDeleteTopicsRequestWithTopicIds(topicId1, topicId2), - prepareDeleteTopicsResponse(1000, - deletableTopicResultWithId(topicId1, Errors.NONE))); - - DeleteTopicsWithIdsResult resultIds = env.adminClient().deleteTopicsWithIds( - asList(topicId1, topicId2), new DeleteTopicsOptions()); - - resultIds.values().get(topicId1).get(); - TestUtils.assertFutureThrows(resultIds.values().get(topicId2), ApiException.class); } } @@ -975,36 +921,6 @@ public void testDeleteTopicsRetryThrottlingExceptionWhenEnabled() throws Excepti assertNull(result.values().get("topic1").get()); assertNull(result.values().get("topic2").get()); TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class); - - // With topic IDs - Uuid topicId1 = Uuid.randomUuid(); - Uuid topicId2 = Uuid.randomUuid(); - Uuid topicId3 = Uuid.randomUuid(); - - env.kafkaClient().prepareResponse( - expectDeleteTopicsRequestWithTopicIds(topicId1, topicId2, topicId3), - prepareDeleteTopicsResponse(1000, - deletableTopicResultWithId(topicId1, Errors.NONE), - deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED), - deletableTopicResultWithId(topicId3, Errors.UNKNOWN_TOPIC_ID))); - - env.kafkaClient().prepareResponse( - expectDeleteTopicsRequestWithTopicIds(topicId2), - prepareDeleteTopicsResponse(1000, - deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED))); - - env.kafkaClient().prepareResponse( - expectDeleteTopicsRequestWithTopicIds(topicId2), - prepareDeleteTopicsResponse(0, - deletableTopicResultWithId(topicId2, Errors.NONE))); - - DeleteTopicsWithIdsResult resultIds = env.adminClient().deleteTopicsWithIds( - asList(topicId1, topicId2, topicId3), - new DeleteTopicsOptions().retryOnQuotaViolation(true)); - - assertNull(resultIds.values().get(topicId1).get()); - assertNull(resultIds.values().get(topicId2).get()); - TestUtils.assertFutureThrows(resultIds.values().get(topicId3), UnknownTopicIdException.class); } } @@ -1050,43 +966,6 @@ public void testDeleteTopicsRetryThrottlingExceptionWhenEnabledUntilRequestTimeO ThrottlingQuotaExceededException.class); assertEquals(0, e.throttleTimeMs()); TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class); - - // With topic IDs - Uuid topicId1 = Uuid.randomUuid(); - Uuid topicId2 = Uuid.randomUuid(); - Uuid topicId3 = Uuid.randomUuid(); - env.kafkaClient().prepareResponse( - expectDeleteTopicsRequestWithTopicIds(topicId1, topicId2, topicId3), - prepareDeleteTopicsResponse(1000, - deletableTopicResultWithId(topicId1, Errors.NONE), - deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED), - deletableTopicResultWithId(topicId3, Errors.UNKNOWN_TOPIC_ID))); - - env.kafkaClient().prepareResponse( - expectDeleteTopicsRequestWithTopicIds(topicId2), - prepareDeleteTopicsResponse(1000, - deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED))); - - DeleteTopicsWithIdsResult resultIds = env.adminClient().deleteTopicsWithIds( - asList(topicId1, topicId2, topicId3), - new DeleteTopicsOptions().retryOnQuotaViolation(true)); - - // Wait until the prepared attempts have consumed - TestUtils.waitForCondition(() -> env.kafkaClient().numAwaitingResponses() == 0, - "Failed awaiting DeleteTopics requests"); - - // Wait until the next request is sent out - TestUtils.waitForCondition(() -> env.kafkaClient().inFlightRequestCount() == 1, - "Failed awaiting next DeleteTopics request"); - - // Advance time past the default api timeout to time out the inflight request - time.sleep(defaultApiTimeout + 1); - - assertNull(resultIds.values().get(topicId1).get()); - e = TestUtils.assertFutureThrows(resultIds.values().get(topicId2), - ThrottlingQuotaExceededException.class); - assertEquals(0, e.throttleTimeMs()); - TestUtils.assertFutureThrows(resultIds.values().get(topicId3), UnknownTopicIdException.class); } } @@ -1111,27 +990,6 @@ public void testDeleteTopicsDontRetryThrottlingExceptionWhenDisabled() throws Ex ThrottlingQuotaExceededException.class); assertEquals(1000, e.throttleTimeMs()); TestUtils.assertFutureError(result.values().get("topic3"), TopicExistsException.class); - - // With topic IDs - Uuid topicId1 = Uuid.randomUuid(); - Uuid topicId2 = Uuid.randomUuid(); - Uuid topicId3 = Uuid.randomUuid(); - env.kafkaClient().prepareResponse( - expectDeleteTopicsRequestWithTopicIds(topicId1, topicId2, topicId3), - prepareDeleteTopicsResponse(1000, - deletableTopicResultWithId(topicId1, Errors.NONE), - deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED), - deletableTopicResultWithId(topicId3, Errors.UNKNOWN_TOPIC_ID))); - - DeleteTopicsWithIdsResult resultIds = env.adminClient().deleteTopicsWithIds( - asList(topicId1, topicId2, topicId3), - new DeleteTopicsOptions().retryOnQuotaViolation(false)); - - assertNull(resultIds.values().get(topicId1).get()); - e = TestUtils.assertFutureThrows(resultIds.values().get(topicId2), - ThrottlingQuotaExceededException.class); - assertEquals(1000, e.throttleTimeMs()); - TestUtils.assertFutureError(resultIds.values().get(topicId3), UnknownTopicIdException.class); } } @@ -1145,16 +1003,6 @@ private MockClient.RequestMatcher expectDeleteTopicsRequestWithTopics(final Stri }; } - private MockClient.RequestMatcher expectDeleteTopicsRequestWithTopicIds(final Uuid... topicIds) { - return body -> { - if (body instanceof DeleteTopicsRequest) { - DeleteTopicsRequest request = (DeleteTopicsRequest) body; - return request.topicIds().equals(Arrays.asList(topicIds)); - } - return false; - }; - } - @Test public void testInvalidTopicNames() throws Exception { try (AdminClientUnitTestEnv env = mockClientEnv()) { diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index c2b9cffac3925..f4ccc98a73c0b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -422,37 +422,6 @@ synchronized public DeleteTopicsResult deleteTopics(Collection topicsToD return new DeleteTopicsResult(deleteTopicsResult); } - @Override - synchronized public DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection topicsToDelete, DeleteTopicsOptions options) { - Map> deleteTopicsWithIdsResult = new HashMap<>(); - - if (timeoutNextRequests > 0) { - for (final Uuid topicId : topicsToDelete) { - KafkaFutureImpl future = new KafkaFutureImpl<>(); - future.completeExceptionally(new TimeoutException()); - deleteTopicsWithIdsResult.put(topicId, future); - } - - --timeoutNextRequests; - return new DeleteTopicsWithIdsResult(deleteTopicsWithIdsResult); - } - - for (final Uuid topicId : topicsToDelete) { - KafkaFutureImpl future = new KafkaFutureImpl<>(); - - String name = topicNames.remove(topicId); - if (name == null || allTopics.remove(name) == null) { - future.completeExceptionally(new UnknownTopicOrPartitionException(String.format("Topic %s does not exist.", topicId))); - } else { - topicIds.remove(name); - future.complete(null); - } - deleteTopicsWithIdsResult.put(topicId, future); - } - - return new DeleteTopicsWithIdsResult(deleteTopicsWithIdsResult); - } - @Override synchronized public CreatePartitionsResult createPartitions(Map newPartitions, CreatePartitionsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 87cb25725d409..049ef67f90df3 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -48,7 +48,6 @@ import org.slf4j.LoggerFactory import scala.annotation.nowarn import scala.jdk.CollectionConverters._ import scala.collection.Seq -import scala.compat.java8.OptionConverters._ import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} import scala.util.Random @@ -116,24 +115,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertTrue(e.getCause.isInstanceOf[TopicExistsException]) } - @Test - def testDeleteTopicsWithIds(): Unit = { - client = Admin.create(createConfig) - val topics = Seq("mytopic", "mytopic2", "mytopic3") - val newTopics = Seq( - new NewTopic("mytopic", Map((0: Integer) -> Seq[Integer](1, 2).asJava, (1: Integer) -> Seq[Integer](2, 0).asJava).asJava), - new NewTopic("mytopic2", 3, 3.toShort), - new NewTopic("mytopic3", Option.empty[Integer].asJava, Option.empty[java.lang.Short].asJava) - ) - val createResult = client.createTopics(newTopics.asJava) - createResult.all.get() - waitForTopics(client, topics, List()) - val topicIds = getTopicIds().values.toSet - - client.deleteTopicsWithIds(topicIds.asJava).all.get() - waitForTopics(client, List(), topics) - } - @Test def testMetadataRefresh(): Unit = { client = Admin.create(createConfig)