From 3708a7c6c1ecf1304f091dda1e79ae53ba2df489 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 2 Mar 2021 13:01:35 -0800 Subject: [PATCH] KAFKA-12369; Implement `ListTransactions` API (#10206) This patch implements the `ListTransactions` API as documented in KIP-664: https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions. Reviewers: Tom Bentley , Chia-Ping Tsai --- .../apache/kafka/common/protocol/ApiKeys.java | 3 +- .../common/requests/AbstractRequest.java | 2 + .../common/requests/AbstractResponse.java | 2 + .../requests/ListTransactionsRequest.java | 77 +++++++++++++++++++ .../requests/ListTransactionsResponse.java | 62 +++++++++++++++ .../message/DescribeTransactionsResponse.json | 2 +- .../message/ListTransactionsRequest.json | 31 ++++++++ .../message/ListTransactionsResponse.json | 35 +++++++++ .../common/requests/RequestResponseTest.java | 34 ++++++++ .../transaction/TransactionCoordinator.scala | 14 +++- .../transaction/TransactionLog.scala | 4 +- .../transaction/TransactionMetadata.scala | 64 +++++++++------ .../transaction/TransactionStateManager.scala | 53 +++++++++++++ .../kafka/network/RequestConvertToJson.scala | 2 + .../main/scala/kafka/server/KafkaApis.scala | 21 +++++ .../kafka/api/AuthorizerIntegrationTest.scala | 35 ++++++++- .../transaction/TransactionMetadataTest.scala | 39 +++++++++- .../TransactionStateManagerTest.scala | 70 ++++++++++++++++- .../unit/kafka/server/KafkaApisTest.scala | 67 ++++++++++++++++ .../unit/kafka/server/RequestQuotaTest.scala | 3 + 20 files changed, 587 insertions(+), 33 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsRequest.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsResponse.java create mode 100644 clients/src/main/resources/common/message/ListTransactionsRequest.json create mode 100644 
clients/src/main/resources/common/message/ListTransactionsResponse.json diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 07f1a62aec1f8..3297dc5734aa6 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -106,7 +106,8 @@ public enum ApiKeys { BROKER_REGISTRATION(ApiMessageType.BROKER_REGISTRATION, true, RecordBatch.MAGIC_VALUE_V0, false), BROKER_HEARTBEAT(ApiMessageType.BROKER_HEARTBEAT, true, RecordBatch.MAGIC_VALUE_V0, false), UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true), - DESCRIBE_TRANSACTIONS(ApiMessageType.DESCRIBE_TRANSACTIONS); + DESCRIBE_TRANSACTIONS(ApiMessageType.DESCRIBE_TRANSACTIONS), + LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS); private static final Map> APIS_BY_LISTENER = new EnumMap<>(ApiMessageType.ListenerType.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 2b754e056ce0a..7802293c9bbfd 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -286,6 +286,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion, return UnregisterBrokerRequest.parse(buffer, apiVersion); case DESCRIBE_TRANSACTIONS: return DescribeTransactionsRequest.parse(buffer, apiVersion); + case LIST_TRANSACTIONS: + return ListTransactionsRequest.parse(buffer, apiVersion); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index e35589f68ec4d..5f7b88f269405 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -243,6 +243,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response return UnregisterBrokerResponse.parse(responseBuffer, version); case DESCRIBE_TRANSACTIONS: return DescribeTransactionsResponse.parse(responseBuffer, version); + case LIST_TRANSACTIONS: + return ListTransactionsResponse.parse(responseBuffer, version); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsRequest.java new file mode 100644 index 0000000000000..0651f1fe6e5cc --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsRequest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.ListTransactionsRequestData; +import org.apache.kafka.common.message.ListTransactionsResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; + +public class ListTransactionsRequest extends AbstractRequest { + public static class Builder extends AbstractRequest.Builder { + public final ListTransactionsRequestData data; + + public Builder(ListTransactionsRequestData data) { + super(ApiKeys.LIST_TRANSACTIONS); + this.data = data; + } + + @Override + public ListTransactionsRequest build(short version) { + return new ListTransactionsRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final ListTransactionsRequestData data; + + private ListTransactionsRequest(ListTransactionsRequestData data, short version) { + super(ApiKeys.LIST_TRANSACTIONS, version); + this.data = data; + } + + public ListTransactionsRequestData data() { + return data; + } + + @Override + public ListTransactionsResponse getErrorResponse(int throttleTimeMs, Throwable e) { + Errors error = Errors.forException(e); + ListTransactionsResponseData response = new ListTransactionsResponseData() + .setErrorCode(error.code()) + .setThrottleTimeMs(throttleTimeMs); + return new ListTransactionsResponse(response); + } + + public static ListTransactionsRequest parse(ByteBuffer buffer, short version) { + return new ListTransactionsRequest(new ListTransactionsRequestData( + new ByteBufferAccessor(buffer), version), version); + } + + @Override + public String toString(boolean verbose) { + return data.toString(); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsResponse.java new file mode 
100644 index 0000000000000..13ed184fc3408 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsResponse.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.ListTransactionsResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +public class ListTransactionsResponse extends AbstractResponse { + private final ListTransactionsResponseData data; + + public ListTransactionsResponse(ListTransactionsResponseData data) { + super(ApiKeys.LIST_TRANSACTIONS); + this.data = data; + } + + public ListTransactionsResponseData data() { + return data; + } + + @Override + public Map errorCounts() { + Map errorCounts = new HashMap<>(); + updateErrorCounts(errorCounts, Errors.forCode(data.errorCode())); + return errorCounts; + } + + public static ListTransactionsResponse parse(ByteBuffer buffer, short version) { + return new ListTransactionsResponse(new ListTransactionsResponseData( + new 
ByteBufferAccessor(buffer), version)); + } + + @Override + public String toString() { + return data.toString(); + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + +} diff --git a/clients/src/main/resources/common/message/DescribeTransactionsResponse.json b/clients/src/main/resources/common/message/DescribeTransactionsResponse.json index affc5aa4f09a8..15f52a473d25e 100644 --- a/clients/src/main/resources/common/message/DescribeTransactionsResponse.json +++ b/clients/src/main/resources/common/message/DescribeTransactionsResponse.json @@ -20,7 +20,7 @@ "validVersions": "0", "flexibleVersions": "0+", "fields": [ - { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+" }, diff --git a/clients/src/main/resources/common/message/ListTransactionsRequest.json b/clients/src/main/resources/common/message/ListTransactionsRequest.json new file mode 100644 index 0000000000000..716b7530f8ccb --- /dev/null +++ b/clients/src/main/resources/common/message/ListTransactionsRequest.json @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 66, + "type": "request", + "listeners": ["zkBroker", "broker"], + "name": "ListTransactionsRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "StateFilters", "type": "[]string", "versions": "0+", + "about": "The transaction states to filter by: if empty, all transactions are returned; if non-empty, then only transactions matching one of the filtered states will be returned" + }, + { "name": "ProducerIdFilters", "type": "[]int64", "versions": "0+", + "about": "The producerIds to filter by: if empty, all transactions will be returned; if non-empty, only transactions which match one of the filtered producerIds will be returned" + } + ] +} diff --git a/clients/src/main/resources/common/message/ListTransactionsResponse.json b/clients/src/main/resources/common/message/ListTransactionsResponse.json new file mode 100644 index 0000000000000..2f17873239143 --- /dev/null +++ b/clients/src/main/resources/common/message/ListTransactionsResponse.json @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 66, + "type": "response", + "name": "ListTransactionsResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+" }, + { "name": "UnknownStateFilters", "type": "[]string", "versions": "0+", + "about": "Set of state filters provided in the request which were unknown to the transaction coordinator" }, + { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [ + { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId" }, + { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId" }, + { "name": "TransactionState", "type": "string", "versions": "0+", + "about": "The current transaction state of the producer" } + ]} + ] +} diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 219c5c344a005..d873d15207374 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -137,6 +137,8 @@ import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse; import 
org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; +import org.apache.kafka.common.message.ListTransactionsRequestData; +import org.apache.kafka.common.message.ListTransactionsResponseData; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.OffsetCommitResponseData; import org.apache.kafka.common.message.OffsetDeleteRequestData; @@ -560,6 +562,15 @@ public void testDescribeTransactionsSerialization() { } } + @Test + public void testListTransactionsSerialization() { + for (short v : ApiKeys.LIST_TRANSACTIONS.allVersions()) { + checkRequest(createListTransactionsRequest(v), true); + checkErrorResponse(createListTransactionsRequest(v), unknownServerException, true); + checkResponse(createListTransactionsResponse(), v, true); + } + } + @Test public void testDescribeClusterSerialization() { for (short v : ApiKeys.DESCRIBE_CLUSTER.allVersions()) { @@ -2806,4 +2817,27 @@ private DescribeTransactionsResponse createDescribeTransactionsResponse() { return new DescribeTransactionsResponse(data); } + private ListTransactionsRequest createListTransactionsRequest(short version) { + return new ListTransactionsRequest.Builder(new ListTransactionsRequestData() + .setStateFilters(singletonList("Ongoing")) + .setProducerIdFilters(asList(1L, 2L, 15L)) + ).build(version); + } + + private ListTransactionsResponse createListTransactionsResponse() { + ListTransactionsResponseData response = new ListTransactionsResponseData(); + response.setErrorCode(Errors.NONE.code()); + response.setTransactionStates(Arrays.asList( + new ListTransactionsResponseData.TransactionState() + .setTransactionalId("foo") + .setProducerId(12345L) + .setTransactionState("Ongoing"), + new ListTransactionsResponseData.TransactionState() + .setTransactionalId("bar") + .setProducerId(98765L) + .setTransactionState("PrepareAbort") + )); + return new 
ListTransactionsResponse(response); + } + } diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala index 19f92435dcc42..8b8fde65a61e6 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -23,7 +23,7 @@ import kafka.server.{KafkaConfig, MetadataCache, ReplicaManager} import kafka.utils.{Logging, Scheduler} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.internals.Topic -import org.apache.kafka.common.message.DescribeTransactionsResponseData +import org.apache.kafka.common.message.{DescribeTransactionsResponseData, ListTransactionsResponseData} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.RecordBatch @@ -251,11 +251,21 @@ class TransactionCoordinator(brokerId: Int, s"This is illegal as we should never have transitioned to this state." 
fatal(errorMsg) throw new IllegalStateException(errorMsg) - } } } + def handleListTransactions( + filteredProducerIds: Set[Long], + filteredStates: Set[String] + ): ListTransactionsResponseData = { + if (!isActive.get()) { + new ListTransactionsResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code) + } else { + txnManager.listTransactionStates(filteredProducerIds, filteredStates) + } + } + def handleDescribeTransactions( transactionalId: String ): DescribeTransactionsResponseData.TransactionState = { diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala index 68a2bedfc957d..cb501f774fd9d 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala @@ -87,7 +87,7 @@ object TransactionLog { .setProducerId(txnMetadata.producerId) .setProducerEpoch(txnMetadata.producerEpoch) .setTransactionTimeoutMs(txnMetadata.txnTimeoutMs) - .setTransactionStatus(txnMetadata.txnState.byte) + .setTransactionStatus(txnMetadata.txnState.id) .setTransactionLastUpdateTimestampMs(txnMetadata.txnLastUpdateTimestamp) .setTransactionStartTimestampMs(txnMetadata.txnStartTimestamp) .setTransactionPartitions(transactionPartitions)) @@ -128,7 +128,7 @@ object TransactionLog { producerEpoch = value.producerEpoch, lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs = value.transactionTimeoutMs, - state = TransactionMetadata.byteToState(value.transactionStatus), + state = TransactionState.fromId(value.transactionStatus), topicPartitions = mutable.Set.empty[TopicPartition], txnStartTimestamp = value.transactionStartTimestampMs, txnLastUpdateTimestamp = value.transactionLastUpdateTimestampMs) diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala index 
5269f3e3349a2..b30094384d906 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala @@ -25,8 +25,40 @@ import org.apache.kafka.common.record.RecordBatch import scala.collection.{immutable, mutable} + +object TransactionState { + val AllStates = Set( + Empty, + Ongoing, + PrepareCommit, + PrepareAbort, + CompleteCommit, + CompleteAbort, + Dead, + PrepareEpochFence + ) + + def fromName(name: String): Option[TransactionState] = { + AllStates.find(_.name == name) + } + + def fromId(id: Byte): TransactionState = { + id match { + case 0 => Empty + case 1 => Ongoing + case 2 => PrepareCommit + case 3 => PrepareAbort + case 4 => CompleteCommit + case 5 => CompleteAbort + case 6 => Dead + case 7 => PrepareEpochFence + case _ => throw new IllegalStateException(s"Unknown transaction state id $id from the transaction status message") + } + } +} + private[transaction] sealed trait TransactionState { - def byte: Byte + def id: Byte /** * Get the name of this state. This is exposed through the `DescribeTransactions` API. 
@@ -41,7 +73,7 @@ private[transaction] sealed trait TransactionState { * received AddOffsetsToTxnRequest => Ongoing */ private[transaction] case object Empty extends TransactionState { - val byte: Byte = 0 + val id: Byte = 0 val name: String = "Empty" } @@ -54,7 +86,7 @@ private[transaction] case object Empty extends TransactionState { * received AddOffsetsToTxnRequest => Ongoing */ private[transaction] case object Ongoing extends TransactionState { - val byte: Byte = 1 + val id: Byte = 1 val name: String = "Ongoing" } @@ -64,7 +96,7 @@ private[transaction] case object Ongoing extends TransactionState { * transition: received acks from all partitions => CompleteCommit */ private[transaction] case object PrepareCommit extends TransactionState { - val byte: Byte = 2 + val id: Byte = 2 val name: String = "PrepareCommit" } @@ -74,7 +106,7 @@ private[transaction] case object PrepareCommit extends TransactionState { * transition: received acks from all partitions => CompleteAbort */ private[transaction] case object PrepareAbort extends TransactionState { - val byte: Byte = 3 + val id: Byte = 3 val name: String = "PrepareAbort" } @@ -84,7 +116,7 @@ private[transaction] case object PrepareAbort extends TransactionState { * Will soon be removed from the ongoing transaction cache */ private[transaction] case object CompleteCommit extends TransactionState { - val byte: Byte = 4 + val id: Byte = 4 val name: String = "CompleteCommit" } @@ -94,7 +126,7 @@ private[transaction] case object CompleteCommit extends TransactionState { * Will soon be removed from the ongoing transaction cache */ private[transaction] case object CompleteAbort extends TransactionState { - val byte: Byte = 5 + val id: Byte = 5 val name: String = "CompleteAbort" } @@ -102,7 +134,7 @@ private[transaction] case object CompleteAbort extends TransactionState { * TransactionalId has expired and is about to be removed from the transaction cache */ private[transaction] case object Dead extends TransactionState { 
- val byte: Byte = 6 + val id: Byte = 6 val name: String = "Dead" } @@ -111,7 +143,7 @@ private[transaction] case object Dead extends TransactionState { */ private[transaction] case object PrepareEpochFence extends TransactionState { - val byte: Byte = 7 + val id: Byte = 7 val name: String = "PrepareEpochFence" } @@ -130,20 +162,6 @@ private[transaction] object TransactionMetadata { new TransactionMetadata(transactionalId, producerId, lastProducerId, producerEpoch, lastProducerEpoch, txnTimeoutMs, state, collection.mutable.Set.empty[TopicPartition], timestamp, timestamp) - def byteToState(byte: Byte): TransactionState = { - byte match { - case 0 => Empty - case 1 => Ongoing - case 2 => PrepareCommit - case 3 => PrepareAbort - case 4 => CompleteCommit - case 5 => CompleteAbort - case 6 => Dead - case 7 => PrepareEpochFence - case unknown => throw new IllegalStateException("Unknown transaction state byte " + unknown + " from the transaction status message") - } - } - def isValidTransition(oldState: TransactionState, newState: TransactionState): Boolean = TransactionMetadata.validPreviousStates(newState).contains(oldState) diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index b547896706dce..61fad952dc44f 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -29,6 +29,7 @@ import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils.{Logging, Pool, Scheduler} import kafka.utils.Implicits._ import org.apache.kafka.common.internals.Topic +import org.apache.kafka.common.message.ListTransactionsResponseData import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.stats.{Avg, Max} import org.apache.kafka.common.protocol.Errors @@ -223,6 +224,58 @@ class 
TransactionStateManager(brokerId: Int, throw new IllegalStateException(s"Unexpected empty transaction metadata returned while putting $txnMetadata"))) } + def listTransactionStates( + filterProducerIds: Set[Long], + filterStateNames: Set[String] + ): ListTransactionsResponseData = { + inReadLock(stateLock) { + val response = new ListTransactionsResponseData() + if (loadingPartitions.nonEmpty) { + response.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code) + } else { + val filterStates = mutable.Set.empty[TransactionState] + filterStateNames.foreach { stateName => + TransactionState.fromName(stateName) match { + case Some(state) => filterStates += state + case None => response.unknownStateFilters.add(stateName) + } + } + + def shouldInclude(txnMetadata: TransactionMetadata): Boolean = { + if (txnMetadata.state == Dead) { + // We filter the `Dead` state since it is a transient state which + // indicates that the transactionalId and its metadata are in the + // process of expiration and removal. 
+ false + } else if (filterProducerIds.nonEmpty && !filterProducerIds.contains(txnMetadata.producerId)) { + false + } else if (filterStateNames.nonEmpty && !filterStates.contains(txnMetadata.state)) { + false + } else { + true + } + } + + val states = new java.util.ArrayList[ListTransactionsResponseData.TransactionState] + transactionMetadataCache.forKeyValue { (_, cache) => + cache.metadataPerTransactionalId.values.foreach { txnMetadata => + txnMetadata.inLock { + if (shouldInclude(txnMetadata)) { + states.add(new ListTransactionsResponseData.TransactionState() + .setTransactionalId(txnMetadata.transactionalId) + .setProducerId(txnMetadata.producerId) + .setTransactionState(txnMetadata.state.name) + ) + } + } + } + } + response.setErrorCode(Errors.NONE.code) + .setTransactionStates(states) + } + } + } + /** * Get the transaction metadata associated with the given transactional id, or an error if * the coordinator does not own the transaction partition or is still loading it; if not found diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala b/core/src/main/scala/kafka/network/RequestConvertToJson.scala index aacd24ec28524..2f23dbc2b2f79 100644 --- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala +++ b/core/src/main/scala/kafka/network/RequestConvertToJson.scala @@ -93,6 +93,7 @@ object RequestConvertToJson { case req: DescribeClusterRequest => DescribeClusterRequestDataJsonConverter.write(req.data, request.version) case req: DescribeProducersRequest => DescribeProducersRequestDataJsonConverter.write(req.data, request.version) case req: DescribeTransactionsRequest => DescribeTransactionsRequestDataJsonConverter.write(req.data, request.version) + case req: ListTransactionsRequest => ListTransactionsRequestDataJsonConverter.write(req.data, request.version) case _ => throw new IllegalStateException(s"ApiKey ${request.apiKey} is not currently handled in `request`, the " + "code should be updated to do so."); } @@ -166,6 +167,7 @@ 
object RequestConvertToJson { case res: DescribeClusterResponse => DescribeClusterResponseDataJsonConverter.write(res.data, version) case res: DescribeProducersResponse => DescribeProducersResponseDataJsonConverter.write(res.data, version) case res: DescribeTransactionsResponse => DescribeTransactionsResponseDataJsonConverter.write(res.data, version) + case res: ListTransactionsResponse => ListTransactionsResponseDataJsonConverter.write(res.data, version) case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} is not currently handled in `response`, the " + "code should be updated to do so."); } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index bc228403b1088..22054a3f57388 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -223,6 +223,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request) case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, handleUnregisterBrokerRequest) case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request) + case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") } } catch { @@ -3310,6 +3311,26 @@ class KafkaApis(val requestChannel: RequestChannel, new DescribeTransactionsResponse(response.setThrottleTimeMs(requestThrottleMs))) } + def handleListTransactionsRequest(request: RequestChannel.Request): Unit = { + val listTransactionsRequest = request.body[ListTransactionsRequest] + val filteredProducerIds = listTransactionsRequest.data.producerIdFilters.asScala.map(Long.unbox).toSet + val filteredStates = listTransactionsRequest.data.stateFilters.asScala.toSet + val response = txnCoordinator.handleListTransactions(filteredProducerIds, filteredStates) + + // The 
response should contain only transactionalIds that the principal + // has `Describe` permission to access. + val transactionStateIter = response.transactionStates.iterator() + while (transactionStateIter.hasNext) { + val transactionState = transactionStateIter.next() + if (!authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, transactionState.transactionalId)) { + transactionStateIter.remove() + } + } + + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => + new ListTransactionsResponse(response.setThrottleTimeMs(requestThrottleMs))) + } + private def updateRecordConversionStats(request: RequestChannel.Request, tp: TopicPartition, conversionStats: RecordConversionStats): Unit = { diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index ebe4634adf8a2..bb4266c98a09b 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -46,7 +46,7 @@ import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartit import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection} import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState} import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState} -import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ControlledShutdownRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, 
DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, DescribeTransactionsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, MetadataRequestData, OffsetCommitRequestData, ProduceRequestData, SyncGroupRequestData} +import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ControlledShutdownRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, DescribeTransactionsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, ListTransactionsRequestData, MetadataRequestData, OffsetCommitRequestData, ProduceRequestData, SyncGroupRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, Records, SimpleRecord} @@ -1805,6 +1805,39 @@ class AuthorizerIntegrationTest extends BaseRequestTest { assertThrows(classOf[TransactionalIdAuthorizationException], () => producer.commitTransaction()) } + @Test + def testListTransactionsAuthorization(): Unit = { + createTopic(topic) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) + + // Start a transaction and write to a topic. 
+ val producer = buildTransactionalProducer() + producer.initTransactions() + producer.beginTransaction() + producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get + + def assertListTransactionResult( + expectedTransactionalIds: Set[String] + ): Unit = { + val listTransactionsRequest = new ListTransactionsRequest.Builder(new ListTransactionsRequestData()).build() + val listTransactionsResponse = connectAndReceive[ListTransactionsResponse](listTransactionsRequest) + assertEquals(Errors.NONE, Errors.forCode(listTransactionsResponse.data.errorCode)) + assertEquals(expectedTransactionalIds, listTransactionsResponse.data.transactionStates.asScala.map(_.transactionalId).toSet) + } + + // First verify that we can list the transaction + assertListTransactionResult(expectedTransactionalIds = Set(transactionalId)) + + // Now revoke authorization and verify that the transaction is no longer listable + removeAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource) + assertListTransactionResult(expectedTransactionalIds = Set()) + + // The minimum permission needed is `Describe` + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), transactionalIdResource) + assertListTransactionResult(expectedTransactionalIds = Set(transactionalId)) + } + @Test def shouldNotIncludeUnauthorizedTopicsInDescribeTransactionsResponse(): Unit = { createTopic(topic) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala index 4e8b0fd6a387a..92407dddb5690 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala @@ -161,7 +161,7 @@ class TransactionMetadataTest { txnStartTimestamp = time.milliseconds(), 
txnLastUpdateTimestamp = time.milliseconds()) - // let new time be smaller; when transting from Empty the start time would be updated to the update-time + // let new time be smaller; when transiting from Empty the start time would be updated to the update-time var transitMetadata = txnMetadata.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic1", 0)), time.milliseconds() - 1) txnMetadata.completeTransitionTo(transitMetadata) assertEquals(Set[TopicPartition](new TopicPartition("topic1", 0)), txnMetadata.topicPartitions) @@ -460,6 +460,43 @@ class TransactionMetadataTest { assertEquals(Left(Errors.PRODUCER_FENCED), result) } + @Test + def testTransactionStateIdAndNameMapping(): Unit = { + for (state <- TransactionState.AllStates) { + assertEquals(state, TransactionState.fromId(state.id)) + assertEquals(Some(state), TransactionState.fromName(state.name)) + } + } + + @Test + def testAllTransactionStatesAreMapped(): Unit = { + val unmatchedStates = mutable.Set( + Empty, + Ongoing, + PrepareCommit, + PrepareAbort, + CompleteCommit, + CompleteAbort, + PrepareEpochFence, + Dead + ) + + // The exhaustive match is intentional here to ensure that we are + // forced to update the test case if a new state is added. 
+ TransactionState.AllStates.foreach { + case Empty => assertTrue(unmatchedStates.remove(Empty)) + case Ongoing => assertTrue(unmatchedStates.remove(Ongoing)) + case PrepareCommit => assertTrue(unmatchedStates.remove(PrepareCommit)) + case PrepareAbort => assertTrue(unmatchedStates.remove(PrepareAbort)) + case CompleteCommit => assertTrue(unmatchedStates.remove(CompleteCommit)) + case CompleteAbort => assertTrue(unmatchedStates.remove(CompleteAbort)) + case PrepareEpochFence => assertTrue(unmatchedStates.remove(PrepareEpochFence)) + case Dead => assertTrue(unmatchedStates.remove(Dead)) + } + + assertEquals(Set.empty, unmatchedStates) + } + private def testRotateProducerIdInOngoingState(state: TransactionState): Unit = { val producerEpoch = (Short.MaxValue - 1).toShort diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 20ca0c421d024..df576931525d6 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -34,7 +34,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.TransactionResult import org.apache.kafka.common.utils.MockTime import org.easymock.{Capture, EasyMock, IAnswer} -import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue, fail} +import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import scala.jdk.CollectionConverters._ @@ -476,13 +476,79 @@ class TransactionStateManagerTest { } @Test - def shouldReturnNotCooridnatorErrorIfTransactionIdPartitionNotOwned(): Unit = { + def shouldReturnNotCoordinatorErrorIfTransactionIdPartitionNotOwned(): Unit = { transactionManager.getTransactionState(transactionalId1).fold( err => 
assertEquals(Errors.NOT_COORDINATOR, err), _ => fail(transactionalId1 + "'s transaction state is already in the cache") ) } + @Test + def testListTransactionsWithCoordinatorLoadingInProgress(): Unit = { + transactionManager.addLoadingPartition(partitionId = 0, coordinatorEpoch = 15) + val listResponse = transactionManager.listTransactionStates( + filterProducerIds = Set.empty, + filterStateNames = Set.empty + ) + assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, Errors.forCode(listResponse.errorCode)) + } + + @Test + def testListTransactionsFiltering(): Unit = { + for (partitionId <- 0 until numPartitions) { + transactionManager.addLoadedTransactionsToCache(partitionId, 0, new Pool[String, TransactionMetadata]()) + } + + def putTransaction( + transactionalId: String, + producerId: Long, + state: TransactionState + ): Unit = { + val txnMetadata = transactionMetadata(transactionalId, producerId, state) + transactionManager.putTransactionStateIfNotExists(txnMetadata).left.toOption.foreach { error => + fail(s"Failed to insert transaction $txnMetadata due to error $error") + } + } + + putTransaction(transactionalId = "t0", producerId = 0, state = Ongoing) + putTransaction(transactionalId = "t1", producerId = 1, state = Ongoing) + putTransaction(transactionalId = "t2", producerId = 2, state = PrepareCommit) + putTransaction(transactionalId = "t3", producerId = 3, state = PrepareAbort) + putTransaction(transactionalId = "t4", producerId = 4, state = CompleteCommit) + putTransaction(transactionalId = "t5", producerId = 5, state = CompleteAbort) + putTransaction(transactionalId = "t6", producerId = 6, state = CompleteAbort) + putTransaction(transactionalId = "t7", producerId = 7, state = PrepareEpochFence) + // Note that `Dead` transactions are never returned. This is a transient state + // which is used when the transaction state is in the process of being deleted + // (whether through expiration or coordinator unloading).
+ putTransaction(transactionalId = "t8", producerId = 8, state = Dead) + + def assertListTransactions( + expectedTransactionalIds: Set[String], + filterProducerIds: Set[Long] = Set.empty, + filterStates: Set[String] = Set.empty + ): Unit = { + val listResponse = transactionManager.listTransactionStates(filterProducerIds, filterStates) + assertEquals(Errors.NONE, Errors.forCode(listResponse.errorCode)) + assertEquals(expectedTransactionalIds, listResponse.transactionStates.asScala.map(_.transactionalId).toSet) + val expectedUnknownStates = filterStates.filter(state => TransactionState.fromName(state).isEmpty) + assertEquals(expectedUnknownStates, listResponse.unknownStateFilters.asScala.toSet) + } + + assertListTransactions(Set("t0", "t1", "t2", "t3", "t4", "t5", "t6", "t7")) + assertListTransactions(Set("t0", "t1"), filterStates = Set("Ongoing")) + assertListTransactions(Set("t0", "t1"), filterStates = Set("Ongoing", "UnknownState")) + assertListTransactions(Set("t2", "t4"), filterStates = Set("PrepareCommit", "CompleteCommit")) + assertListTransactions(Set(), filterStates = Set("UnknownState")) + assertListTransactions(Set("t5"), filterProducerIds = Set(5L)) + assertListTransactions(Set("t5", "t6"), filterProducerIds = Set(5L, 6L, 8L, 9L)) + assertListTransactions(Set("t4"), filterProducerIds = Set(4L, 5L), filterStates = Set("CompleteCommit")) + assertListTransactions(Set("t4", "t5"), filterProducerIds = Set(4L, 5L), filterStates = Set("CompleteCommit", "CompleteAbort")) + assertListTransactions(Set(), filterProducerIds = Set(3L, 6L), filterStates = Set("UnknownState")) + assertListTransactions(Set(), filterProducerIds = Set(10L), filterStates = Set("CompleteCommit")) + assertListTransactions(Set(), filterStates = Set("Dead")) + } + @Test def shouldOnlyConsiderTransactionsInTheOngoingStateToAbort(): Unit = { for (partitionId <- 0 until numPartitions) { diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 476fe14cf9a65..e995c69302ce8 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -3481,6 +3481,73 @@ class KafkaApisTest { assertEquals(List(mkTopicData(topic = "foo", Seq(1, 2))), fooState.topics.asScala.toList) } + @Test + def testListTransactionsErrorResponse(): Unit = { + val data = new ListTransactionsRequestData() + val listTransactionsRequest = new ListTransactionsRequest.Builder(data).build() + val request = buildRequest(listTransactionsRequest) + val capturedResponse = expectNoThrottling(request) + + EasyMock.expect(txnCoordinator.handleListTransactions(Set.empty[Long], Set.empty[String])) + .andReturn(new ListTransactionsResponseData() + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code)) + + EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator) + createKafkaApis().handleListTransactionsRequest(request) + + val response = capturedResponse.getValue.asInstanceOf[ListTransactionsResponse] + assertEquals(0, response.data.transactionStates.size) + assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, Errors.forCode(response.data.errorCode)) + } + + @Test + def testListTransactionsAuthorization(): Unit = { + val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer]) + val data = new ListTransactionsRequestData() + val listTransactionsRequest = new ListTransactionsRequest.Builder(data).build() + val request = buildRequest(listTransactionsRequest) + val capturedResponse = expectNoThrottling(request) + + val transactionStates = new util.ArrayList[ListTransactionsResponseData.TransactionState]() + transactionStates.add(new ListTransactionsResponseData.TransactionState() + .setTransactionalId("foo") + .setProducerId(12345L) + .setTransactionState("Ongoing")) + transactionStates.add(new ListTransactionsResponseData.TransactionState() + .setTransactionalId("bar") + 
.setProducerId(98765) + .setTransactionState("PrepareAbort")) + + EasyMock.expect(txnCoordinator.handleListTransactions(Set.empty[Long], Set.empty[String])) + .andReturn(new ListTransactionsResponseData() + .setErrorCode(Errors.NONE.code) + .setTransactionStates(transactionStates)) + + def buildExpectedActions(transactionalId: String): util.List[Action] = { + val pattern = new ResourcePattern(ResourceType.TRANSACTIONAL_ID, transactionalId, PatternType.LITERAL) + val action = new Action(AclOperation.DESCRIBE, pattern, 1, true, true) + Collections.singletonList(action) + } + + EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(buildExpectedActions("foo")))) + .andReturn(Seq(AuthorizationResult.ALLOWED).asJava) + .once() + + EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(buildExpectedActions("bar")))) + .andReturn(Seq(AuthorizationResult.DENIED).asJava) + .once() + + EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator, authorizer) + createKafkaApis(authorizer = Some(authorizer)).handleListTransactionsRequest(request) + + val response = capturedResponse.getValue.asInstanceOf[ListTransactionsResponse] + assertEquals(1, response.data.transactionStates.size()) + val transactionState = response.data.transactionStates.get(0) + assertEquals("foo", transactionState.transactionalId) + assertEquals(12345L, transactionState.producerId) + assertEquals("Ongoing", transactionState.transactionState) + } + @Test def testDeleteTopicsByIdAuthorization(): Unit = { val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer]) diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index a5d5078a761d8..b992aac7f0631 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -639,6 +639,9 @@ class RequestQuotaTest extends 
BaseRequestTest { new DescribeTransactionsRequest.Builder(new DescribeTransactionsRequestData() .setTransactionalIds(List("test-transactional-id").asJava)) + case ApiKeys.LIST_TRANSACTIONS => + new ListTransactionsRequest.Builder(new ListTransactionsRequestData()) + case _ => throw new IllegalArgumentException("Unsupported API key " + apiKey) }