Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support initialisation of producer without topic for kafka, azure event hub and confluent cloud #2569

Merged
merged 28 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
fcaf5a1
feat: initial draft to support initialisation of producer without topic
utsabc Oct 14, 2022
9441ece
feat: added support to publish with topic info
utsabc Oct 14, 2022
d4bf244
feat: added support to publish with topic for batched payloads and mi…
utsabc Oct 14, 2022
e9b0eca
feat: addressed review comments
utsabc Oct 19, 2022
5ea13e8
feat: minor refactoring
utsabc Oct 19, 2022
c34af41
feat: updated client_test.go
utsabc Oct 19, 2022
3ae40f7
feat(kafka multi topic): allowing topic to be passed only in produce …
utsabc Oct 24, 2022
98b80bd
feat(kafka multi topic): refactoring function name
utsabc Oct 24, 2022
cf618c7
feat(kafka multi topic): removed vulnerabilities from calling property …
utsabc Oct 24, 2022
57f2257
feat(integrations): removed unwanted args from producer funcs
utsabc Oct 26, 2022
e2d4428
feat(kafka multi topic): refactored test cases for new implementation
utsabc Oct 26, 2022
b28df76
feat(kafka multi topic support): minor added some changes in tests
utsabc Oct 28, 2022
476c1b3
Merge branch 'master' into feat.kafka-multi-topic-support
utsabc Oct 29, 2022
709a66d
feat(kafka multi topic support): addressed review comment and updated…
utsabc Oct 29, 2022
8664e2a
Merge branch 'feat.kafka-multi-topic-support' of github.com:rudderlab…
utsabc Oct 29, 2022
eef6bd4
feat(kafka multi topic support): resolving conflicts
utsabc Nov 2, 2022
c8fb9cd
feat(kafka multi topic support): clean up and minor fixes
utsabc Nov 2, 2022
b0c22d1
Merge branch 'master' into feat.kafka-multi-topic-support
utsabc Nov 2, 2022
78f24d8
Merge branch 'master' into feat.kafka-multi-topic-support
utsabc Nov 3, 2022
f6acf0f
Merge branch 'master' into feat.kafka-multi-topic-support
krishna2020 Nov 4, 2022
91abb1a
feat: addressing comments on code refactoring
utsabc Nov 5, 2022
7fe1e6a
feat: addressing comments on code refactoring adding backward compati…
utsabc Nov 5, 2022
f66e8e9
feat: re-adding the sanity test cases
utsabc Nov 5, 2022
aa9d7e2
Merge branch 'master' into feat.kafka-multi-topic-support
utsabc Nov 5, 2022
560fa4e
feat: adding default-topic-test case
utsabc Nov 7, 2022
4504a55
feat: adding default-topic-test case for batch
utsabc Nov 7, 2022
9f7e642
Merge branch 'master' into feat.kafka-multi-topic-support
utsabc Nov 7, 2022
eeb8667
feat: clean up of test cases
utsabc Nov 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions services/streammanager/kafka/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestProducerBatchConsumerGroup(t *testing.T) {
producerConf.Logger = &testLogger{t}
producerConf.ErrorLogger = producerConf.Logger
}
p, err := c.NewProducer(t.Name(), producerConf)
p, err := c.NewProducer(producerConf)
require.NoError(t, err)
publishMessages(ctx, t, p, noOfMessages)
messagesWaitGroup.Add(noOfMessages)
Expand Down Expand Up @@ -257,7 +257,7 @@ func TestConsumer_Partition(t *testing.T) {
producerConf.Logger = &testLogger{t}
producerConf.ErrorLogger = producerConf.Logger
}
p, err := c.NewProducer(t.Name(), producerConf)
p, err := c.NewProducer(producerConf)
require.NoError(t, err)
publishMessages(ctx, t, p, noOfMessages)
messagesWaitGroup.Add(noOfMessages)
Expand Down Expand Up @@ -403,7 +403,7 @@ func TestWithSASL(t *testing.T) {
producerConf.Logger = &testLogger{t}
producerConf.ErrorLogger = producerConf.Logger
}
p, err := c.NewProducer("some-topic", producerConf)
p, err := c.NewProducer(producerConf)
require.NoError(t, err)
t.Cleanup(func() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
Expand All @@ -418,6 +418,7 @@ func TestWithSASL(t *testing.T) {
err := p.Publish(context.Background(), Message{
Key: []byte("hello"),
Value: []byte("ciao"),
Topic: "some-topic",
})
if err != nil {
t.Logf("Publish error: %v", err)
Expand Down Expand Up @@ -520,7 +521,7 @@ func TestProducer_Timeout(t *testing.T) {
producerConf.Logger = &testLogger{t}
producerConf.ErrorLogger = producerConf.Logger
}
p, err := c.NewProducer(t.Name(), producerConf)
p, err := c.NewProducer(producerConf)
require.NoError(t, err)
t.Cleanup(func() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
Expand All @@ -534,6 +535,7 @@ func TestProducer_Timeout(t *testing.T) {
err = p.Publish(pubCtx, Message{
Key: []byte("hello"),
Value: []byte("world"),
Topic: t.Name(),
})
pubCancel()
require.NoError(t, err)
Expand All @@ -542,6 +544,7 @@ func TestProducer_Timeout(t *testing.T) {
err = p.Publish(pubCtx, Message{
Key: []byte("hello"),
Value: []byte("world"),
Topic: t.Name(),
})
defer pubCancel()
require.Error(t, err)
Expand Down Expand Up @@ -588,7 +591,7 @@ func TestIsProducerErrTemporary(t *testing.T) {
producerConf.Logger = &testLogger{t}
producerConf.ErrorLogger = producerConf.Logger
}
p, err := c.NewProducer("non-existent-topic", producerConf)
p, err := c.NewProducer(producerConf)
require.NoError(t, err)
t.Cleanup(func() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
Expand All @@ -604,9 +607,11 @@ func TestIsProducerErrTemporary(t *testing.T) {
err = p.Publish(pubCtx, Message{
Key: []byte("key-01"),
Value: []byte("value-01"),
Topic: "non-existent-topic",
}, Message{
Key: []byte("key-02"),
Value: []byte("value-02"),
Topic: "non-existent-topic",
})
require.Truef(t, IsProducerErrTemporary(err), "Expected temporary error, got %v instead", err)
pubCancel()
Expand Down Expand Up @@ -645,13 +650,13 @@ func TestConfluentAzureCloud(t *testing.T) {
producerConf.ErrorLogger = producerConf.Logger
}
p, err := c.NewProducer(
t.Name(), // the topic needs to be created beforehand via the ConfluentCloud admin panel
producerConf,
)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
err = p.Publish(ctx, Message{Key: []byte("key-01"), Value: []byte("value-01")})
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// the topic needs to be created beforehand via the ConfluentCloud admin panel
err = p.Publish(ctx, Message{Key: []byte("key-01"), Value: []byte("value-01"), Topic: t.Name()})
cancel()
require.NoError(t, err)

Expand Down Expand Up @@ -689,11 +694,12 @@ func TestAzureEventHubsCloud(t *testing.T) {
producerConf.Logger = &testLogger{t}
producerConf.ErrorLogger = producerConf.Logger
}
p, err := c.NewProducer(azureEventHubName, producerConf)
p, err := c.NewProducer(producerConf)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
err = p.Publish(ctx, Message{Key: []byte("key-01"), Value: []byte("value-01")})
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err = p.Publish(ctx, Message{Key: []byte("key-01"), Value: []byte("value-01"), Topic: azureEventHubName})

cancel()
require.NoError(t, err)

Expand Down Expand Up @@ -722,6 +728,7 @@ func publishMessages(ctx context.Context, t *testing.T, p *Producer, noOfMessage
messages[i] = Message{
Key: []byte(fmt.Sprintf("key-%d", i)),
Value: []byte(fmt.Sprintf("value-%d", i)),
Topic: t.Name(),
}
}

Expand Down
29 changes: 18 additions & 11 deletions services/streammanager/kafka/client/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Producer struct {
}

// NewProducer instantiates a new producer. To use it asynchronously just do "go p.Publish(ctx, msgs)".
func (c *Client) NewProducer(topic string, producerConf ProducerConfig) (p *Producer, err error) { // skipcq: CRT-P0003
func (c *Client) NewProducer(producerConf ProducerConfig) (p *Producer, err error) { // skipcq: CRT-P0003
producerConf.defaults()

dialer := &net.Dialer{
Expand Down Expand Up @@ -68,7 +68,6 @@ func (c *Client) NewProducer(topic string, producerConf ProducerConfig) (p *Prod
config: producerConf,
writer: &kafka.Writer{
Addr: kafka.TCP(c.addresses...),
Topic: topic,
Balancer: &kafka.ReferenceHash{},
BatchTimeout: time.Nanosecond,
WriteTimeout: producerConf.WriteTimeout,
Expand Down Expand Up @@ -109,17 +108,12 @@ func (p *Producer) Close(ctx context.Context) error {
func (p *Producer) Publish(ctx context.Context, msgs ...Message) error {
messages := make([]kafka.Message, len(msgs))
for i := range msgs {
var headers []kafka.Header
if l := len(msgs[i].Headers); l > 0 {
headers = make([]kafka.Header, l)
for k := range msgs[i].Headers {
headers[k] = kafka.Header{
Key: msgs[i].Headers[k].Key,
Value: msgs[i].Headers[k].Value,
}
}
if msgs[i].Topic == "" {
return fmt.Errorf("no topic provided for message %d", i)
atzoum marked this conversation as resolved.
Show resolved Hide resolved
}
headers := headers(msgs[i])
messages[i] = kafka.Message{
Topic: msgs[i].Topic,
Copy link
Contributor

@atzoum atzoum Oct 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

client.Producer should know if it has been initialized with a topic or not and only include the message's topic in the latter case.
Additionally, this function should return an error if the producer is not initialized with a topic and no topic is included in at least one of the messages.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We plan to go with the approach where we always initialize the producer without client and we intend to always send each message with topic information. That proper assignment of topic to each message would be handled at transformer

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we decided to go down this road, can you please make it mandatory and raise an error here if the topic is empty?

	for i := range msgs {
		if msgs[i].Topic == "" {
			return fmt.Errorf("no topic provided for message %d", i)
		}

I think we should also have a test for this (don't specify topic and check that an error is returned).

Key: msgs[i].Key,
Value: msgs[i].Value,
Time: msgs[i].Timestamp,
Expand All @@ -132,6 +126,19 @@ func (p *Producer) Publish(ctx context.Context, msgs ...Message) error {

var tempError interface{ Temporary() bool }

// headers converts a Message's headers into their kafka-go representation.
// It returns nil when the message carries no headers, so callers can pass
// the result straight into a kafka.Message without an emptiness check.
func headers(msg Message) (headers []kafka.Header) {
	if len(msg.Headers) == 0 {
		return nil
	}
	headers = make([]kafka.Header, len(msg.Headers))
	for i, h := range msg.Headers {
		headers[i] = kafka.Header{Key: h.Key, Value: h.Value}
	}
	return headers
}

func isErrTemporary(err error) bool {
isTransientNetworkError := errors.Is(err, io.ErrUnexpectedEOF) ||
errors.Is(err, syscall.ECONNREFUSED) ||
Expand Down
37 changes: 25 additions & 12 deletions services/streammanager/kafka/kafkamanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func NewProducer(destination *backendconfig.DestinationT, o common.Opts) (*Produ
return nil, fmt.Errorf("could not ping: %w", err)
}

p, err := c.NewProducer(destConfig.Topic, client.ProducerConfig{
p, err := c.NewProducer(client.ProducerConfig{
ReadTimeout: kafkaReadTimeout,
WriteTimeout: kafkaWriteTimeout,
})
Expand Down Expand Up @@ -363,7 +363,7 @@ func NewProducerForAzureEventHubs(destination *backendconfig.DestinationT, o com
return nil, fmt.Errorf("[Azure Event Hubs] Cannot connect: %w", err)
}

p, err := c.NewProducer(destConfig.Topic, client.ProducerConfig{
p, err := c.NewProducer(client.ProducerConfig{
ReadTimeout: kafkaReadTimeout,
WriteTimeout: kafkaWriteTimeout,
})
Expand Down Expand Up @@ -415,7 +415,7 @@ func NewProducerForConfluentCloud(destination *backendconfig.DestinationT, o com
return nil, fmt.Errorf("[Confluent Cloud] Cannot connect: %w", err)
}

p, err := c.NewProducer(destConfig.Topic, client.ProducerConfig{
p, err := c.NewProducer(client.ProducerConfig{
ReadTimeout: kafkaReadTimeout,
WriteTimeout: kafkaWriteTimeout,
})
Expand Down Expand Up @@ -450,14 +450,19 @@ func serializeAvroMessage(value []byte, codec goavro.Codec) ([]byte, error) {
return binary, nil
}

func prepareBatchOfMessages(topic string, batch []map[string]interface{}, timestamp time.Time, p producerManager) (
func prepareBatchOfMessages(batch []map[string]interface{}, timestamp time.Time, p producerManager, defaultTopic string) (
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of passing the defaultTopic around, an alternative approach is to use the existing pattern of including such information in the producerManager:

type producerManager interface {
	io.Closer
	publisher
	getDefaultTopic() string
	getTimeout() time.Duration
	getCodecs() map[string]*goavro.Codec
}

That should be preferred given that it is already in place. I've attached a git diff for you to consider this approach (e.g. git apply yourcoworkers.diff).

kafka.diff.zip

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we take this up in the next iteration of enhancement I can create a backlog item in our board for this. The reason I am asking is I need to update and add some test cases for this approach as we are dangerously close to release this might take more time :/

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure no problem 👍

[]client.Message, error,
) {
start := now()
defer func() { kafkaStats.prepareBatchTime.SendTiming(since(start)) }()

var messages []client.Message

for i, data := range batch {
topic, ok := data["topic"].(string)
if !ok || topic == "" {
topic = defaultTopic
atzoum marked this conversation as resolved.
Show resolved Hide resolved
}

message, ok := data["message"]
if !ok {
kafkaStats.missingMessage.Increment()
Expand Down Expand Up @@ -533,7 +538,6 @@ func (p *ProducerManager) Produce(jsonData json.RawMessage, destConfig interface
// return 400 if producer is invalid
return 400, "Could not create producer", "Could not create producer"
}

start := now()
defer func() { kafkaStats.produceTime.SendTiming(since(start)) }()

Expand All @@ -556,18 +560,19 @@ func (p *ProducerManager) Produce(jsonData json.RawMessage, destConfig interface
if kafkaBatchingEnabled {
return sendBatchedMessage(ctx, jsonData, p, conf.Topic)
}

return sendMessage(ctx, jsonData, p, conf.Topic)
}

func sendBatchedMessage(ctx context.Context, jsonData json.RawMessage, p producerManager, topic string) (int, string, string) {
func sendBatchedMessage(ctx context.Context, jsonData json.RawMessage, p producerManager, defaultTopic string) (int, string, string) {
var batch []map[string]interface{}
err := json.Unmarshal(jsonData, &batch)
if err != nil {
return 400, "Failure", "Error while unmarshalling json data: " + err.Error()
}

timestamp := time.Now()
batchOfMessages, err := prepareBatchOfMessages(topic, batch, timestamp, p)
batchOfMessages, err := prepareBatchOfMessages(batch, timestamp, p, defaultTopic)
if err != nil {
return 400, "Failure", "Error while preparing batched message: " + err.Error()
}
Expand All @@ -581,7 +586,7 @@ func sendBatchedMessage(ctx context.Context, jsonData json.RawMessage, p produce
return 200, returnMessage, returnMessage
}

func sendMessage(ctx context.Context, jsonData json.RawMessage, p producerManager, topic string) (int, string, string) {
func sendMessage(ctx context.Context, jsonData json.RawMessage, p producerManager, defaultTopic string) (int, string, string) {
parsedJSON := gjson.ParseBytes(jsonData)
messageValue := parsedJSON.Get("message").Value()
if messageValue == nil {
Expand All @@ -594,11 +599,11 @@ func sendMessage(ctx context.Context, jsonData json.RawMessage, p producerManage
}

timestamp := time.Now()
userID, _ := parsedJSON.Get("userId").Value().(string)
userID := parsedJSON.Get("userId").String()
codecs := p.getCodecs()
if len(codecs) > 0 {
schemaId, _ := parsedJSON.Get("schemaId").Value().(string)
messageId, _ := parsedJSON.Get("message.messageId").Value().(string)
schemaId := parsedJSON.Get("schemaId").String()
messageId := parsedJSON.Get("message.messageId").String()
koladilip marked this conversation as resolved.
Show resolved Hide resolved
if schemaId == "" {
return makeErrorResponse(fmt.Errorf("schemaId is not available for event with messageId: %s", messageId))
}
Expand All @@ -611,7 +616,15 @@ func sendMessage(ctx context.Context, jsonData json.RawMessage, p producerManage
return makeErrorResponse(fmt.Errorf("unable to serialize event with messageId: %s, with error %s", messageId, err))
}
}

topic := parsedJSON.Get("topic").String()

if topic == "" {
topic = defaultTopic
}

message := prepareMessage(topic, userID, value, timestamp)

if err = publish(ctx, p, message); err != nil {
return makeErrorResponse(fmt.Errorf("could not publish to %q: %w", topic, err))
}
Expand Down
Loading