Commit

Force at least one poll to Kafka when polling for available records from local record queue
Naireen committed Sep 26, 2024
1 parent faecdb2 commit 45f6c4a
Showing 2 changed files with 186 additions and 111 deletions.
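The intent of the change is easier to see outside the diff: the reader keeps a boolean that the background consumer thread sets once it has actually polled Kafka, and nextBatch() no longer gives up on the local record queue until that flag is set, so a slow first poll is not mistaken for an empty source. Below is a minimal, self-contained sketch of that pattern, not the Beam code; the class and member names (AtLeastOnePollReader, pollLoop, fetch, availableRecords) are illustrative, and the flag handling is simplified.

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * Minimal sketch: an empty local queue is not treated as "no data available"
 * until the background thread has completed at least one real poll against
 * the source. Illustrative only, not the Beam implementation.
 */
class AtLeastOnePollReader {
  private final BlockingQueue<List<String>> availableRecords = new ArrayBlockingQueue<>(1);
  // Written by the poll thread, read by the reader thread.
  private volatile boolean atLeastOnePollCompleted = false;

  /** Runs on a background thread; {@code fetch} stands in for consumer.poll(...). */
  void pollLoop(Supplier<List<String>> fetch) throws InterruptedException {
    while (!Thread.currentThread().isInterrupted()) {
      List<String> records = fetch.get();
      if (!records.isEmpty()) {
        availableRecords.offer(records, 100, TimeUnit.MILLISECONDS);
      }
      atLeastOnePollCompleted = true; // the source has now been asked at least once
    }
  }

  /** Runs on the reader thread: retry the local queue until one poll has completed. */
  List<String> nextBatch() throws InterruptedException {
    List<String> records;
    do {
      records = availableRecords.poll(10, TimeUnit.MILLISECONDS);
    } while (records == null && !atLeastOnePollCompleted);
    atLeastOnePollCompleted = false; // the next call should again wait for a fresh poll
    return records == null ? Collections.<String>emptyList() : records;
  }
}

In the diff below, by contrast, the flag is set right after consumer.poll() returns, and also on interruption, wakeup, or failure of the poll thread, so nextBatch() never waits for a poll that will not happen; it is reset in advance() rather than inside nextBatch().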
@@ -77,6 +77,8 @@
*/
class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {

// Set by the consumer poll thread; read in nextBatch() and reset in advance() on the reader thread.
boolean atleastOnePollCompleted = false;

///////////////////// Reader API ////////////////////////////////////////////////////////////
@SuppressWarnings("FutureReturnValueIgnored")
@Override
@@ -158,7 +160,6 @@ public boolean advance() throws IOException {
*/
while (true) {
if (curBatch.hasNext()) {
// data from the next partition?
PartitionState<K, V> pState = curBatch.next();

if (!pState.recordIter.hasNext()) { // -- (c)
@@ -229,16 +230,14 @@ public boolean advance() throws IOException {
for (Map.Entry<String, Long> backlogSplit : perPartitionBacklogMetrics.entrySet()) {
backlogBytesOfSplit.set(backlogSplit.getValue());
}
return true; // record has been read and processed, so we return. (only read a record at a time)
return true;

} else { // -- (b)
nextBatch(); // void, returns nothing, can this be done in the background instead of when we call advance?

if (!curBatch.hasNext()) { // returns false because nothing returned in time?
nextBatch();
atleastOnePollCompleted = false; // Reset it for next call
if (!curBatch.hasNext()) {
return false;
}
// Gives no such element exception
// return true; //? returns, then will call advance again. What's the difference between repeatedly calling advance vs iterating constantly?
}
}
}
@@ -333,7 +332,7 @@ public long getSplitBacklogBytes() {

private final KafkaUnboundedSource<K, V> source;
private final String name;
private @Nullable Consumer<byte[], byte[]> consumer = null;
@VisibleForTesting @Nullable Consumer<byte[], byte[]> consumer = null;
private final List<PartitionState<K, V>> partitionStates;
private @Nullable KafkaRecord<K, V> curRecord = null;
private @Nullable Instant curTimestamp = null;
@@ -572,22 +571,26 @@ private void consumerPollLoop() {
try {
if (records.isEmpty()) {
records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
atleastOnePollCompleted = true; // a poll has completed, even if it returned no records
} else if (availableRecordsQueue.offer(
records, RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)) {
records = ConsumerRecords.empty();
}

commitCheckpointMark();
} catch (InterruptedException e) {
atleastOnePollCompleted = true;
LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
break;
} catch (WakeupException e) {
atleastOnePollCompleted = true;
break;
}
}
LOG.info("{}: Returning from consumer poll loop", this);
} catch (Exception e) { // mostly an unrecoverable KafkaException.
LOG.error("{}: Exception while reading from Kafka", this, e);
atleastOnePollCompleted = true; // let nextBatch() stop waiting for a poll that will not come
consumerPollException.set(e);
throw e;
}
@@ -624,19 +627,24 @@ void finalizeCheckpointMarkAsync(KafkaCheckpointMark checkpointMark) {
checkpointMarkCommitsEnqueued.inc();
}

// Ensure at least one consumer poll has completed since this was last called.
private void nextBatch() throws IOException {
curBatch = Collections.emptyIterator();

ConsumerRecords<byte[], byte[]> records;
try {
// poll available records, wait (if necessary) up to the specified timeout.
records =
availableRecordsQueue.poll(recordsDequeuePollTimeout.getMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("{}: Unexpected", this, e);
return;
}
do {
// try until the background poll has completed at least once
try {
// poll available records, wait (if necessary) up to the specified timeout.
records =
availableRecordsQueue.poll(
recordsDequeuePollTimeout.getMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("{}: Unexpected", this, e);
return;
}

} while (!atleastOnePollCompleted);

if (records == null) {
// Check if the poll thread failed with an exception.
@@ -656,7 +664,8 @@ private void nextBatch() throws IOException {
LOG.debug("Record count: " + records.count());
}

partitionStates.forEach(p -> p.recordIter = records.records(p.topicPartition).iterator());
// Copy into an effectively final local so the lambda below can capture it.
ConsumerRecords<byte[], byte[]> finalRecords = records;
partitionStates.forEach(p -> p.recordIter = finalRecords.records(p.topicPartition).iterator());

// cycle through the partitions in order to interleave records from each.
curBatch = Iterators.cycle(new ArrayList<>(partitionStates));