Skip to content

Commit

Permalink
Fix pulsar client transaction test
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoran10 authored and huangdx0726 committed Nov 13, 2020
1 parent 8f9861a commit a662470
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ private void txnAckTest(boolean batchEnable, int maxBatchSize,
.subscriptionName("test")
.enableBatchIndexAcknowledgment(true)
.subscriptionType(subscriptionType)
.acknowledgmentGroupTime(0, TimeUnit.MICROSECONDS)
.subscribe();

@Cleanup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1156,7 +1156,8 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
msgMetadata.recycle();
return;
}
msgId = new BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex(), messageId.getBatchIndex(), -1, BatchMessageAckerDisabled.INSTANCE);
BatchMessageAcker batchMessageAcker = BatchMessageAcker.newAcker(ackBitSet);
msgId = new BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex(), messageId.getBatchIndex(), -1, batchMessageAcker);
}

final MessageImpl<T> message = new MessageImpl<>(topicName.toString(), msgId, msgMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String,
public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType,
Map<String, Long> properties, TransactionImpl txn) {
if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType, properties, txn.getTxnIdMostBits(), txn.getTxnIdLeastBits());
doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType, properties,
txn == null ? -1 : txn.getTxnIdMostBits(),
txn == null ? -1 : txn.getTxnIdLeastBits());
} else if (ackType == AckType.Cumulative) {
BitSetRecyclable bitSet = BitSetRecyclable.create();
bitSet.set(0, batchSize);
Expand Down

0 comments on commit a662470

Please sign in to comment.