Implement ReadCommitted #1307
Conversation
Oh, by the way, this fixes #1284. In that issue you will find my setup for some manual testing.
This would be very useful. Is there any plan to add it to the master branch?
Force-pushed from 08e3038 to f5e2b7f
At this point, the bulk of the effort is done. I would change this manual test using Docker to a functional test using Vagrant, to match what already exists.
Really small comments from my side.
Thank you very much for this PR, I was waiting for this for a long time. Glad someone took it 👍
fetch_response.go
Outdated
@@ -385,6 +385,65 @@ func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, ke
	batch.addRecord(rec)
}

// AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp
// But instead of append a record a batch of record it append a new batch of record of size 1 to a set of batch
Suggested change:
- // But instead of append a record a batch of record it append a new batch of record of size 1 to a set of batch
+ // But instead of appending ....
I'm not sure I fully understand your comment here. Could we rephrase it?
how is it now?
}
// add aborted transaction to abortedProducer list and depile abortedTransactions
abortedProducerIDs[abortedTransaction.ProducerID] = none{}
abortedTransactions = abortedTransactions[1:]
https://play.golang.org/p/7C8k2ijtvWX
What are you expecting to happen after line 626? It seems that the loop still iterates over the original slice.
It is expected that the loop at l620 iterates over the original slice.
Popping the slice is used for the loop at l610.
Basically, how it works:
- Consider that each producer can only have one unresolved transaction at any moment.
- Consider that for any batch, the WHOLE batch of messages is part of the same transaction.
- The consumer iterates over each batch (l610).
- For each batch, it adds to the list of aborted producers every transaction that started at a previous offset (l620).
- Whenever we encounter an abortedTransaction control record, we resolve that transaction. Such a transaction should never be considered again.

The whole point of unstacking abortedTransactions is to make sure we add each one only once to the list of current (as in current offset) aborted transactions, and never again. This matters even more after a transaction is resolved, since the same producer may then have produced a committed batch that appears in the same fetchResponse.
The comment in the code has been updated. I hope it is clear enough now.
- rm manual test, add functional test
- add java producer to vagrant setup
- wget kafka from archive
There, all green.
Force-pushed from ca57c79 to d2e4a4d
I am not pushing any more commits until review.
LGTM. I don't have a setup to test it thoroughly, so we might have to go the test-in-production way :)
Here is my contribution to implement the read_committed behavior for consumers.
This is not full transaction support; for that you would also need the ability to produce transactions. This adds only the ability to read committed messages.
Also, this is a draft. Unit tests are coming up and I will check a few more cases, but I would like feedback before spending too much time going in the wrong direction.
I mostly copied the Java code as best as I could.
For reference, I am mostly talking about this part: https://github.com/apache/kafka/blob/38a7708a47361929df37ed0f03160bbac5e51e93/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1171
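For users, the feature discussed in this PR is opted into through the consumer configuration. A configuration sketch, assuming the option is exposed as an `IsolationLevel` field on the consumer config with a `ReadCommitted` constant (mirroring the Java client's `isolation.level=read_committed`):

```go
package main

import "github.com/Shopify/sarama"

func main() {
	config := sarama.NewConfig()
	// Only deliver messages from committed transactions; batches from
	// aborted transactions are filtered out of fetch responses.
	config.Consumer.IsolationLevel = sarama.ReadCommitted
	// Transactions require Kafka 0.11 or newer.
	config.Version = sarama.V0_11_0_0
	_ = config
}
```

The default remains read-uncommitted behavior, matching the Java consumer's default.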