Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid StackOverflowException when Kafka.poll() 'timeout' param is 0 #2

Merged
merged 1 commit into from
Feb 2, 2016

Conversation

mariobriggs
Copy link

implement fetchBatch() as a loop when kafka.poll is not large enough interval for data to reach kafkaClient from kafkaServer

implement fetchBatch() as a loop when kafka.poll is not large enough interval for data to reach kafkaClient from kafkaServer
var recs: ConsumerRecords[K, V] = null
do {
recs = consumer.poll(pollTime)
} while (recs.isEmpty && requestOffset < part.untilOffset)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this @mariobriggs
Should we take out the recursive call below then?

I think the whole if clause can go then?

      if (!iter.hasNext) {
        if ( requestOffset < part.untilOffset ) {
          return getNext()
        }
        assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
        finished = true
        null.asInstanceOf[R]
      } else {

Because iter.hasNext will always be true when fetchBatch returns since it won't return until it has something non-empty to return.

BTW, we should definitely test to make sure this works when the topic is empty and doesn't stall. I will take care of that. Let me know what you think of the above.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the removal of recursive call is right.

make sure this works when the topic is empty
<<
the '&& requestOffset < part.untilOffset' catches that i think

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

markgrover added a commit that referenced this pull request Feb 2, 2016
Avoid StackOverflowException when Kafka.poll() 'timeout' param is 0
@markgrover markgrover merged commit 8fae471 into markgrover:kafka09-integration Feb 2, 2016
@markgrover
Copy link
Owner

I am merging this, thanks again for this @mariobriggs
I can take care of the follow-up but do let me know if you disagree.

markgrover pushed a commit that referenced this pull request Apr 7, 2016
## What changes were proposed in this pull request?

This reopens apache#11836, which was merged but promptly reverted because it introduced flaky Hive tests.

## How was this patch tested?

See `CatalogTestCases`, `SessionCatalogSuite` and `HiveContextSuite`.

Author: Andrew Or <andrew@databricks.com>

Closes apache#11938 from andrewor14/session-catalog-again.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants