diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index fed03047cf16e..804772efb2fdb 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -158,6 +158,7 @@ public boolean advance() throws IOException { */ while (true) { if (curBatch.hasNext()) { + // data from the next partition? PartitionState pState = curBatch.next(); if (!pState.recordIter.hasNext()) { // -- (c) @@ -228,14 +229,16 @@ public boolean advance() throws IOException { for (Map.Entry backlogSplit : perPartitionBacklogMetrics.entrySet()) { backlogBytesOfSplit.set(backlogSplit.getValue()); } - return true; + return true; // record has been read and processed, so we return. (only one record is read at a time) } else { // -- (b) - nextBatch(); + nextBatch(); // void, returns nothing, can this be done in the background instead of when we call advance? - if (!curBatch.hasNext()) { + if (!curBatch.hasNext()) { // returns false because nothing returned in time? return false; } + // Gives a NoSuchElementException + // return true; //? returns, then will call advance again. What's the difference between repeatedly calling advance vs iterating over constantly? } } }