
Fix race condition in MQTT protocol when sending messages #1094

Merged
1 commit merged into cloudevents:main on Sep 11, 2024

Conversation

yanmxa (Contributor) commented Sep 4, 2024

Signed-off-by: myan <myan@redhat.com>

Resolved: #1093

Summary:

Reason: This issue occurs when a single client sends events across multiple goroutines. Specifically, in the code here:

msg := p.publishOption
if cecontext.TopicFrom(ctx) != "" {
    msg.Topic = cecontext.TopicFrom(ctx)
    cecontext.WithTopic(ctx, "")
}
err = WritePubMessage(ctx, m, msg, transformers...)
if err != nil {
    return err
}

At line 98, the MQTT msg is designed to hold a single binding.Message (or Event) payload. In a multi-goroutine environment, however, multiple m values may be written into the shared msg concurrently, causing panics such as bytes.Buffer: too large.

Solution: Instead of sharing the same msg across goroutines, we will create a copy of it each time a message is sent.
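
For illustration, here is a minimal sketch of that per-send copy (the actual change is shown in the review diff below; field names follow paho.Publish):

// Build a fresh paho.Publish for every send instead of mutating the shared
// p.publishOption. The Properties pointer is still shared and is assumed to
// be treated as read-only by the send path.
msg := &paho.Publish{
    QoS:        p.publishOption.QoS,
    Retain:     p.publishOption.Retain,
    Topic:      p.publishOption.Topic,
    Properties: p.publishOption.Properties,
}
if cecontext.TopicFrom(ctx) != "" {
    msg.Topic = cecontext.TopicFrom(ctx)
    cecontext.WithTopic(ctx, "")
}
err = WritePubMessage(ctx, m, msg, transformers...)
if err != nil {
    return err
}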

@yanmxa yanmxa requested a review from a team as a code owner September 4, 2024 11:59
@yanmxa changed the title from "Fix race condition of MQTT protocol when sending messages with the" to "Fix race condition in MQTT protocol when sending messages" on Sep 4, 2024
yanmxa (Contributor, Author) commented Sep 4, 2024

/cc @embano1

Comment on lines +109 to +117
// publishMsg generate a new paho.Publish message from the p.publishOption
func (p *Protocol) publishMsg() *paho.Publish {
    return &paho.Publish{
        QoS:        p.publishOption.QoS,
        Retain:     p.publishOption.Retain,
        Topic:      p.publishOption.Topic,
        Properties: p.publishOption.Properties,
    }
}
Cali0707 (Contributor)

If I'm understanding correctly, the race condition is that the publishOption can be changed before the message is published?

Since there is only one place to set this field in the struct (

func WithPublish(publishOpt *paho.Publish) Option {
    return func(p *Protocol) error {
        if publishOpt == nil {
            return fmt.Errorf("the paho.Publish option must not be nil")
        }
        p.publishOption = publishOpt
        return nil
    }
}
), maybe it would make sense to make the copy there instead of every time a message is published?
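
For illustration, a rough sketch of that alternative (not the change that was merged); it would copy once at configuration time rather than on every send:

// Hypothetical variant: store a shallow copy of the caller's option instead
// of the pointer itself.
func WithPublish(publishOpt *paho.Publish) Option {
    return func(p *Protocol) error {
        if publishOpt == nil {
            return fmt.Errorf("the paho.Publish option must not be nil")
        }
        opt := *publishOpt // shallow copy; the Properties pointer is still shared
        p.publishOption = &opt
        return nil
    }
}

As the author explains below, WithPublish runs only once when the client is initialized, so a copy taken there would still leave all sending goroutines mutating the same stored message.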

embano1 (Member)

+1

Left a comment in the issue to better understand the issue/root cause. What is causing the race? Why is publishOption changing during concurrent calls? I'm asking all these questions because I didn't write the implementation and am not familiar with MQ. Furthermore, if we change as per the above, I wonder what the implications are for existing users, because we're changing the semantics here (copy instead of pointer).

yanmxa (Contributor, Author)

Thanks @Cali0707! The code block you referred to is only invoked once, when the client is initialized. The issue arises when sending messages from many goroutines. Here are the details.

yanmxa (Contributor, Author) commented Sep 8, 2024

FYI @embano1.

I've appended the root cause here.

The publishOption is a pointer referenced by the MQTT message, which holds the CloudEvent payload. As a result, the message (publishOption) is shared across multiple goroutines and will be changed by them.

With the suggested change, existing users should not notice any difference in behavior.
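
To make the sharing concrete, here is a stripped-down, hypothetical illustration of the write pattern (in the SDK the actual writes happen inside WritePubMessage; running this with go run -race reports the race):

package main

import (
    "fmt"
    "sync"

    "github.com/eclipse/paho.golang/paho"
)

func main() {
    // One message shared by all goroutines, as p.publishOption was before the fix.
    shared := &paho.Publish{Topic: "events"}

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            // Unsynchronized concurrent writes to the same message: a data race.
            shared.Payload = []byte(fmt.Sprintf("event-%d", n))
            shared.Topic = fmt.Sprintf("events/%d", n)
        }(i)
    }
    wg.Wait()
}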

embano1 (Member)

and will be changed by them

Can you please point me to the code/line where this happens?
Also, it would be ideal if you could write a test to surface the current issue (the panics) and show how this PR fixes it; perhaps there are more issues in the current implementation that we can detect this way 🙏

yanmxa (Contributor, Author) commented Sep 10, 2024

@embano1 No worries!

Here’s the code line where the panic occurs.

I’ve also written an integration test that covers the concurrent panic issue, which you can use to reproduce it.
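
For reference, a hedged sketch of what such a concurrency test can look like (the broker address, topic, and client wiring below are illustrative assumptions, not the merged test file):

package mqtt_paho_test

import (
    "context"
    "fmt"
    "net"
    "testing"

    mqtt_paho "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2"
    cloudevents "github.com/cloudevents/sdk-go/v2"
    "github.com/eclipse/paho.golang/paho"
    "golang.org/x/sync/errgroup"
)

// newClient wires a CloudEvents client to the MQTT protocol under test.
// It assumes a local broker on 127.0.0.1:1883 started for the integration run.
func newClient(ctx context.Context, t *testing.T, topic string) cloudevents.Client {
    t.Helper()
    conn, err := net.Dial("tcp", "127.0.0.1:1883")
    if err != nil {
        t.Fatalf("failed to connect to broker: %v", err)
    }
    p, err := mqtt_paho.New(ctx, &paho.ClientConfig{ClientID: "concurrent-sender", Conn: conn},
        mqtt_paho.WithPublish(&paho.Publish{Topic: topic}))
    if err != nil {
        t.Fatalf("failed to create protocol: %v", err)
    }
    c, err := cloudevents.NewClient(p)
    if err != nil {
        t.Fatalf("failed to create client: %v", err)
    }
    return c
}

// TestSendEventsConcurrently sends events from many goroutines through one
// client. Before this PR the goroutines shared a single publish message, which
// could interleave payloads and panic; afterwards every send should succeed.
func TestSendEventsConcurrently(t *testing.T) {
    ctx := context.Background()
    c := newClient(ctx, t, "test-topic")

    grp, ctx := errgroup.WithContext(ctx)
    for i := 0; i < 20; i++ {
        id := i
        grp.Go(func() error {
            e := cloudevents.NewEvent()
            e.SetID(fmt.Sprintf("event-%d", id))
            e.SetType("com.cloudevents.sample.sent")
            e.SetSource("concurrent-test")
            if err := e.SetData(cloudevents.ApplicationJSON, map[string]int{"n": id}); err != nil {
                return err
            }
            if result := c.Send(ctx, e); cloudevents.IsUndelivered(result) {
                return result
            }
            return nil
        })
    }
    if err := grp.Wait(); err != nil {
        t.Fatalf("concurrent send failed: %v", err)
    }
}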

embano1 (Member) commented Sep 11, 2024

Great, nice work! Please squash your commits and force-push.

Commits pushed to the branch (later squashed):

Signed-off-by: myan <myan@redhat.com>

add sending concurrently
Signed-off-by: myan <myan@redhat.com>

add err group
Signed-off-by: myan <myan@redhat.com>
yanmxa (Contributor, Author) commented Sep 11, 2024

Great, nice work! Please squash your commits and force-push.

Done.

embano1 (Member) left a comment

Thank you!

@embano1 embano1 merged commit 682f3a9 into cloudevents:main Sep 11, 2024
9 checks passed
Development

Successfully merging this pull request may close these issues.

Race condition when sending messages with the MQTT protocol
3 participants