Skip to content
This repository has been archived by the owner on Apr 12, 2024. It is now read-only.

Commit

Permalink
Implement missing broker connection details (#13)
Browse files Browse the repository at this point in the history
* Implement missing broker connection details

Currently both STOMP connector and WS adapter are lacking an ability
to customize the CONNECT frame which is crucial in connecting to
heavily secured brokers. Also this PR allows passing additional HTTP
request headers for the Web Socket Upgrade request.

Signed-off-by: Josh Kim <kjosh@vmware.com>

* Create SendJSONMessage fn for Connection interface

Signed-off-by: Josh Kim <kjosh@vmware.com>

* Make send JSON available on galacticStoreConnection

Signed-off-by: Josh Kim <kjosh@vmware.com>

* Add proper TLS support for WebSocket connection

Signed-off-by: Josh Kim <kjosh@vmware.com>

* Fix broken tests

Signed-off-by: Josh Kim <kjosh@vmware.com>

* Write unit tests for TLS functionality

Signed-off-by: Josh Kim <kjosh@vmware.com>

* Comment struct properties of BrokerConnectorConfig

Signed-off-by: Josh Kim <kjosh@vmware.com>
  • Loading branch information
jooskim authored Aug 3, 2021
1 parent 782054e commit bb22fa4
Show file tree
Hide file tree
Showing 19 changed files with 474 additions and 112 deletions.
39 changes: 33 additions & 6 deletions bridge/bridge_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/gorilla/websocket"
"github.com/vmware/transport-go/model"
"log"
"net/http"
"net/url"
"os"
"strconv"
Expand Down Expand Up @@ -61,14 +60,25 @@ func newBridgeWsClient(enableLogging bool) *BridgeClient {
}

// Connect to broker endpoint.
func (ws *BridgeClient) Connect(url *url.URL, headers http.Header) error {
func (ws *BridgeClient) Connect(url *url.URL, config *BrokerConnectorConfig) error {
ws.lock.Lock()
defer ws.lock.Unlock()
if ws.logger != nil {
ws.logger.Printf("connecting to fabric endpoint over %s", url.String())
}

c, _, err := websocket.DefaultDialer.Dial(url.String(), headers)
// set TLS config if broker connector config demands that TLS be used
dialer := websocket.DefaultDialer
if config.WebSocketConfig.UseTLS {
if err := config.WebSocketConfig.LoadX509KeyPairFromFiles(
config.WebSocketConfig.CertFile,
config.WebSocketConfig.KeyFile); err != nil {
return err
}
dialer.TLSClientConfig = config.WebSocketConfig.TLSConfig
}

c, _, err := dialer.Dial(url.String(), config.HttpHeader)
if err != nil {
return err
}
Expand All @@ -80,8 +90,21 @@ func (ws *BridgeClient) Connect(url *url.URL, headers http.Header) error {
// go listen to the websocket
go ws.listenSocket()

stompHeaders := []string{
frame.AcceptVersion,
string(stomp.V12),
frame.Login,
config.Username,
frame.Passcode,
config.Password,
frame.HeartBeat,
fmt.Sprintf("%d,%d", config.HeartBeatOut.Milliseconds(), config.HeartBeatIn.Milliseconds())}
for key, value := range config.STOMPHeader {
stompHeaders = append(stompHeaders, key, value)
}

// send connect frame.
ws.SendFrame(frame.New(frame.CONNECT, frame.AcceptVersion, string(stomp.V12)))
ws.SendFrame(frame.New(frame.CONNECT, stompHeaders...))

// wait to be connected
<-ws.ConnectedChan
Expand Down Expand Up @@ -125,16 +148,20 @@ func (ws *BridgeClient) Subscribe(destination string) *BridgeClientSub {
}

// send a payload to a destination
func (ws *BridgeClient) Send(destination string, payload []byte) {
func (ws *BridgeClient) Send(destination, contentType string, payload []byte, opts ...func(fr *frame.Frame) error) {
ws.lock.Lock()
defer ws.lock.Unlock()

// create send frame.
sendFrame := frame.New(frame.SEND,
frame.Destination, destination,
frame.ContentLength, strconv.Itoa(len(payload)),
frame.ContentType, "application/json")
frame.ContentType, contentType)

// apply extra frame options such as adding extra headers
for _, frameOpt := range opts {
_ = frameOpt(sendFrame)
}
// add payload
sendFrame.Body = payload

Expand Down
120 changes: 66 additions & 54 deletions bridge/broker_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,85 +13,97 @@ import (

// BrokerConnector is used to connect to a message broker over TCP or WebSocket.
type BrokerConnector interface {
Connect(config *BrokerConnectorConfig, enableLogging bool) (Connection, error)
Connect(config *BrokerConnectorConfig, enableLogging bool) (Connection, error)
}

type brokerConnector struct {
c Connection
config *BrokerConnectorConfig
connected bool
c Connection
config *BrokerConnectorConfig
connected bool
}

// Create a new broker connector
func NewBrokerConnector() BrokerConnector {
return &brokerConnector{connected: false}
return &brokerConnector{connected: false}
}

func checkConfig(config *BrokerConnectorConfig) error {
if config == nil {
return fmt.Errorf("config is nil")
}
if config.ServerAddr == "" {
return fmt.Errorf("config invalid, config missing server address")
}
if config.Username == "" {
return fmt.Errorf("config invalid, config missing username")
}
if config.Password == "" {
return fmt.Errorf("config invalid, config missing password")
}
return nil
if config == nil {
return fmt.Errorf("config is nil")
}
if config.ServerAddr == "" {
return fmt.Errorf("config invalid, config missing server address")
}
if config.Username == "" {
return fmt.Errorf("config invalid, config missing username")
}
if config.Password == "" {
return fmt.Errorf("config invalid, config missing password")
}
return nil
}

// Connect to broker using supplied connector config.
func (bc *brokerConnector) Connect(config *BrokerConnectorConfig, enableLogging bool) (Connection, error) {

err := checkConfig(config)
if err != nil {
return nil, err
}
err := checkConfig(config)
if err != nil {
return nil, err
}

// use different mechanism for WS connections.
if config.UseWS {
return bc.connectWs(config, enableLogging)
}
// use different mechanism for WS connections.
if config.UseWS {
return bc.connectWs(config, enableLogging)
}

return bc.connectTCP(config, err)
return bc.connectTCP(config, err)
}

func (bc *brokerConnector) connectTCP(config *BrokerConnectorConfig, err error) (Connection, error) {
if config.HostHeader == "" {
config.HostHeader = "/"
}
var options = []func(*stomp.Conn) error{
stomp.ConnOpt.Login(config.Username, config.Password),
stomp.ConnOpt.Host(config.HostHeader),
}
conn, err := stomp.Dial("tcp", config.ServerAddr, options...)
if err != nil {
return nil, err
}
id := uuid.New()
bcConn := &connection{
id: &id,
conn: conn,
subscriptions: make(map[string]Subscription),
useWs: false,
connLock: sync.Mutex{},
disconnectChan: make(chan bool)}
bc.c = bcConn
bc.connected = true
bc.config = config
return bcConn, nil
if config.HostHeader == "" {
config.HostHeader = "/"
}
var options = []func(*stomp.Conn) error{
stomp.ConnOpt.Login(config.Username, config.Password),
stomp.ConnOpt.Host(config.HostHeader),
stomp.ConnOpt.HeartBeat(config.HeartBeatOut, config.HeartBeatIn),
}

if config.STOMPHeader != nil {
for key, value := range config.STOMPHeader {
options = append(options, stomp.ConnOpt.Header(key, value))
}
}

conn, err := stomp.Dial("tcp", config.ServerAddr, options...)
if err != nil {
return nil, err
}
id := uuid.New()
bcConn := &connection{
id: &id,
conn: conn,
subscriptions: make(map[string]Subscription),
useWs: false,
connLock: sync.Mutex{},
disconnectChan: make(chan bool)}
bc.c = bcConn
bc.connected = true
bc.config = config
return bcConn, nil
}

func (bc *brokerConnector) connectWs(config *BrokerConnectorConfig, enableLogging bool) (Connection, error) {
wsScheme := "ws"
if config.WebSocketConfig.UseTLS {
wsScheme += "s"
}

u := url.URL{Scheme: "ws", Host: config.ServerAddr, Path: config.WSPath}
u := url.URL{Scheme: wsScheme, Host: config.ServerAddr, Path: config.WebSocketConfig.WSPath}
c := NewBridgeWsClient(enableLogging)
err := c.Connect(&u, nil)
err := c.Connect(&u, config)
if err != nil {
return nil, fmt.Errorf("cannot connect to host '%s' via path '%s', stopping", config.ServerAddr, config.WSPath)
return nil, fmt.Errorf("cannot connect to host '%s' via path '%s', stopping", config.ServerAddr, config.WebSocketConfig.WSPath)
}
id := uuid.New()
bcConn := &connection{
Expand All @@ -104,4 +116,4 @@ func (bc *brokerConnector) connectWs(config *BrokerConnectorConfig, enableLoggin
bc.c = bcConn
bc.connected = true
return bcConn, nil
}
}
47 changes: 41 additions & 6 deletions bridge/broker_connector_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,47 @@

package bridge

import (
"crypto/tls"
"net/http"
"time"
)

type WebSocketConfig struct {
WSPath string // if UseWS is true, set this to your websocket path (e.g. '/fabric')
UseTLS bool // use TLS encryption with WebSocket connection
TLSConfig *tls.Config // TLS config for WebSocket connection
CertFile string // X509 certificate for TLS
KeyFile string // matching key file for the X509 certificate
}

// BrokerConnectorConfig is a configuration used when connecting to a message broker
type BrokerConnectorConfig struct {
Username string
Password string
ServerAddr string
WSPath string // if UseWS is true, set this to your websocket path (e.g. '/fabric')
UseWS bool // use WebSocket instead of TCP
HostHeader string
Username string
Password string
ServerAddr string
UseWS bool // use WebSocket instead of TCP
WebSocketConfig *WebSocketConfig // WebSocket configuration for when UseWS is true
HostHeader string
HeartBeatOut time.Duration // outbound heartbeat interval (from client to server)
HeartBeatIn time.Duration // inbound heartbeat interval (from server to client)
STOMPHeader map[string]string // additional STOMP headers for handshake
HttpHeader http.Header // additional HTTP headers for WebSocket Upgrade
}

// LoadX509KeyPairFromFiles loads from paths to x509 cert and its matching key files and initializes
// the Certificates field of the TLS config instance with their contents, only if both Certificates is
// an empty slice and GetCertificate is nil
func (b *WebSocketConfig) LoadX509KeyPairFromFiles(certFile, keyFile string) error {
var err error
config := b.TLSConfig
configHasCert := len(config.Certificates) > 0 || config.GetCertificate != nil
if !configHasCert || certFile != "" || keyFile != "" {
config.Certificates = make([]tls.Certificate, 1)
config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return err
}
}
return err
}
Loading

0 comments on commit bb22fa4

Please sign in to comment.