KAFKA-5494: enable idempotence with max.in.flight.requests.per.connection > 1 #3743

Closed
40 commits
203b476
Initial commit of client side changes with some tests
Aug 12, 2017
546a9a7
Implemented broker side changes to cache extra metadata.
Aug 17, 2017
0a5e912
Change the client side code so that the sequence numbers are assigned
Aug 23, 2017
f4d39a8
WIP
Aug 24, 2017
a4ef1c4
Implemented log cleaning functionality with tests
Aug 25, 2017
7ae9366
Fix merge issues after rebasing onto trunk
Aug 25, 2017
f4ecc43
Remove some unneeded variables and function parameters
Aug 25, 2017
427cd71
More minor cleanups
Aug 25, 2017
833b39c
WIP
Aug 31, 2017
3bd9dec
Handled remaining corner cases:
Sep 6, 2017
c325153
Rebased onto trunk and fixed some merge conflicts
Sep 6, 2017
76ffdfb
Added unit tests for out of order responses and expiring batches
Sep 7, 2017
addbc8f
Address PR comments on client side code
Sep 7, 2017
c733e52
WIP
Sep 8, 2017
3c03f67
Revert changes where we stored the metadata for the last 5 batches. W…
Sep 8, 2017
7dc9eff
Addressed a few more PR comments
Sep 8, 2017
0ad49aa
Address review comments for the broker side changes. Added tests for …
Sep 9, 2017
9136c79
Handle duplicates during wraparound
Sep 11, 2017
5efcf2d
Two changes:
Sep 11, 2017
eb79059
Addressed more PR comments
Sep 11, 2017
d3fa704
Address a few more minor PR comments
Sep 11, 2017
f4bf62c
Revert changes to log4j.properties
Sep 11, 2017
c52b1f8
Delete unnecessary trailing whitespace
Sep 11, 2017
44236b8
Let the producerIdEntry have a concurrent structure for batch metadat…
Sep 13, 2017
5597178
Bunch of changes:
Sep 13, 2017
2ad47d4
Rename ProducerIdEntry.lastOffset to ProducerIdEntry.lastDataOffset e…
Sep 13, 2017
66bf9fa
Address more PR comments
Sep 13, 2017
8bc2df2
Fix previous commit which introduced a bug in RecordAccmulator.drain.
Sep 13, 2017
f394747
PR comment: return currentLastSequence instead of doing another map l…
Sep 13, 2017
04d9b3b
Remove the concurrent queue from ProducerIdEntry and instead return j…
Sep 13, 2017
9c8810c
Address other minor comments
Sep 13, 2017
a617308
Correctly transition to a fatal error state when batches in retry are…
Sep 13, 2017
78cea1c
Added assertions on the send futures for some of the newly added test…
Sep 13, 2017
9629c49
check the response offsets in all the tests
Sep 13, 2017
9bc3ef6
Fix checkstyle failures
Sep 13, 2017
27b58a4
Address further PR comments
Sep 14, 2017
3902de0
Address PR comments.
Sep 14, 2017
2d6b5de
Fix up build failures after merge
Sep 14, 2017
f7ad085
Address more PR comments:
Sep 14, 2017
b1f6530
Address final PR comment
Sep 14, 2017
KafkaProducer.java
@@ -365,7 +365,7 @@ private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serial
this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.transactionManager = configureTransactionState(config, logContext, log);
int retries = configureRetries(config, transactionManager != null, log);
int maxInflightRequests = configureInflightRequests(config, transactionManager != null, log);
int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
short acks = configureAcks(config, transactionManager != null, log);

this.apiVersions = new ApiVersions();
@@ -481,18 +481,10 @@ private static int configureRetries(ProducerConfig config, boolean idempotenceEn
return config.getInt(ProducerConfig.RETRIES_CONFIG);
}

private static int configureInflightRequests(ProducerConfig config, boolean idempotenceEnabled, Logger log) {
boolean userConfiguredInflights = false;
if (config.originals().containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
userConfiguredInflights = true;
}
if (idempotenceEnabled && !userConfiguredInflights) {
log.info("Overriding the default {} to 1 since idempontence is enabled.", ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
return 1;
}
if (idempotenceEnabled && config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) != 1) {
throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to 1 in order" +
"to use the idempotent producer. Otherwise we cannot guarantee idempotence.");
private static int configureInflightRequests(ProducerConfig config, boolean idempotenceEnabled) {
if (idempotenceEnabled && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
" to use the idempotent producer.");
}
return config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
}
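The net effect of this change is that enable.idempotence no longer forces max.in.flight.requests.per.connection down to 1: any value up to 5 passes the check, and a larger value now fails fast with a ConfigException. A minimal configuration sketch that the new check accepts (the broker address and serializers are placeholders, not taken from this PR):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker address
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);     // allowed now; 6 or more throws ConfigException

Producer<String, String> producer =
        new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());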
ProducerBatch.java
@@ -75,6 +75,7 @@ private enum FinalState { ABORTED, FAILED, SUCCEEDED }
private long drainedMs;
private String expiryErrorMessage;
private boolean retry;
private boolean reopened = false;

public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) {
this(tp, recordsBuilder, now, false);
@@ -249,6 +250,15 @@ public Deque<ProducerBatch> split(int splitBatchSize) {

produceFuture.set(ProduceResponse.INVALID_OFFSET, NO_TIMESTAMP, new RecordBatchTooLargeException());
produceFuture.done();

if (hasSequence()) {
int sequence = baseSequence();
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId(), producerEpoch());
for (ProducerBatch newBatch : batches) {
newBatch.setProducerState(producerIdAndEpoch, sequence, isTransactional());
sequence += newBatch.recordCount;
}
}
return batches;
}
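
The arithmetic in the block above is worth spelling out with invented numbers: if a batch with base sequence 10 and 5 records is split into batches of 3 and 2 records, the first split batch keeps base sequence 10 and the second gets 13, so the partition still sees the contiguous sequence range 10..14. A tiny standalone sketch of that bookkeeping (not code from the PR):

public class SplitSequenceSketch {
    public static void main(String[] args) {
        int sequence = 10;                    // baseSequence() of the original batch
        int[] splitRecordCounts = {3, 2};     // record counts of the hypothetical split batches
        for (int recordCount : splitRecordCounts) {
            System.out.println("new batch base sequence = " + sequence);
            sequence += recordCount;          // advances 10 -> 13 -> 15 (next unused sequence)
        }
    }
}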

@@ -375,8 +385,12 @@ public boolean isFull() {
}

public void setProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence, boolean isTransactional) {
recordsBuilder.setProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch,
baseSequence, isTransactional);
recordsBuilder.setProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence, isTransactional);
}

public void resetProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence, boolean isTransactional) {
reopened = true;
recordsBuilder.reopenAndRewriteProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence, isTransactional);
}

/**
@@ -394,6 +408,7 @@ public void close() {
recordsBuilder.compressionType(),
(float) recordsBuilder.compressionRatio());
}
reopened = false;
}

/**
@@ -434,4 +449,21 @@ public long producerId() {
public short producerEpoch() {
return recordsBuilder.producerEpoch();
}

public int baseSequence() {
return recordsBuilder.baseSequence();
}

public boolean hasSequence() {
return baseSequence() != RecordBatch.NO_SEQUENCE;
}

public boolean isTransactional() {
return recordsBuilder.isTransactional();
}

public boolean sequenceHasBeenReset() {
return reopened;
}

}
RecordAccumulator.java
@@ -251,15 +251,15 @@ private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMag
* and memory records built) in one of the following cases (whichever comes first): right before send,
* if it is expired, or when the producer is closed.
*/
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque) {
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
Callback callback, Deque<ProducerBatch> deque) {
ProducerBatch last = deque.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
if (future == null)
last.closeForRecordAppends();
else
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);

}
return null;
}
@@ -309,7 +309,10 @@ public void reenqueue(ProducerBatch batch, long now) {
batch.reenqueued(now);
Deque<ProducerBatch> deque = getOrCreateDeque(batch.topicPartition);
synchronized (deque) {
deque.addFirst(batch);
if (transactionManager != null)
insertInSequenceOrder(deque, batch);
else
deque.addFirst(batch);
}
}

@@ -331,12 +334,71 @@ public int splitAndReenqueue(ProducerBatch bigBatch) {
incomplete.add(batch);
// We treat the newly split batches as if they are not even tried.
synchronized (partitionDequeue) {
partitionDequeue.addFirst(batch);
if (transactionManager != null) {
// We should track the newly created batches since they already have assigned sequences.
transactionManager.addInFlightBatch(batch);
insertInSequenceOrder(partitionDequeue, batch);
} else {
partitionDequeue.addFirst(batch);
}
}
}
return numSplitBatches;
}

// The deque for the partition may have to be reordered in situations where leadership changes in between
// batch drains. Since the requests are on different connections, we no longer have any guarantees about ordering
// of the responses. Hence we will have to check if there is anything out of order and ensure the batch is queued
// in the correct sequence order.
//
// Note that this assumes that all the batches in the queue which have an assigned sequence also have the current
// producer id. We will not attempt to reorder messages if the producer id has changed.

private void insertInSequenceOrder(Deque<ProducerBatch> deque, ProducerBatch batch) {
// When we are requeing and have enabled idempotence, the reenqueued batch must always have a sequence.
if (batch.baseSequence() == RecordBatch.NO_SEQUENCE)
throw new IllegalStateException("Trying to reenqueue a batch which doesn't have a sequence even " +
"though idempotence is enabled.");

if (transactionManager.nextBatchBySequence(batch.topicPartition) == null)
throw new IllegalStateException("We are reenqueueing a batch which is not tracked as part of the in flight " +
"requests. batch.topicPartition: " + batch.topicPartition + "; batch.baseSequence: " + batch.baseSequence());

// If there are no inflight batches being tracked by the transaction manager, it means that the producer
// id must have changed and the batches being re enqueued are from the old producer id. In this case
// we don't try to ensure ordering amongst them. They will eventually fail with an OutOfOrderSequence,

Contributor (review comment): OutOfOrderSequence -> OutOfOrderSequenceException

// or they will succeed.
if (batch.baseSequence() != transactionManager.nextBatchBySequence(batch.topicPartition).baseSequence()) {
// The incoming batch can't be inserted at the front of the queue without violating the sequence ordering.
// This means that the incoming batch should be placed somewhere further back.
// We need to find the right place for the incoming batch and insert it there.
// We will only enter this branch if we have multiple inflights sent to different brokers, perhaps
// because a leadership change occurred in between the drains. In this scenario, responses can come
// back out of order, requiring us to re order the batches ourselves rather than relying on the
// implicit ordering guarantees of the network client which are only on a per connection basis.

List<ProducerBatch> orderedBatches = new ArrayList<>();
while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < batch.baseSequence())
orderedBatches.add(deque.pollFirst());

log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " +
"position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size());
// Either we have reached a point where there are batches without a sequence (ie. never been drained
// and are hence in order by default), or the batch at the front of the queue has a sequence greater
// than the incoming batch. This is the right place to add the incoming batch.
deque.addFirst(batch);

// Now we have to re insert the previously queued batches in the right order.
for (int i = orderedBatches.size() - 1; i >= 0; --i) {
deque.addFirst(orderedBatches.get(i));
}

// At this point, the incoming batch has been queued in the correct place according to its sequence.
} else {
deque.addFirst(batch);
}
}
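
A concrete walk-through of the reordering branch, with sequence numbers invented for illustration: batches with sequences 3 and 5 have already been re-enqueued at the front of the deque, and the batch with sequence 8 now comes back failed from a different broker. The loop polls 3 and 5 into orderedBatches, the incoming batch is placed at the front, and the polled batches are pushed back in front of it, leaving the deque ordered 3, 5, 8. The same shape in a standalone sketch that uses plain integers in place of ProducerBatch:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class ReorderSketch {
    public static void main(String[] args) {
        Deque<Integer> deque = new ArrayDeque<>(Arrays.asList(3, 5)); // already re-enqueued sequences
        int incoming = 8;                                             // failed batch arriving out of order

        List<Integer> orderedBatches = new ArrayList<>();
        while (deque.peekFirst() != null && deque.peekFirst() < incoming)
            orderedBatches.add(deque.pollFirst());

        deque.addFirst(incoming);                                     // deque is now [8]
        for (int i = orderedBatches.size() - 1; i >= 0; --i)
            deque.addFirst(orderedBatches.get(i));                    // deque is now [3, 5, 8]

        System.out.println(deque);                                    // prints [3, 5, 8]
    }
}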

/**
* Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
* partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
@@ -469,20 +531,42 @@ public Map<Integer, List<ProducerBatch>> drain(Cluster cluster,
break;

isTransactional = transactionManager.isTransactional();

if (!first.hasSequence() && transactionManager.hasUnresolvedSequence(first.topicPartition))
// Don't drain any new batches while the state of previous sequence numbers
// is unknown. The previous batches would be unknown if they were aborted
// on the client after being sent to the broker at least once.
break;

if (first.hasSequence()
&& first.baseSequence() != transactionManager.nextBatchBySequence(first.topicPartition).baseSequence())
// If the queued batch already has an assigned sequence, then it is being
// retried. In this case, we wait until the next immediate batch is ready
// and drain that. We only move on when the next in line batch is complete (either successfully
// or due to a fatal broker error). This effectively reduces our
// in flight request count to 1.
break;
}

ProducerBatch batch = deque.pollFirst();
if (producerIdAndEpoch != null && !batch.inRetry()) {
// If the batch is in retry, then we should not change the producer id and
if (producerIdAndEpoch != null && !batch.hasSequence()) {
// If the batch already has an assigned sequence, then we should not change the producer id and
// sequence number, since this may introduce duplicates. In particular,
// the previous attempt may actually have been accepted, and if we change
// the producer id and sequence here, this attempt will also be accepted,
// causing a duplicate.
int sequenceNumber = transactionManager.sequenceNumber(batch.topicPartition);
log.debug("Assigning sequence number {} from producer {} to dequeued " +
"batch from partition {} bound for {}.",
sequenceNumber, producerIdAndEpoch, batch.topicPartition, node);
batch.setProducerState(producerIdAndEpoch, sequenceNumber, isTransactional);
//
// Additionally, we update the next sequence number bound for the partition,
// and also have the transaction manager track the batch so as to ensure
// that sequence ordering is maintained even if we receive out of order
// responses.
batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
"{} being sent to partition {}", producerIdAndEpoch.producerId,
producerIdAndEpoch.epoch, batch.baseSequence(), tp);

transactionManager.addInFlightBatch(batch);
}
batch.close();
size += batch.records().sizeInBytes();
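
To make the sequence bookkeeping above concrete (numbers invented for illustration): if the partition's next sequence is 7 and the drained batch contains 4 records, the batch is stamped with base sequence 7 and the counter advances to 11; a retried batch already has a sequence, so it skips this block and is re-sent with its original base sequence. A hypothetical sketch of that counter, with a plain map standing in for the TransactionManager's per-partition state (not its real API):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public class SequenceCounterSketch {
    public static void main(String[] args) {
        // Hypothetical stand-in for the per-partition sequence counter kept by the TransactionManager.
        Map<TopicPartition, Integer> nextSequence = new HashMap<>();
        TopicPartition tp = new TopicPartition("example-topic", 0);   // placeholder partition

        nextSequence.put(tp, 7);                                      // next sequence for this partition
        int baseSequence = nextSequence.get(tp);                      // drained batch gets base sequence 7
        nextSequence.put(tp, baseSequence + 4);                       // 4 records in the batch -> counter becomes 11
        System.out.println(nextSequence.get(tp));                     // prints 11
        // A retry of that batch keeps base sequence 7; only batches that have never
        // been assigned a sequence consume new numbers here.
    }
}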
@@ -634,7 +718,7 @@ void abortUndrainedBatches(RuntimeException reason) {
Deque<ProducerBatch> dq = getDeque(batch.topicPartition);
boolean aborted = false;
synchronized (dq) {
if (!batch.isClosed()) {
if ((transactionManager != null && !batch.hasSequence()) || (transactionManager == null && !batch.isClosed())) {

Review comment: Hmm.. seems we should always check !batch.isClosed()?

Contributor Author: This is related to your other point about the state of the records builder. Right now, isClosed is just builtRecords != null. But we do reopen the batch (ie. nullify the builtRecords) when we have to change the sequence numbers due to a fatal failure. That is why I had to modify this check.

Review comment: Since this function is for aborting undrained batches, maybe an alternative is to have an isUndrained method on ProducerBatch which checks if drainedMs is greater than 0?

aborted = true;
batch.abortRecordAppends();
dq.remove(batch);
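Following the last review comment, one possible shape for the suggested helper (a sketch, not code in this PR) is a method on ProducerBatch that checks the existing drainedMs field, so the abort path would not need to branch on the transaction manager or on isClosed():

// Sketch of the reviewer's suggested helper on ProducerBatch; drainedMs is the
// existing field that is set when the batch is drained out of the accumulator.
public boolean isUndrained() {
    return drainedMs <= 0;
}

// abortUndrainedBatches() could then guard the abort with:
//     if (batch.isUndrained()) {
//         aborted = true;
//         batch.abortRecordAppends();
//         dq.remove(batch);
//     }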