Skip to content
This repository has been archived by the owner on Apr 12, 2024. It is now read-only.

new: Add SendBroadcastMessage() bus method #41

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions bus/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type EventBus interface {
GetChannelManager() ChannelManager
SendRequestMessage(channelName string, payload interface{}, destinationId *uuid.UUID) error
SendResponseMessage(channelName string, payload interface{}, destinationId *uuid.UUID) error
SendBroadcastMessage(channelName string, payload interface{}) error
SendErrorMessage(channelName string, err error, destinationId *uuid.UUID) error
ListenStream(channelName string) (MessageHandler, error)
ListenStreamForDestination(channelName string, destinationId *uuid.UUID) (MessageHandler, error)
Expand Down Expand Up @@ -186,6 +187,19 @@ func (bus *transportEventBus) SendResponseMessage(channelName string, payload in
return nil
}

// SendBroadcastMessage sends the payload as an outbound broadcast message to channelName. Since it is a broadcast,
// the payload does not require a destination ID. Throws an error if the channel does not exist.
func (bus *transportEventBus) SendBroadcastMessage(channelName string, payload interface{}) error {
channelObject, err := bus.ChannelManager.GetChannel(channelName)
if err != nil {
return err
}
config := buildConfig(channelName, payload, nil)
message := model.GenerateResponse(config)
sendMessageToChannel(channelObject, message)
return nil
}

// SendRequestMessage Send a RequestDir type message (outbound) message on Channel, with supplied Payload.
// Throws error if the Channel does not exist.
func (bus *transportEventBus) SendRequestMessage(channelName string, payload interface{}, destId *uuid.UUID) error {
Expand Down
37 changes: 33 additions & 4 deletions bus/eventbus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ func TestEventBus_SendRequestMessageNoChannel(t *testing.T) {
assert.NotNil(t, err)
}

func TestTransportEventBus_SendBroadcastMessageNoChannel(t *testing.T) {
err := evtBusTest.SendBroadcastMessage("Channel-not-here", "hello melody")
assert.NotNil(t, err)
}

func TestEventBus_ListenStream(t *testing.T) {
createTestChannel()
handler, err := evtBusTest.ListenStream(evtbusTestChannelName)
Expand All @@ -112,6 +117,30 @@ func TestEventBus_ListenStream(t *testing.T) {
destroyTestChannel()
}

func TestTransportEventBus_ListenStreamForBroadcast(t *testing.T) {
createTestChannel()
handler, err := evtBusTest.ListenStream(evtbusTestChannelName)
assert.Nil(t, err)
assert.NotNil(t, handler)
var count int32 = 0
handler.Handle(
func(msg *model.Message) {
assert.Equal(t, "hello melody", msg.Payload.(string))
inc(&count)
},
func(err error) {})

for i := 0; i < 3; i++ {
evtBusTest.SendBroadcastMessage(evtbusTestChannelName, "hello melody")

// send requests to make sure we're only getting requests
evtBusTest.SendRequestMessage(evtbusTestChannelName, 1, nil)
}
evtbusTestManager.WaitForChannel(evtbusTestChannelName)
assert.Equal(t, int32(3), count)
destroyTestChannel()
}

func TestTransportEventBus_ListenStreamForDestination(t *testing.T) {
createTestChannel()
id := uuid.New()
Expand Down Expand Up @@ -596,11 +625,11 @@ func TestChannelManager_TestConnectBroker(t *testing.T) {

// connect to broker
cf := &bridge.BrokerConnectorConfig{
Username: "test",
Password: "test",
UseWS: true,
Username: "test",
Password: "test",
UseWS: true,
WebSocketConfig: &bridge.WebSocketConfig{
WSPath: "/",
WSPath: "/",
},
ServerAddr: "broker-url"}

Expand Down