Skip to content
This repository has been archived by the owner on Apr 12, 2024. It is now read-only.

Fix UUID equality check bug #45

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions bridge/broker_connector_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,25 @@ import (
)

type WebSocketConfig struct {
WSPath string // if UseWS is true, set this to your websocket path (e.g. '/fabric')
UseTLS bool // use TLS encryption with WebSocket connection
WSPath string // if UseWS is true, set this to your websocket path (e.g. '/fabric')
UseTLS bool // use TLS encryption with WebSocket connection
TLSConfig *tls.Config // TLS config for WebSocket connection
CertFile string // X509 certificate for TLS
KeyFile string // matching key file for the X509 certificate
CertFile string // X509 certificate for TLS
KeyFile string // matching key file for the X509 certificate
}

// BrokerConnectorConfig is a configuration used when connecting to a message broker
type BrokerConnectorConfig struct {
Username string
Password string
ServerAddr string
UseWS bool // use WebSocket instead of TCP
WebSocketConfig *WebSocketConfig // WebSocket configuration for when UseWS is true
HostHeader string
HeartBeatOut time.Duration // outbound heartbeat interval (from client to server)
HeartBeatIn time.Duration // inbound heartbeat interval (from server to client)
STOMPHeader map[string]string // additional STOMP headers for handshake
HttpHeader http.Header // additional HTTP headers for WebSocket Upgrade
Username string
Password string
ServerAddr string
UseWS bool // use WebSocket instead of TCP
WebSocketConfig *WebSocketConfig // WebSocket configuration for when UseWS is true
HostHeader string
HeartBeatOut time.Duration // outbound heartbeat interval (from client to server)
HeartBeatIn time.Duration // inbound heartbeat interval (from server to client)
STOMPHeader map[string]string // additional STOMP headers for handshake
HttpHeader http.Header // additional HTTP headers for WebSocket Upgrade
}

// LoadX509KeyPairFromFiles loads from paths to x509 cert and its matching key files and initializes
Expand All @@ -46,4 +46,4 @@ func (b *WebSocketConfig) LoadX509KeyPairFromFiles(certFile, keyFile string) err
}
}
return err
}
}
11 changes: 6 additions & 5 deletions bridge/broker_connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@ import (
"bufio"
"bytes"
"fmt"
"github.com/go-stomp/stomp/v3/frame"
"github.com/go-stomp/stomp/v3/server"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"log"
"net"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"

"github.com/go-stomp/stomp/v3/frame"
"github.com/go-stomp/stomp/v3/server"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
)

var upgrader = websocket.Upgrader{}
Expand Down Expand Up @@ -278,7 +279,7 @@ func TestBrokerConnector_Subscribe(t *testing.T) {

// check re-subscribe returns same sub
s2, _ := c.Subscribe("/topic/test")
assert.Equal(t, s.GetId().ID(), s2.GetId().ID())
assert.Equal(t, s.GetId().String(), s2.GetId().String())

c.Disconnect()
})
Expand Down
134 changes: 67 additions & 67 deletions bridge/example_connector_broker_tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,75 +4,75 @@
package bridge_test

import (
"fmt"
"github.com/vmware/transport-go/bridge"
"github.com/vmware/transport-go/bus"
"fmt"
"github.com/vmware/transport-go/bridge"
"github.com/vmware/transport-go/bus"
)

func Example_connectUsingBrokerViaTCP() {

// get a reference to the event bus.
b := bus.GetBus()

// create a broker connector configuration, using WebSockets.
// Make sure you have a STOMP TCP server running like RabbitMQ
config := &bridge.BrokerConnectorConfig{
Username: "guest",
Password: "guest",
ServerAddr: ":61613",
STOMPHeader: map[string]string{
"access-token": "test",
},
}

// connect to broker.
c, err := b.ConnectBroker(config)
if err != nil {
fmt.Printf("unable to connect, error: %e", err)
}
defer c.Disconnect()

// subscribe to our demo simple-stream
s, _ := c.Subscribe("/queue/sample")

// set a counter
n := 0

// create a control chan
done := make(chan bool)

// listen for messages
var consumer = func() {
for {
// listen for incoming messages from subscription.
m := <-s.GetMsgChannel()
n++

// get byte array.
d := m.Payload.([]byte)

fmt.Printf("Message Received: %s\n", string(d))
// listen for 5 messages then stop.
if n >= 5 {
break
}
}
done <- true
}

// send messages
var producer = func() {
for i := 0; i < 5; i++ {
c.SendMessage("/queue/sample", "text/plain", []byte(fmt.Sprintf("message: %d", i)))
}
}

// listen for incoming messages on subscription for destination /queue/sample
go consumer()

// send some messages to the broker on destination /queue/sample
go producer()

// wait for messages to be processed.
<-done
// get a reference to the event bus.
b := bus.GetBus()

// create a broker connector configuration, using WebSockets.
// Make sure you have a STOMP TCP server running like RabbitMQ
config := &bridge.BrokerConnectorConfig{
Username: "guest",
Password: "guest",
ServerAddr: ":61613",
STOMPHeader: map[string]string{
"access-token": "test",
},
}

// connect to broker.
c, err := b.ConnectBroker(config)
if err != nil {
fmt.Printf("unable to connect, error: %e", err)
}
defer c.Disconnect()

// subscribe to our demo simple-stream
s, _ := c.Subscribe("/queue/sample")

// set a counter
n := 0

// create a control chan
done := make(chan bool)

// listen for messages
var consumer = func() {
for {
// listen for incoming messages from subscription.
m := <-s.GetMsgChannel()
n++

// get byte array.
d := m.Payload.([]byte)

fmt.Printf("Message Received: %s\n", string(d))
// listen for 5 messages then stop.
if n >= 5 {
break
}
}
done <- true
}

// send messages
var producer = func() {
for i := 0; i < 5; i++ {
c.SendMessage("/queue/sample", "text/plain", []byte(fmt.Sprintf("message: %d", i)))
}
}

// listen for incoming messages on subscription for destination /queue/sample
go consumer()

// send some messages to the broker on destination /queue/sample
go producer()

// wait for messages to be processed.
<-done
}
124 changes: 62 additions & 62 deletions bridge/example_connector_broker_ws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,70 +4,70 @@
package bridge_test

import (
"encoding/json"
"fmt"
"github.com/vmware/transport-go/bridge"
"github.com/vmware/transport-go/bus"
"github.com/vmware/transport-go/model"
"encoding/json"
"fmt"
"github.com/vmware/transport-go/bridge"
"github.com/vmware/transport-go/bus"
"github.com/vmware/transport-go/model"
)

func Example_connectUsingBrokerViaWebSocket() {

// get a reference to the event bus.
b := bus.GetBus()

// create a broker connector configuration, using WebSockets.
config := &bridge.BrokerConnectorConfig{
Username: "guest",
Password: "guest",
ServerAddr: "appfabric.vmware.com",
WebSocketConfig: &bridge.WebSocketConfig{WSPath: "/fabric"},
UseWS: true,
STOMPHeader: map[string]string{
"access-token": "test",
},
}

// connect to broker.
c, err := b.ConnectBroker(config)
if err != nil {
fmt.Printf("unable to connect, error: %e", err)
}

// subscribe to our demo simple-stream
s, _ := c.Subscribe("/topic/simple-stream")

// set a counter
n := 0

// create a control chan
done := make(chan bool)

var listener = func() {
for {
// listen for incoming messages from subscription.
m := <-s.GetMsgChannel()

// unmarshal message.
r := &model.Response{}
d := m.Payload.([]byte)
json.Unmarshal(d, &r)
fmt.Printf("Message Received: %s\n", r.Payload.(string))

n++

// listen for 5 messages then stop.
if n >= 5 {
break
}
}
done <- true
}

// listen for incoming messages on subscription.
go listener()

<-done

c.Disconnect()
// get a reference to the event bus.
b := bus.GetBus()

// create a broker connector configuration, using WebSockets.
config := &bridge.BrokerConnectorConfig{
Username: "guest",
Password: "guest",
ServerAddr: "appfabric.vmware.com",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have seen a couple of these old references in here still. can we get rid of them?

WebSocketConfig: &bridge.WebSocketConfig{WSPath: "/fabric"},
UseWS: true,
STOMPHeader: map[string]string{
"access-token": "test",
},
}

// connect to broker.
c, err := b.ConnectBroker(config)
if err != nil {
fmt.Printf("unable to connect, error: %e", err)
}

// subscribe to our demo simple-stream
s, _ := c.Subscribe("/topic/simple-stream")

// set a counter
n := 0

// create a control chan
done := make(chan bool)

var listener = func() {
for {
// listen for incoming messages from subscription.
m := <-s.GetMsgChannel()

// unmarshal message.
r := &model.Response{}
d := m.Payload.([]byte)
json.Unmarshal(d, &r)
fmt.Printf("Message Received: %s\n", r.Payload.(string))

n++

// listen for 5 messages then stop.
if n >= 5 {
break
}
}
done <- true
}

// listen for incoming messages on subscription.
go listener()

<-done

c.Disconnect()
}
Loading