Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Confluent kafka binding #988

Merged
merged 1 commit into from
Mar 29, 2024
Merged

Conversation

yanmxa
Copy link
Contributor

@yanmxa yanmxa commented Dec 7, 2023

Signed-off-by: myan myan@redhat.com

Adding the Kafka binding with https://github.com/confluentinc/confluent-kafka-go. This implementation will support the following features compared to the sarama binding.

Note: In the confluent samples, with configuration WithPollGoroutines(1) to ensure the order of the messages on the partition.

Since Kafka already natively utilizes consumer groups to enhance consumption efficiency, there is no need to use multiple goroutines to boost consumption speed.

Resolve: #918

@yanmxa yanmxa force-pushed the br_kafka_confluent branch 4 times, most recently from dd224b3 to 5937f4b Compare December 11, 2023 17:28
@yanmxa
Copy link
Contributor Author

yanmxa commented Dec 12, 2023

v2/context/context.go Outdated Show resolved Hide resolved
@yanmxa
Copy link
Contributor Author

yanmxa commented Jan 8, 2024

@embano1 Thanks for your review!

@embano1
Copy link
Member

embano1 commented Jan 23, 2024

Thy @yanmxa and sorry for my slow responses. I'm under the water at the moment at work and can't give you a concrete date when I'll be able to review. Perhaps @duglin and @lionelvillard can take another look?

@lionelvillard
Copy link
Contributor

@yanmxa can you update the documentation: https://github.com/cloudevents/sdk-go/blob/main/docs/protocol_implementations.md?

Also I wonder if it makes sense to indicate (maybe in the documentation) this is a new implementation and it hasn't been widely tested in the wild. WDYT?

@yanmxa
Copy link
Contributor Author

yanmxa commented Jan 26, 2024

@yanmxa can you update the documentation: https://github.com/cloudevents/sdk-go/blob/main/docs/protocol_implementations.md?

Also I wonder if it makes sense to indicate (maybe in the documentation) this is a new implementation and it hasn't been widely tested in the wild. WDYT?

Thanks @lionelvillard! I update the protocol binding of the related document, #1008. PTAL~

@yanmxa
Copy link
Contributor Author

yanmxa commented Jan 30, 2024

Thanks @duglin !

@yanmxa
Copy link
Contributor Author

yanmxa commented Mar 6, 2024

@duglin @embano1 @lionelvillard PTAL~

Copy link
Member

@embano1 embano1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't checked the whole PR because for some parts especially structured/binary handling, I'd like to see more tests for non-happy path scenarios and mis-use if possible. Could you please also add Confluent to the Github integration tests so we run this against Confluent images?

protocol/kafka_confluent/v2/go.mod Outdated Show resolved Hide resolved
protocol/kafka_confluent/v2/message.go Outdated Show resolved Hide resolved
contentType = string(header.Value)
}
if k == specs.PrefixedSpecVersionName() {
contentVersion = string(header.Value)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: what if header.Value is 1.0.2 but MUST be 1.0 according to

Compliant event producers MUST use a value of 1.0 when referring to this version of the specification
https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md#specversion

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I‘m not sure how to handle the specversion with value 1.0.2. Maybe just validate the 'major' and 'minor' versions whether compliant with "1.0"? and if not, then print a warning message?

I checked the other protocol. Then all just assert whether the specversion key exists and ignore the value.

Do you have any other advice?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, IMHO this is a shortcoming in the other implementations. I'm fine leaving this here as is then given it's consistent with our other implementations. cc/ @duglin

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like being forgiving and just ignoring the patch version number - if we want to do anything different at all. I'm ok with the current choice of only looking for 0.3 and 1.0.

protocol/kafka_confluent/v2/message.go Show resolved Hide resolved
protocol/kafka_confluent/v2/option.go Outdated Show resolved Hide resolved
protocol/kafka_confluent/v2/option.go Outdated Show resolved Hide resolved
protocol/kafka_confluent/v2/option.go Show resolved Hide resolved
@yanmxa
Copy link
Contributor Author

yanmxa commented Mar 11, 2024

I haven't checked the whole PR because for some parts especially structured/binary handling, I'd like to see more tests for non-happy path scenarios and mis-use if possible. Could you please also add Confluent to the Github integration tests so we run this against Confluent images?

Thank @embano1!

I added some tests for the misuse cases when trying to initialize the client. If others come up with scenarios beyond these, we can continually improve it in the future.

Also, I updated the GitHub integration test workflow to add the kafka_confluent service(running on port 9192, the sarama running on 9092) with the confluent image. So the current confluent kafka protocol uses it for the tests.

.github/workflows/integration.yaml Outdated Show resolved Hide resolved
protocol/kafka_confluent/v2/message.go Show resolved Hide resolved
protocol/kafka_confluent/v2/message.go Outdated Show resolved Hide resolved
protocol/kafka_confluent/v2/message_test.go Outdated Show resolved Hide resolved
protocol/kafka_confluent/v2/message_test.go Outdated Show resolved Hide resolved
protocol/kafka_confluent/v2/option.go Show resolved Hide resolved
protocol/kafka_confluent/v2/option.go Outdated Show resolved Hide resolved
protocol/kafka_confluent/v2/option.go Outdated Show resolved Hide resolved
samples/kafka_confluent/go.mod Show resolved Hide resolved
protocol/kafka_confluent/v2/message.go Show resolved Hide resolved
@embano1
Copy link
Member

embano1 commented Mar 25, 2024

I have a slight concern, perhaps unnecessary and we can just ignore it. That is what if the producer isn't closed by the Closer, that will cause producerDeliveryChan not to be closed?

Please add a comment to the Close(ctx) method that protocol users must call Close(ctx) to properly clean up resources after use. Also, add defer p.Close(ctx) to all your examples

@yanmxa
Copy link
Contributor Author

yanmxa commented Mar 25, 2024

I have a slight concern, perhaps unnecessary and we can just ignore it. That is what if the producer isn't closed by the Closer, that will cause producerDeliveryChan not to be closed?

Please add a comment to the Close(ctx) method that protocol users must call Close(ctx) to properly clean up resources after use. Also, add defer p.Close(ctx) to all your examples

Done!

@embano1
Copy link
Member

embano1 commented Mar 25, 2024

Please add a doc string to the Close() method as well, advising users to call Close() after use. Please directly squash your commits so we can merge. Also, any related documentation/README updates needed to further highlight this new protocol implementation?

@embano1
Copy link
Member

embano1 commented Mar 25, 2024

Also, why did you not use WithPollGoroutines in all receiver examples? Should be clear why this is advised e.g., also adding a comment to the example explaining that this won't have perf impact and is only required if the user cares about in-order processing when using the SDK.

@yanmxa
Copy link
Contributor Author

yanmxa commented Mar 25, 2024

Also, why did you not use WithPollGoroutines in all receiver examples? Should be clear why this is advised e.g., also adding a comment to the example explaining that this won't have perf impact and is only required if the user cares about in-order processing when using the SDK.

@embano1 Thanks for your prompt feedback!

  1. Add some comments for the Close method and WithPollGoroutines in all receiver examples.
  2. Update the related document in this repository.
  3. Squash the commits

Could you please help me to check if the wording in the above document is appropriate?

Copy link
Member

@embano1 embano1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just that one comment please

@embano1
Copy link
Member

embano1 commented Mar 25, 2024

Can you please check your integration tests? They're not looking positive, but passing: https://github.com/cloudevents/sdk-go/actions/runs/8420863202/job/23056453953?pr=988#step:5:284 (example)

@embano1
Copy link
Member

embano1 commented Mar 25, 2024

@yanmxa yanmxa force-pushed the br_kafka_confluent branch 3 times, most recently from 0ac67a3 to 20a4a89 Compare March 26, 2024 02:53
@yanmxa
Copy link
Contributor Author

yanmxa commented Mar 26, 2024

This unit test is also not looking ok: https://github.com/cloudevents/sdk-go/actions/runs/8420863203/job/23056453818#step:4:3883

The error log disappears in the unit test now.

@yanmxa
Copy link
Contributor Author

yanmxa commented Mar 26, 2024

Can you please check your integration tests? They're not looking positive, but passing: https://github.com/cloudevents/sdk-go/actions/runs/8420863202/job/23056453953?pr=988#step:5:284 (example)

Done!
The integration test error log is from here.

Signed-off-by: myan <myan@redhat.com>

add message implementation

Signed-off-by: myan <myan@redhat.com>

add ut test

Signed-off-by: myan <myan@redhat.com>

add integration test

Signed-off-by: myan <myan@redhat.com>

add integration test and samples

Signed-off-by: Meng Yan <myan@redhat.com>

offset

Signed-off-by: Meng Yan <myan@redhat.com>

remove the ctx

Signed-off-by: myan <myan@redhat.com>

 review

Signed-off-by: Meng Yan <myan@redhat.com>

remove

Signed-off-by: Meng Yan <myan@redhat.com>

init consumer and producer on 1 client

Signed-off-by: myan <myan@redhat.com>

reply the reviews

Signed-off-by: myan <myan@redhat.com>

fix the ci

Signed-off-by: myan <myan@redhat.com>

ci fix

Signed-off-by: Meng Yan <myan@redhat.com>

add confluent test in github action

Signed-off-by: Meng Yan <myan@redhat.com>

add the mis-used test case

Signed-off-by: Meng Yan <myan@redhat.com>

remove the invalidated bootstrapserver

Signed-off-by: Meng Yan <myan@redhat.com>

log kafka error message

Signed-off-by: myan <myan@redhat.com>

review

Signed-off-by: myan <myan@redhat.com>

Update protocol/kafka_confluent/v2/option.go

Co-authored-by: Michael Gasch <15986659+embano1@users.noreply.github.com>
Signed-off-by: myan <myan@redhat.com>

add the auto recover option

Signed-off-by: myan <myan@redhat.com>

kafka error handler

Signed-off-by: myan <myan@redhat.com>

remove the delievery chan

Signed-off-by: myan <myan@redhat.com>

Update protocol/kafka_confluent/v2/protocol.go

Co-authored-by: Michael Gasch <15986659+embano1@users.noreply.github.com>

Update protocol/kafka_confluent/v2/protocol.go

Co-authored-by: Michael Gasch <15986659+embano1@users.noreply.github.com>

Update protocol/kafka_confluent/v2/option.go

Co-authored-by: Michael Gasch <15986659+embano1@users.noreply.github.com>

Update protocol/kafka_confluent/v2/protocol.go

Co-authored-by: Michael Gasch <15986659+embano1@users.noreply.github.com>

reply review

Signed-off-by: myan <myan@redhat.com>

modify the git action

Signed-off-by: Meng Yan <myan@redhat.com>

reply review

Signed-off-by: Meng Yan <myan@redhat.com>

reply review

Signed-off-by: Meng Yan <myan@redhat.com>

handle race condition between sender and closer

Signed-off-by: Meng Yan <myan@redhat.com>

reply review

Signed-off-by: Meng Yan <myan@redhat.com>

reply review1

Signed-off-by: Meng Yan <myan@redhat.com>

add defer close

Signed-off-by: Meng Yan <myan@redhat.com>

add comment

Signed-off-by: Meng Yan <myan@redhat.com>
Copy link
Member

@embano1 embano1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

THX @yanmxa for getting this over the line!

cc/ @duglin for final overall check

@embano1
Copy link
Member

embano1 commented Mar 29, 2024

@duglin can you please take a final look just overall whether we missed a README or general requirements for a new protocol binding?

@duglin
Copy link
Contributor

duglin commented Mar 29, 2024

Sorry, been swamped. I'll try to give it a look today

@duglin
Copy link
Contributor

duglin commented Mar 29, 2024

@lionelvillard too :-)

@@ -27,6 +27,15 @@ jobs:
- 9091:9091
- 9092:9092

kafka_confluent:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rename the one above from "kafka" to "kafka_sarama" to be consistent with the rest of the naming?

@duglin
Copy link
Contributor

duglin commented Mar 29, 2024

LGTM

I'll let @embano1 hit the button.

@embano1 embano1 merged commit e6a74ef into cloudevents:main Mar 29, 2024
9 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

confluent kafka protocol binding
4 participants