Skip to content

Commit

Permalink
add err group
Browse files Browse the repository at this point in the history
Signed-off-by: myan <myan@redhat.com>
  • Loading branch information
yanmxa committed Sep 10, 2024
1 parent 787a693 commit a67eb86
Showing 1 changed file with 19 additions and 9 deletions.
28 changes: 19 additions & 9 deletions test/integration/mqtt_paho/concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/google/uuid"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

cloudevents "github.com/cloudevents/sdk-go/v2"
cecontext "github.com/cloudevents/sdk-go/v2/context"
Expand All @@ -30,24 +31,24 @@ func TestConcurrentSendingEvent(t *testing.T) {
senderNum := 10 // 10 gorutine to sending the events
eventNum := 1000 // each gorutine sender publishs 1,000 events

var g errgroup.Group

// start a receiver
c, err := cloudevents.NewClient(protocolFactory(ctx, t, topicName), cloudevents.WithUUIDs())
require.NoError(t, err)
go func() {
// sending 10,000 events concurrenly and verify all of them can be recieved
g.Go(func() error {
// verify all of events can be recieved
count := senderNum * eventNum
var mu sync.Mutex
err = c.StartReceiver(ctx, func(event cloudevents.Event) {
return c.StartReceiver(ctx, func(event cloudevents.Event) {
mu.Lock()
defer mu.Unlock()
count--
// all the events has been received
if count == 0 {
readyCh <- true
}
})
require.NoError(t, err)
}()
})
// wait for 5 seconds to ensure the receiver starts safely
time.Sleep(5 * time.Second)

Expand All @@ -62,18 +63,27 @@ func TestConcurrentSendingEvent(t *testing.T) {
require.NoError(t, err)

for i := 0; i < senderNum; i++ {
go func() {
g.Go(func() error {
for j := 0; j < eventNum; j++ {
result := client.Send(
cecontext.WithTopic(ctx, topicName),
evt,
)
require.NoError(t, result)
if result != nil {
return result
}
}
}()
return nil
})
}

// wait until all the events are received
handleEvent(ctx, readyCh, cancel, t)

require.NoError(t, g.Wait())
}

func handleEvent(ctx context.Context, readyCh <-chan bool, cancel context.CancelFunc, t *testing.T) {
for {
select {
case <-ctx.Done():
Expand Down

0 comments on commit a67eb86

Please sign in to comment.