Skip to content

Commit

Permalink
add sending concurrently
Browse files Browse the repository at this point in the history
Signed-off-by: myan <myan@redhat.com>
  • Loading branch information
yanmxa committed Sep 10, 2024
1 parent 460cc6d commit 7461156
Showing 1 changed file with 88 additions and 0 deletions.
88 changes: 88 additions & 0 deletions test/integration/mqtt_paho/concurrent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
Copyright 2024 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

package mqtt_paho

import (
"context"
"sync"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/require"

cloudevents "github.com/cloudevents/sdk-go/v2"
cecontext "github.com/cloudevents/sdk-go/v2/context"
)

func TestConcurrentSendingEvent(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

topicName := "test-ce-client-" + uuid.New().String()

readyCh := make(chan bool)
defer close(readyCh)

senderNum := 10 // 10 gorutine to sending the events
eventNum := 1000 // each gorutine sender publishs 1,000 events
count := senderNum * eventNum // sending 10,000 events concurrenly and verify all of them can be recieved

// start a receiver
c, err := cloudevents.NewClient(protocolFactory(ctx, t, topicName), cloudevents.WithUUIDs())
require.NoError(t, err)
go func() {
var mu sync.Mutex
err = c.StartReceiver(ctx, func(event cloudevents.Event) {
mu.Lock()
count--
mu.Unlock()

// all the events has been received
if count == 0 {
readyCh <- true
}
})
require.NoError(t, err)
}()
// wait for 5 seconds to ensure the receiver starts safely
time.Sleep(5 * time.Second)

// start a sender client to pulish events concurrently
client, err := cloudevents.NewClient(protocolFactory(ctx, t, topicName), cloudevents.WithUUIDs())
require.NoError(t, err)

evt := cloudevents.NewEvent()
evt.SetType("com.cloudevents.sample.sent")
evt.SetSource("concurrent-sender")
err = evt.SetData(cloudevents.ApplicationJSON, map[string]interface{}{"message": "Hello, World!"})
require.NoError(t, err)

for i := 0; i < senderNum; i++ {
go func() {
for j := 0; j < eventNum; j++ {
result := client.Send(
cecontext.WithTopic(ctx, topicName),
evt,
)
require.NoError(t, result)
}
}()
}

// wait until all the events are received
for {
select {
case <-ctx.Done():
require.Fail(t, "Test failed: timeout reached before events were received")
return
case <-readyCh:
cancel()
t.Logf("Test passed: events successfully received")
return
}
}
}

0 comments on commit 7461156

Please sign in to comment.