-
Notifications
You must be signed in to change notification settings - Fork 322
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support initialisation of producer without topic for kafka, azure event hub and confluent cloud #2569
Conversation
Codecov ReportBase: 43.73% // Head: 43.71% // Decreases project coverage by
Additional details and impacted files@@ Coverage Diff @@
## master #2569 +/- ##
==========================================
- Coverage 43.73% 43.71% -0.02%
==========================================
Files 191 191
Lines 40483 40499 +16
==========================================
- Hits 17707 17706 -1
- Misses 21672 21685 +13
- Partials 1104 1108 +4
Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. ☔ View full report at Codecov. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need some test cases for all the new scenarios introduced for client.Producer
:
- Initialize a producer with a topic and send a message containing a topic, the message's topic shall be ignored
- Initialize a producer without a topic and try to send a message containing no topic, an error should be returned
Furthermore, we need test scenarios for kafkamanager as well covering
- Reading the topic from the message
- Falling back to the default topic if no topic is provided by the message
- Handling invalid topic fields (e.g. number, object, nil)
@@ -132,6 +124,19 @@ func (p *Producer) Publish(ctx context.Context, msgs ...Message) error { | |||
|
|||
var tempError interface{ Temporary() bool } | |||
|
|||
func processHeaderForEachMessage(msg Message) (headers []kafka.Header) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor
func processHeaderForEachMessage(msg Message) (headers []kafka.Header) { | |
func headers(msg Message) (headers []kafka.Header) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is refactored and addressed
} | ||
} | ||
} | ||
headers := processHeaderForEachMessage(msgs[i]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
headers := processHeaderForEachMessage(msgs[i]) | |
headers := headers(msgs[i]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is refactored and addressed
messages[i] = kafka.Message{ | ||
Topic: msgs[i].Topic, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
client.Producer
should know if it has been initialized with a topic or not and only include the message's topic in the latter case.
Additionally, this function should return an error if the producer is not initialized with a topic and no topic is included in at least one of the messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We plan to go with the approach where we always initialize the producer without client and we intend to always send each message with topic information. That proper assignment of topic to each message would be handled at transformer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that we decided to go down this road, can you please make it mandatory and raise an error here if the topic is empty?
for i := range msgs {
if msgs[i].Topic == "" {
return fmt.Errorf("no topic provided for message %d", i)
}
I think we should also have a test for this (don't specify topic and check that an error is returned).
for i, data := range batch { | ||
if conf.MultiTopicSupport { | ||
topic = data["topic"].(string) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't we fallback to the default topic if the message doesn't contain one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is handled with transformer being the responsible service for assigning to topic to each message
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
opensource users don't always update their transformers, do they? Plus I don't feel safe with this decision (have commented about it elswhere)
@@ -611,7 +617,11 @@ func sendMessage(ctx context.Context, jsonData json.RawMessage, p producerManage | |||
return makeErrorResponse(fmt.Errorf("unable to serialize event with messageId: %s, with error %s", messageId, err)) | |||
} | |||
} | |||
if config.MultiTopicSupport { | |||
topic = parsedJSON.Get("topic").Value().(string) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
type cast will panic here if the field is absent or not a string. Additionally, shouldn't we fallback to the default topic if the message doesn't contain one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handled similar vulnerabilities
@@ -312,7 +313,7 @@ func NewProducer(destination *backendconfig.DestinationT, o common.Opts) (*Produ | |||
return nil, fmt.Errorf("could not ping: %w", err) | |||
} | |||
|
|||
p, err := c.NewProducer(destConfig.Topic, client.ProducerConfig{ | |||
p, err := c.NewProducer("", client.ProducerConfig{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is how you want to handle topics then why keeping the argument in the constructor? It looks like you'll be passing it each time anyway even for old configurations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes we are planning the same approach updated for the same
To simplify a bit I'd remove the |
…removing client initialisation with topic
…values from message
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like tests are failing, check once, also make sure to add unit test cases for the code you have modified
…bility support for default topic
@utsabc if this is actually the final product, can you please take care of the PR title and description accordingly? |
Addressed |
@@ -450,14 +450,19 @@ func serializeAvroMessage(value []byte, codec goavro.Codec) ([]byte, error) { | |||
return binary, nil | |||
} | |||
|
|||
func prepareBatchOfMessages(topic string, batch []map[string]interface{}, timestamp time.Time, p producerManager) ( | |||
func prepareBatchOfMessages(batch []map[string]interface{}, timestamp time.Time, p producerManager, defaultTopic string) ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of passing the defaultTopic
around, an alternative approach is to use the existing pattern of including such information in the producerManager
:
type producerManager interface {
io.Closer
publisher
getDefaultTopic() string
getTimeout() time.Duration
getCodecs() map[string]*goavro.Codec
}
That should be preferred given that it is already in place. I've attached a git diff for you to consider this approach (e.g. git apply yourcoworkers.diff
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we take this up in the next iteration of enhancement I can create a backlog item in our board for this. The reason I am asking is I need to update and add some test cases for this approach as we are dangerously close to release this might take more time :/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure no problem 👍
Description
The scope of this PR is to support KAFKA, Azure event hub and confluent cloud producer with multiple topic from a single client. We want to generate the kafka client without the context of producer where in we will be passing topic information at individual message level.
Transformer takes the responsibility of allocating correct topic information to messages
To support backward compatibility the server client falls back to default topic present in destination config if the message level topic is not present.
Also from transformer to support fair event ordering for topics the logic that is added are as follows
events --> server --> transformer (transform each message) ---> server ---> client --> delivery
[rudderId = anonymousId <<>>userID ] [rudderId = anonymousId<<>>userId<<>>topic]
Notion Ticket
Notion Ticket
Security