[transactions] Implement KIP-664 - DESCRIBE_TRANSACTIONS (streamnativ…
eolivelli authored Mar 10, 2023
1 parent 5ef4a85 commit 1f2fe99
Showing 5 changed files with 205 additions and 30 deletions.
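KIP-664 adds a DESCRIBE_TRANSACTIONS admin RPC: for each requested transactional id, the owning coordinator reports the producer id and epoch, the transaction state, the transaction timeout, the start timestamp, and the topic-partitions currently included in the transaction. With this commit the broker answers that RPC, so a stock Kafka Admin client can exercise it end to end. A minimal sketch, assuming a KoP broker reachable at localhost:9092 and an existing transactional id my-txn-id (both placeholders):

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TransactionDescription;

public class DescribeTxnExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        try (Admin admin = Admin.create(props)) {
            // Sends DESCRIBE_TRANSACTIONS to the coordinator that owns the id.
            TransactionDescription desc = admin
                    .describeTransactions(List.of("my-txn-id"))
                    .description("my-txn-id")
                    .get();
            System.out.printf("state=%s producerId=%d epoch=%d partitions=%s%n",
                    desc.state(), desc.producerId(), desc.producerEpoch(), desc.topicPartitions());
        }
    }
}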
Changed file 1 of 5:
@@ -326,6 +326,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
case LIST_TRANSACTIONS:
handleListTransactionsRequest(kafkaHeaderAndRequest, responseFuture);
break;
case DESCRIBE_TRANSACTIONS:
handleDescribeTransactionsRequest(kafkaHeaderAndRequest, responseFuture);
break;
case DELETE_GROUPS:
handleDeleteGroupsRequest(kafkaHeaderAndRequest, responseFuture);
break;
@@ -577,6 +580,9 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest,
protected abstract void
handleListTransactionsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> response);

protected abstract void
handleDescribeTransactionsRequest(KafkaHeaderAndRequest describeTransactions, CompletableFuture<AbstractResponse> response);

protected abstract void
handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups, CompletableFuture<AbstractResponse> response);

Changed file 2 of 5:
@@ -117,6 +117,7 @@
import org.apache.kafka.common.message.DescribeClusterResponseData;
import org.apache.kafka.common.message.DescribeConfigsRequestData;
import org.apache.kafka.common.message.DescribeConfigsResponseData;
import org.apache.kafka.common.message.DescribeTransactionsResponseData;
import org.apache.kafka.common.message.EndTxnRequestData;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.FetchRequestData;
@@ -161,6 +162,8 @@
import org.apache.kafka.common.requests.DescribeConfigsRequest;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeTransactionsRequest;
import org.apache.kafka.common.requests.DescribeTransactionsResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FetchRequest;
@@ -2043,6 +2046,16 @@ protected void handleListTransactionsRequest(KafkaHeaderAndRequest listTransacti
resultFuture.complete(new ListTransactionsResponse(listResult));
}

@Override
protected void handleDescribeTransactionsRequest(KafkaHeaderAndRequest describeTransactions,
CompletableFuture<AbstractResponse> response) {
checkArgument(describeTransactions.getRequest() instanceof DescribeTransactionsRequest);
DescribeTransactionsRequest request = (DescribeTransactionsRequest) describeTransactions.getRequest();
DescribeTransactionsResponseData describeResult = getTransactionCoordinator()
.handleDescribeTransactions(request.data().transactionalIds());
response.complete(new DescribeTransactionsResponse(describeResult));
}

@Override
protected void handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups,
CompletableFuture<AbstractResponse> resultFuture) {
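On the wire, the handler above receives a standard DescribeTransactionsRequest and hands request.data().transactionalIds() to the transaction coordinator. A minimal sketch of constructing that request object directly (Kafka public classes; the version (short) 0 is an assumption, as the first version of this API):

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.message.DescribeTransactionsRequestData;
import org.apache.kafka.common.requests.DescribeTransactionsRequest;

class DescribeTxnRequestSketch {
    static DescribeTransactionsRequest build(List<String> ids) {
        DescribeTransactionsRequestData data = new DescribeTransactionsRequestData()
                .setTransactionalIds(new ArrayList<>(ids));
        // Version 0 assumed; request.data().transactionalIds() is what the
        // handler forwards to getTransactionCoordinator().handleDescribeTransactions(...).
        return new DescribeTransactionsRequest.Builder(data).build((short) 0);
    }
}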
Changed file 3 of 5:
@@ -13,6 +13,7 @@
*/
package io.streamnative.pulsar.handlers.kop.coordinator.transaction;

import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.DEAD;
import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.ONGOING;
import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.PREPARE_ABORT;
import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.PREPARE_COMMIT;
@@ -53,6 +54,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.DescribeTransactionsResponseData;
import org.apache.kafka.common.message.ListTransactionsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
@@ -233,6 +235,76 @@ public ListTransactionsResponseData handleListTransactions(List<String> filtered
return this.txnManager.listTransactionStates(filteredProducerIds, filteredStates);
}

public DescribeTransactionsResponseData handleDescribeTransactions(List<String> transactionalIds) {
DescribeTransactionsResponseData response = new DescribeTransactionsResponseData();
if (transactionalIds != null) {
transactionalIds.forEach(transactionalId -> {
DescribeTransactionsResponseData.TransactionState transactionState =
handleDescribeTransactions(transactionalId);
response.transactionStates().add(transactionState);
});
}
return response;
}

private DescribeTransactionsResponseData.TransactionState handleDescribeTransactions(String transactionalId) {
// https://github.com/apache/kafka/blob/915991445fde106d02e61a70425ae2601c813db0/core/
// src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L270
if (transactionalId == null) {
throw new IllegalArgumentException("Invalid null transactionalId");
}

DescribeTransactionsResponseData.TransactionState transactionState =
new DescribeTransactionsResponseData.TransactionState()
.setTransactionalId(transactionalId);

if (!isActive.get()) {
transactionState.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
} else if (transactionalId.isEmpty()) {
transactionState.setErrorCode(Errors.INVALID_REQUEST.code());
} else {
Either<Errors, Optional<CoordinatorEpochAndTxnMetadata>> tState =
txnManager.getTransactionState(transactionalId);
if (tState.isLeft()) {
transactionState.setErrorCode(tState.getLeft().code());
} else {
Optional<CoordinatorEpochAndTxnMetadata> right = tState.getRight();
if (!right.isPresent()) {
transactionState.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code());
} else {
CoordinatorEpochAndTxnMetadata coordinatorEpochAndMetadata = right.get();
TransactionMetadata txnMetadata = coordinatorEpochAndMetadata.getTransactionMetadata();
txnMetadata.inLock(() -> {
if (txnMetadata.getState() == DEAD) {
// The transaction state is being expired, so ignore it
transactionState.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code());
} else {
txnMetadata.getTopicPartitions().forEach(topicPartition -> {
var topicData = transactionState.topics().find(topicPartition.topic());
if (topicData == null) {
topicData = new DescribeTransactionsResponseData.TopicData()
.setTopic(topicPartition.topic());
transactionState.topics().add(topicData);
}
topicData.partitions().add(topicPartition.partition());
});

transactionState
.setErrorCode(Errors.NONE.code())
.setProducerId(txnMetadata.getProducerId())
.setProducerEpoch(txnMetadata.getProducerEpoch())
.setTransactionState(txnMetadata.getState().toAdminState().toString())
.setTransactionTimeoutMs(txnMetadata.getTxnTimeoutMs())
.setTransactionStartTimeMs(txnMetadata.getTxnStartTimestamp());
}
return null;
});
}
}
}
return transactionState;
}

@Data
@EqualsAndHashCode
@AllArgsConstructor
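The coordinator method above defines a per-id error contract: a null id is rejected outright, an inactive coordinator yields COORDINATOR_NOT_AVAILABLE, an empty id INVALID_REQUEST, and an unknown or expired (DEAD) id TRANSACTIONAL_ID_NOT_FOUND; only a live metadata entry is populated with producer, timing, and partition details. An illustrative check of that contract (the coordinator instance and its wiring are assumed, not shown in this commit):

import java.util.List;
import org.apache.kafka.common.message.DescribeTransactionsResponseData;
import org.apache.kafka.common.protocol.Errors;

class DescribeTxnContractSketch {
    // Hypothetical harness: `coordinator` is an initialized TransactionCoordinator.
    static void printStates(TransactionCoordinator coordinator) {
        DescribeTransactionsResponseData data =
                coordinator.handleDescribeTransactions(List.of("", "unknown-id"));
        for (DescribeTransactionsResponseData.TransactionState s : data.transactionStates()) {
            // Expected: "" -> INVALID_REQUEST, "unknown-id" -> TRANSACTIONAL_ID_NOT_FOUND
            System.out.println("'" + s.transactionalId() + "' -> " + Errors.forCode(s.errorCode()));
        }
    }
}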
Changed file 4 of 5:
@@ -82,6 +82,8 @@
import org.apache.kafka.common.message.DescribeClusterResponseData;
import org.apache.kafka.common.message.DescribeConfigsRequestData;
import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeTransactionsRequestData;
import org.apache.kafka.common.message.DescribeTransactionsResponseData;
import org.apache.kafka.common.message.EndTxnRequestData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
@@ -128,6 +130,8 @@
import org.apache.kafka.common.requests.DescribeClusterResponse;
import org.apache.kafka.common.requests.DescribeConfigsRequest;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeTransactionsRequest;
import org.apache.kafka.common.requests.DescribeTransactionsResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
@@ -1124,9 +1128,10 @@ protected void handleListTransactionsRequest(KafkaHeaderAndRequest kafkaHeaderAn
this.<ListTransactionsRequest, ListTransactionsRequestData, ListTransactionsResponse>
sendRequestToAllCoordinators(kafkaHeaderAndRequest, response,
FindCoordinatorRequest.CoordinatorType.TRANSACTION,
null, // no keysExtractor: fan out to every coordinator partition
ListTransactionsRequest.class,
ListTransactionsRequestData.class,
(listTransactionsRequest) -> {
(ListTransactionsRequest listTransactionsRequest, List<String> keys) -> {
ListTransactionsRequestData data = listTransactionsRequest.data();
return new ListTransactionsRequest.Builder(data).build(listTransactionsRequest.version());
},
@@ -1155,15 +1160,42 @@ protected void handleListTransactionsRequest(KafkaHeaderAndRequest kafkaHeaderAn
null);
}

@Override
protected void handleDescribeTransactionsRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest,
CompletableFuture<AbstractResponse> response) {
this.<DescribeTransactionsRequest, DescribeTransactionsRequestData, DescribeTransactionsResponse>
sendRequestToAllCoordinators(kafkaHeaderAndRequest, response,
FindCoordinatorRequest.CoordinatorType.TRANSACTION,
(DescribeTransactionsRequestData data) -> data.transactionalIds(),
DescribeTransactionsRequest.class,
DescribeTransactionsRequestData.class,
(DescribeTransactionsRequest describeTransactionsRequest, List<String> keys) -> {
DescribeTransactionsRequestData data = new DescribeTransactionsRequestData()
.setTransactionalIds(keys);
return new DescribeTransactionsRequest
.Builder(data)
.build(describeTransactionsRequest.version());
},
(allResponses) -> {
DescribeTransactionsResponseData responseData = new DescribeTransactionsResponseData();
for (DescribeTransactionsResponse r : allResponses) {
responseData.transactionStates().addAll(r.data().transactionStates());
}
return new DescribeTransactionsResponse(responseData);
},
null);
}

@Override
protected void handleListGroupsRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest,
CompletableFuture<AbstractResponse> response) {
this.<ListGroupsRequest, ListGroupsRequestData, ListGroupsResponse>
sendRequestToAllCoordinators(kafkaHeaderAndRequest, response,
FindCoordinatorRequest.CoordinatorType.GROUP,
null, // no keysExtractor: fan out to every coordinator partition
ListGroupsRequest.class,
ListGroupsRequestData.class,
(listGroupsRequest) -> {
(listGroupsRequest, ___) -> {
ListGroupsRequestData data = listGroupsRequest.data();
return new ListGroupsRequest.Builder(data).build(listGroupsRequest.version());
},
@@ -2119,9 +2151,10 @@ R extends AbstractResponse> void sendRequestToAllCoordinators(
KafkaHeaderAndRequest kafkaHeaderAndRequest,
CompletableFuture<AbstractResponse> resultFuture,
FindCoordinatorRequest.CoordinatorType coordinatorType,
Function<V, List<String>> keysExtractor,
Class<K> requestClass,
Class<V> requestDataClass,
Function<K, K> cloneRequest,
BiFunction<K, List<String>, K> cloneRequest,
Function<List<R>, R> responseCollector,
BiFunction<K, Throwable, R> customErrorBuilder
) {
@@ -2150,29 +2183,48 @@ R extends AbstractResponse> void sendRequestToAllCoordinators(
checkArgument(requestClass.isInstance(kafkaHeaderAndRequest.getRequest()));
K request = (K) kafkaHeaderAndRequest.getRequest();
checkArgument(requestDataClass.isInstance(request.data()));
int numPartitions;
switch (coordinatorType) {
case GROUP:
numPartitions = kafkaConfig.getOffsetsTopicNumPartitions();
break;
case TRANSACTION:
numPartitions = kafkaConfig.getKafkaTxnLogTopicNumPartitions();
break;
default:
throw new IllegalArgumentException("Unknown coordinator type " + coordinatorType);
}

if (!isNoisyRequest(request)) {
log.info("sendRequestToAllCoordinators {} {} {} np={}", request.getClass().getSimpleName(),
request, numPartitions);
}

Map<Node, List<String>> keysByBroker = new ConcurrentHashMap<>();
List<CompletableFuture<Node>> findBrokers = new ArrayList<>();
for (int i = 0; i < numPartitions; i++) {
String pulsarTopicName = computePulsarTopicNameForPartition(coordinatorType, i);
if (keysExtractor == null) {

int numPartitions;
switch (coordinatorType) {
case GROUP:
numPartitions = kafkaConfig.getOffsetsTopicNumPartitions();
break;
case TRANSACTION:
numPartitions = kafkaConfig.getKafkaTxnLogTopicNumPartitions();
break;
default:
throw new IllegalArgumentException("Unknown coordinator type " + coordinatorType);
}
if (!isNoisyRequest(request)) {
log.info("sendRequestToAllCoordinators {} {} {} np={}", request.getClass().getSimpleName(),
request, numPartitions);
}
for (int i = 0; i < numPartitions; i++) {
String pulsarTopicName = computePulsarTopicNameForPartition(coordinatorType, i);

findBrokers.add(findBroker(TopicName.get(pulsarTopicName), true)
.thenApply(m -> m.node));
findBrokers.add(findBroker(TopicName.get(pulsarTopicName), true)
.thenApply(m -> m.node));
}
} else {
List<String> keys = keysExtractor.apply((V) request.data());
for (String key : keys) {
String pulsarTopicName = computePulsarTopicName(coordinatorType, key);
findBrokers.add(findBroker(TopicName.get(pulsarTopicName), true)
.thenApply(m -> {
keysByBroker.compute(m.node, (k, currentList) -> {
if (currentList == null) {
currentList = new CopyOnWriteArrayList<>();
}
currentList.add(key);
return currentList;
});
return m.node;
}));
}
}

CompletableFuture<Set<Node>> cc = FutureUtil.waitForAll(findBrokers)
@@ -2188,10 +2240,11 @@ R extends AbstractResponse> void sendRequestToAllCoordinators(
CompletableFuture<R> finalResult = cc.thenCompose(coordinators -> {
List<CompletableFuture<R>> futures = new CopyOnWriteArrayList<>();
for (Node node : coordinators) {
List<String> keysForBroker = keysByBroker.get(node);
CompletableFuture<R> responseFromBroker = new CompletableFuture<>();
futures.add(responseFromBroker);
KafkaHeaderAndRequest requestWithNewHeader =
executeCloneRequest(kafkaHeaderAndRequest, cloneRequest);
executeCloneRequest(kafkaHeaderAndRequest, cloneRequest, keysForBroker);
grabConnectionToBroker(node.host(), node.port())
.forwardRequest(requestWithNewHeader)
.thenAccept(serverResponse -> {
@@ -2246,15 +2299,16 @@ R extends AbstractResponse> void sendRequestToAllCoordinators(
}

private <K extends AbstractRequest> KafkaHeaderAndRequest executeCloneRequest(
KafkaHeaderAndRequest kafkaHeaderAndRequest, Function<K, K> cloneRequest) {
KafkaHeaderAndRequest kafkaHeaderAndRequest, BiFunction<K, List<String>, K> cloneRequest,
List<String> keys) {
int dummyCorrelationId = getDummyCorrelationId();
RequestHeader header = new RequestHeader(
kafkaHeaderAndRequest.getHeader().apiKey(),
kafkaHeaderAndRequest.getHeader().apiVersion(),
kafkaHeaderAndRequest.getHeader().clientId(),
dummyCorrelationId
);
K requestForSingleBroker = cloneRequest.apply((K) kafkaHeaderAndRequest.getRequest());
K requestForSingleBroker = cloneRequest.apply((K) kafkaHeaderAndRequest.getRequest(), keys);
ByteBuf buffer = KopResponseUtils.serializeRequest(header, requestForSingleBroker);
KafkaHeaderAndRequest requestWithNewHeader = new KafkaHeaderAndRequest(
header,
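Unlike ListTransactions, which must fan out to every coordinator partition, DescribeTransactions only needs to reach the coordinators that own the requested ids, which is what the new keysExtractor path above enables: each id is routed to its coordinator topic via computePulsarTopicName, the ids are regrouped per broker in keysByBroker, and each forwarded clone carries only that broker's keys. A standalone sketch of the grouping idea (partitionFor mirrors Kafka's abs(hashCode) % numPartitions convention; KoP's actual mapping lives in computePulsarTopicName and may differ):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CoordinatorRoutingSketch {
    // Kafka-style non-negative hash: clears the sign bit, like Utils.toPositive.
    static int partitionFor(String transactionalId, int numPartitions) {
        return (transactionalId.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Group ids by the coordinator partition that owns them, so one cloned
    // request per coordinator carries only the ids that coordinator can answer.
    static Map<Integer, List<String>> groupByCoordinator(List<String> ids, int numPartitions) {
        Map<Integer, List<String>> byPartition = new HashMap<>();
        for (String id : ids) {
            byPartition.computeIfAbsent(partitionFor(id, numPartitions), p -> new ArrayList<>()).add(id);
        }
        return byPartition;
    }
}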
Changed file 5 of 5: (diff not loaded)
