Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Improve Log layer segment iteration logic and few other areas #10684

Merged
merged 6 commits into from
May 27, 2021

Conversation

kowshik
Copy link
Contributor

@kowshik kowshik commented May 13, 2021

  1. In Log.collectAbortedTransactions() I've restored a previously used logic, such that it would handle the case where the starting segment could be null. This was the case previously, but the PR KAFKA-12552: Introduce LogSegments class abstracting the segments map #10401 accidentally changed the behavior causing the code to assume that the starting segment won't be null.
  2. In Log.rebuildProducerState() I've removed usage of the allSegments local variable. The logic looks a bit simpler after I removed it.
  3. I've introduced a new LogSegments.higherSegments() API. This is now used to make the logic a bit more readable in Log. collectAbortedTransactions() and Log.deletableSegments() APIs.
  4. I've removed the unnecessary use of java.lang.Long in LogSegments class' segments map definition.
  5. I've converted a few LogSegments API from public to private, as they need not be public.

Tests:
Relying on existing unit tests.

@kowshik kowshik force-pushed the MINOR_improve_Log_layer_in_AK_a_bit branch 2 times, most recently from 010858d to c1b293f Compare May 13, 2021 01:07
* Returns an iterable to log segments ordered from lowest base offset to highest.
* Each segment in the returned iterable has a base offset strictly greater than the provided baseOffset.
*/
def higherSegments(baseOffset: Long): Iterable[LogSegment] = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ijuma If you think this API is a good fit to the requirement, I can add unit tests for it.

@kowshik
Copy link
Contributor Author

kowshik commented May 13, 2021

cc @junrao

core/src/main/scala/kafka/log/Log.scala Outdated Show resolved Hide resolved

done = fetchDataInfo != null || segmentEntryOpt.isEmpty
done = fetchDataInfo != null || segmentOpt.isEmpty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we simply use while (fetchDataInfo == null && segmentOpt.isDefined) rather than while (!done)?

Copy link
Contributor Author

@kowshik kowshik May 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Please see 80c40171302546be2dcf2444167d7f375f0b820d.

@kowshik kowshik requested a review from ijuma May 13, 2021 20:31
@ccding
Copy link
Contributor

ccding commented May 15, 2021

LGTM

Thanks @kowshik

@kowshik kowshik force-pushed the MINOR_improve_Log_layer_in_AK_a_bit branch from 0134a4d to 4ffeaa3 Compare May 24, 2021 17:50
Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kowshik : Thanks for the PR. A couple of comments below.


done = fetchDataInfo != null || segmentEntryOpt.isEmpty
fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
} else segmentOpt = segmentsIterator.nextOption()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old logic supports skipping forward multiple segments to find the right data. The new logic seems to only support skipping forward once. It would be useful to preserve the original semantic.

Copy link
Contributor Author

@kowshik kowshik May 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is an excellent catch. Sorry I missed this. Done, changed it now.

val view =
Option(segments.higherKey(baseOffset)).map {
higherOffset => segments.tailMap(higherOffset, true)
}.getOrElse(new ConcurrentSkipListMap[Long, LogSegment]())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we return a constant empty map?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@kowshik kowshik force-pushed the MINOR_improve_Log_layer_in_AK_a_bit branch from e718bce to 4b83603 Compare May 25, 2021 07:17
@kowshik
Copy link
Contributor Author

kowshik commented May 26, 2021

Thanks for the review @junrao! I have addressed your comments in 4b836034415c3d5f6b84384ef9be1e75b66edc4b.

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kowshik : Thanks for the updated PR. There seems to be a compilation error in java 8.

@kowshik kowshik force-pushed the MINOR_improve_Log_layer_in_AK_a_bit branch from 4b83603 to 4d86571 Compare May 27, 2021 02:08
@kowshik kowshik force-pushed the MINOR_improve_Log_layer_in_AK_a_bit branch from 4d86571 to 7a84aa1 Compare May 27, 2021 02:08
@kowshik
Copy link
Contributor Author

kowshik commented May 27, 2021

@junrao Thanks for the review and for catching this issue! I've fixed the build now. The issue was that the code in Log.scala used the Iterator.nextOption()API that was introduced only in scala v2.13, so the build failed for scala v2.12.

I have now also added unit tests for the newly introduced LogSegments.higherSegments() API.

The PR is ready for review again.

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kowshik : Thanks for the updated PR. LGTM

@junrao junrao merged commit 3fb836f into apache:trunk May 27, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants