From bb22fa4f8d3a3e794dff0262bbd7afbecc31af6d Mon Sep 17 00:00:00 2001 From: Josh Kim Date: Tue, 3 Aug 2021 11:31:34 -0700 Subject: [PATCH] Implement missing broker connection details (#13) * Implement missing broker connection details Currently both STOMP connector and WS adapter are lacking an ability to customize the CONNECT frame which is crucial in connecting to heavily secured brokers. Also this PR allows passing additional HTTP request headers for the Web Socket Upgrade request. Signed-off-by: Josh Kim * Create SendJSONMessage fn for Connection interface Signed-off-by: Josh Kim * Make send JSON available on galacticStoreConnection Signed-off-by: Josh Kim * Add proper TLS support for WebSocket connection Signed-off-by: Josh Kim * Fix broken tests Signed-off-by: Josh Kim * Write unit tests for TLS functionality Signed-off-by: Josh Kim * Comment struct properties of BrokerConnectorConfig Signed-off-by: Josh Kim --- bridge/bridge_client.go | 39 ++++- bridge/broker_connector.go | 120 ++++++++------- bridge/broker_connector_config.go | 47 +++++- bridge/broker_connector_test.go | 57 +++++-- bridge/broker_connector_tls_test.go | 160 ++++++++++++++++++++ bridge/connection.go | 15 +- bridge/example_connector_broker_tcp_test.go | 8 +- bridge/example_connector_broker_ws_test.go | 8 +- bridge/test_server.crt | 21 +++ bridge/test_server.key | 28 ++++ bus/channel_test.go | 10 +- bus/eventbus_test.go | 4 +- bus/example_galactic_channels_test.go | 4 +- bus/store.go | 2 +- bus/store_manager.go | 4 +- bus/store_manager_test.go | 4 +- bus/store_test.go | 18 ++- go.sum | 9 +- transport.go | 28 +++- 19 files changed, 474 insertions(+), 112 deletions(-) create mode 100644 bridge/broker_connector_tls_test.go create mode 100644 bridge/test_server.crt create mode 100644 bridge/test_server.key diff --git a/bridge/bridge_client.go b/bridge/bridge_client.go index d03bcb7..55a300d 100644 --- a/bridge/bridge_client.go +++ b/bridge/bridge_client.go @@ -14,7 +14,6 @@ import ( "github.com/gorilla/websocket" "github.com/vmware/transport-go/model" "log" - "net/http" "net/url" "os" "strconv" @@ -61,14 +60,25 @@ func newBridgeWsClient(enableLogging bool) *BridgeClient { } // Connect to broker endpoint. -func (ws *BridgeClient) Connect(url *url.URL, headers http.Header) error { +func (ws *BridgeClient) Connect(url *url.URL, config *BrokerConnectorConfig) error { ws.lock.Lock() defer ws.lock.Unlock() if ws.logger != nil { ws.logger.Printf("connecting to fabric endpoint over %s", url.String()) } - c, _, err := websocket.DefaultDialer.Dial(url.String(), headers) + // set TLS config if broker connector config demands that TLS be used + dialer := websocket.DefaultDialer + if config.WebSocketConfig.UseTLS { + if err := config.WebSocketConfig.LoadX509KeyPairFromFiles( + config.WebSocketConfig.CertFile, + config.WebSocketConfig.KeyFile); err != nil { + return err + } + dialer.TLSClientConfig = config.WebSocketConfig.TLSConfig + } + + c, _, err := dialer.Dial(url.String(), config.HttpHeader) if err != nil { return err } @@ -80,8 +90,21 @@ func (ws *BridgeClient) Connect(url *url.URL, headers http.Header) error { // go listen to the websocket go ws.listenSocket() + stompHeaders := []string{ + frame.AcceptVersion, + string(stomp.V12), + frame.Login, + config.Username, + frame.Passcode, + config.Password, + frame.HeartBeat, + fmt.Sprintf("%d,%d", config.HeartBeatOut.Milliseconds(), config.HeartBeatIn.Milliseconds())} + for key, value := range config.STOMPHeader { + stompHeaders = append(stompHeaders, key, value) + } + // send connect frame. - ws.SendFrame(frame.New(frame.CONNECT, frame.AcceptVersion, string(stomp.V12))) + ws.SendFrame(frame.New(frame.CONNECT, stompHeaders...)) // wait to be connected <-ws.ConnectedChan @@ -125,7 +148,7 @@ func (ws *BridgeClient) Subscribe(destination string) *BridgeClientSub { } // send a payload to a destination -func (ws *BridgeClient) Send(destination string, payload []byte) { +func (ws *BridgeClient) Send(destination, contentType string, payload []byte, opts ...func(fr *frame.Frame) error) { ws.lock.Lock() defer ws.lock.Unlock() @@ -133,8 +156,12 @@ func (ws *BridgeClient) Send(destination string, payload []byte) { sendFrame := frame.New(frame.SEND, frame.Destination, destination, frame.ContentLength, strconv.Itoa(len(payload)), - frame.ContentType, "application/json") + frame.ContentType, contentType) + // apply extra frame options such as adding extra headers + for _, frameOpt := range opts { + _ = frameOpt(sendFrame) + } // add payload sendFrame.Body = payload diff --git a/bridge/broker_connector.go b/bridge/broker_connector.go index 16d0e34..6aa2028 100644 --- a/bridge/broker_connector.go +++ b/bridge/broker_connector.go @@ -13,85 +13,97 @@ import ( // BrokerConnector is used to connect to a message broker over TCP or WebSocket. type BrokerConnector interface { - Connect(config *BrokerConnectorConfig, enableLogging bool) (Connection, error) + Connect(config *BrokerConnectorConfig, enableLogging bool) (Connection, error) } type brokerConnector struct { - c Connection - config *BrokerConnectorConfig - connected bool + c Connection + config *BrokerConnectorConfig + connected bool } // Create a new broker connector func NewBrokerConnector() BrokerConnector { - return &brokerConnector{connected: false} + return &brokerConnector{connected: false} } func checkConfig(config *BrokerConnectorConfig) error { - if config == nil { - return fmt.Errorf("config is nil") - } - if config.ServerAddr == "" { - return fmt.Errorf("config invalid, config missing server address") - } - if config.Username == "" { - return fmt.Errorf("config invalid, config missing username") - } - if config.Password == "" { - return fmt.Errorf("config invalid, config missing password") - } - return nil + if config == nil { + return fmt.Errorf("config is nil") + } + if config.ServerAddr == "" { + return fmt.Errorf("config invalid, config missing server address") + } + if config.Username == "" { + return fmt.Errorf("config invalid, config missing username") + } + if config.Password == "" { + return fmt.Errorf("config invalid, config missing password") + } + return nil } // Connect to broker using supplied connector config. func (bc *brokerConnector) Connect(config *BrokerConnectorConfig, enableLogging bool) (Connection, error) { - err := checkConfig(config) - if err != nil { - return nil, err - } + err := checkConfig(config) + if err != nil { + return nil, err + } - // use different mechanism for WS connections. - if config.UseWS { - return bc.connectWs(config, enableLogging) - } + // use different mechanism for WS connections. + if config.UseWS { + return bc.connectWs(config, enableLogging) + } - return bc.connectTCP(config, err) + return bc.connectTCP(config, err) } func (bc *brokerConnector) connectTCP(config *BrokerConnectorConfig, err error) (Connection, error) { - if config.HostHeader == "" { - config.HostHeader = "/" - } - var options = []func(*stomp.Conn) error{ - stomp.ConnOpt.Login(config.Username, config.Password), - stomp.ConnOpt.Host(config.HostHeader), - } - conn, err := stomp.Dial("tcp", config.ServerAddr, options...) - if err != nil { - return nil, err - } - id := uuid.New() - bcConn := &connection{ - id: &id, - conn: conn, - subscriptions: make(map[string]Subscription), - useWs: false, - connLock: sync.Mutex{}, - disconnectChan: make(chan bool)} - bc.c = bcConn - bc.connected = true - bc.config = config - return bcConn, nil + if config.HostHeader == "" { + config.HostHeader = "/" + } + var options = []func(*stomp.Conn) error{ + stomp.ConnOpt.Login(config.Username, config.Password), + stomp.ConnOpt.Host(config.HostHeader), + stomp.ConnOpt.HeartBeat(config.HeartBeatOut, config.HeartBeatIn), + } + + if config.STOMPHeader != nil { + for key, value := range config.STOMPHeader { + options = append(options, stomp.ConnOpt.Header(key, value)) + } + } + + conn, err := stomp.Dial("tcp", config.ServerAddr, options...) + if err != nil { + return nil, err + } + id := uuid.New() + bcConn := &connection{ + id: &id, + conn: conn, + subscriptions: make(map[string]Subscription), + useWs: false, + connLock: sync.Mutex{}, + disconnectChan: make(chan bool)} + bc.c = bcConn + bc.connected = true + bc.config = config + return bcConn, nil } func (bc *brokerConnector) connectWs(config *BrokerConnectorConfig, enableLogging bool) (Connection, error) { + wsScheme := "ws" + if config.WebSocketConfig.UseTLS { + wsScheme += "s" + } - u := url.URL{Scheme: "ws", Host: config.ServerAddr, Path: config.WSPath} + u := url.URL{Scheme: wsScheme, Host: config.ServerAddr, Path: config.WebSocketConfig.WSPath} c := NewBridgeWsClient(enableLogging) - err := c.Connect(&u, nil) + err := c.Connect(&u, config) if err != nil { - return nil, fmt.Errorf("cannot connect to host '%s' via path '%s', stopping", config.ServerAddr, config.WSPath) + return nil, fmt.Errorf("cannot connect to host '%s' via path '%s', stopping", config.ServerAddr, config.WebSocketConfig.WSPath) } id := uuid.New() bcConn := &connection{ @@ -104,4 +116,4 @@ func (bc *brokerConnector) connectWs(config *BrokerConnectorConfig, enableLoggin bc.c = bcConn bc.connected = true return bcConn, nil -} \ No newline at end of file +} diff --git a/bridge/broker_connector_config.go b/bridge/broker_connector_config.go index 6ae0ebc..02fca78 100644 --- a/bridge/broker_connector_config.go +++ b/bridge/broker_connector_config.go @@ -3,12 +3,47 @@ package bridge +import ( + "crypto/tls" + "net/http" + "time" +) + +type WebSocketConfig struct { + WSPath string // if UseWS is true, set this to your websocket path (e.g. '/fabric') + UseTLS bool // use TLS encryption with WebSocket connection + TLSConfig *tls.Config // TLS config for WebSocket connection + CertFile string // X509 certificate for TLS + KeyFile string // matching key file for the X509 certificate +} + // BrokerConnectorConfig is a configuration used when connecting to a message broker type BrokerConnectorConfig struct { - Username string - Password string - ServerAddr string - WSPath string // if UseWS is true, set this to your websocket path (e.g. '/fabric') - UseWS bool // use WebSocket instead of TCP - HostHeader string + Username string + Password string + ServerAddr string + UseWS bool // use WebSocket instead of TCP + WebSocketConfig *WebSocketConfig // WebSocket configuration for when UseWS is true + HostHeader string + HeartBeatOut time.Duration // outbound heartbeat interval (from client to server) + HeartBeatIn time.Duration // inbound heartbeat interval (from server to client) + STOMPHeader map[string]string // additional STOMP headers for handshake + HttpHeader http.Header // additional HTTP headers for WebSocket Upgrade } + +// LoadX509KeyPairFromFiles loads from paths to x509 cert and its matching key files and initializes +// the Certificates field of the TLS config instance with their contents, only if both Certificates is +// an empty slice and GetCertificate is nil +func (b *WebSocketConfig) LoadX509KeyPairFromFiles(certFile, keyFile string) error { + var err error + config := b.TLSConfig + configHasCert := len(config.Certificates) > 0 || config.GetCertificate != nil + if !configHasCert || certFile != "" || keyFile != "" { + config.Certificates = make([]tls.Certificate, 1) + config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return err + } + } + return err +} \ No newline at end of file diff --git a/bridge/broker_connector_test.go b/bridge/broker_connector_test.go index 10dc6d1..0826472 100644 --- a/bridge/broker_connector_test.go +++ b/bridge/broker_connector_test.go @@ -17,6 +17,7 @@ import ( "net/http/httptest" "net/url" "testing" + "time" ) var upgrader = websocket.Upgrader{} @@ -145,11 +146,31 @@ func TestBrokerConnector_ConnectBroker(t *testing.T) { { "Connect via websocket", &BrokerConnectorConfig{ - Username: "guest", Password: "guest", UseWS: true, WSPath: "/", ServerAddr: testHost}}, + Username: "guest", + Password: "guest", + WebSocketConfig: &WebSocketConfig{WSPath: "/"}, + UseWS: true, + ServerAddr: testHost, + HttpHeader: map[string][]string{ + "Sec-Websocket-Protocol": {"v12.stomp, access-token.something"}, + }, + STOMPHeader: map[string]string{ + "access-token": "token", + }, + }, + }, { "Connect via TCP", &BrokerConnectorConfig{ - Username: "guest", Password: "guest", ServerAddr: testBrokerAddress}}, + Username: "guest", + Password: "guest", + ServerAddr: testBrokerAddress, + HeartBeatOut: 30*time.Second, + HeartBeatIn: 30*time.Second, + STOMPHeader: map[string]string{ + "access-token": "token", + }, + }}, } for _, tc := range tt { @@ -194,7 +215,11 @@ func TestBrokerConnector_ConnectBrokerFail(t *testing.T) { { "Connect via websocket fails with bad address", &BrokerConnectorConfig{ - Username: "guest", Password: "guest", UseWS: true, WSPath: "/", ServerAddr: "nowhere"}}, + Username: "guest", + Password: "guest", + UseWS: true, + WebSocketConfig: &WebSocketConfig{WSPath: "/"}, + ServerAddr: "nowhere"}}, { "Connect via TCP fails with bad address", &BrokerConnectorConfig{ @@ -223,7 +248,11 @@ func TestBrokerConnector_Subscribe(t *testing.T) { { "Subscribe via websocket", &BrokerConnectorConfig{ - Username: "guest", Password: "guest", UseWS: true, WSPath: "/", ServerAddr: testHost}}, + Username: "guest", + Password: "guest", + UseWS: true, + WebSocketConfig: &WebSocketConfig{WSPath: "/"}, + ServerAddr: testHost}}, { "Subscribe via TCP", &BrokerConnectorConfig{ @@ -239,7 +268,7 @@ func TestBrokerConnector_Subscribe(t *testing.T) { s, _ := c.Subscribe("/topic/test") if !tc.config.UseWS { var ping = func() { - c.SendMessage("/topic/test", []byte(`happy baby melody!`)) + c.SendMessage("/topic/test", "text/plain", []byte(`happy baby melody!`)) } go ping() } @@ -292,13 +321,17 @@ func TestBrokerConnector_SendMessageOnWs(t *testing.T) { testHost := host + ":" + port cf := &BrokerConnectorConfig{ - Username: "guest", Password: "guest", UseWS: true, WSPath: "/", ServerAddr: testHost} + Username: "guest", + Password: "guest", + UseWS: true, + WebSocketConfig: &WebSocketConfig{WSPath: "/"}, + ServerAddr: testHost} bc := NewBrokerConnector() c, _ := bc.Connect(cf, true) assert.NotNil(t, c) - e := c.SendMessage("nowhere", []byte("out-there")) + e := c.SendMessage("nowhere", "text/plain", []byte("out-there")) assert.Nil(t, e) // try and send a message on a closed connection @@ -311,7 +344,7 @@ func TestBrokerConnector_SendMessageOnWs(t *testing.T) { c.Disconnect() - e = c.SendMessage("nowhere", []byte("out-there")) + e = c.SendMessage("nowhere", "text/plain", []byte("out-there")) assert.NotNil(t, e) } @@ -328,7 +361,11 @@ func TestBrokerConnector_Unsubscribe(t *testing.T) { { "Unsubscribe via websocket", &BrokerConnectorConfig{ - Username: "guest", Password: "guest", UseWS: true, WSPath: "/", ServerAddr: testHost}}, + Username: "guest", + Password: "guest", + UseWS: true, + WebSocketConfig: &WebSocketConfig{WSPath: "/"}, + ServerAddr: testHost}}, { "Unsubscribe via TCP", &BrokerConnectorConfig{ @@ -344,7 +381,7 @@ func TestBrokerConnector_Unsubscribe(t *testing.T) { s, _ := c.Subscribe("/topic/test") if !tc.config.UseWS { var ping = func() { - c.SendMessage("/topic/test", []byte(`my little song`)) + c.SendMessage("/topic/test", "text/plain", []byte(`my little song`)) } go ping() } diff --git a/bridge/broker_connector_tls_test.go b/bridge/broker_connector_tls_test.go new file mode 100644 index 0000000..2b8d376 --- /dev/null +++ b/bridge/broker_connector_tls_test.go @@ -0,0 +1,160 @@ +// Copyright 2019-2020 VMware, Inc. +// SPDX-License-Identifier: BSD-2-Clause + +package bridge + +import ( + "crypto/tls" + "fmt" + "github.com/go-stomp/stomp/frame" + "github.com/go-stomp/stomp/server" + "github.com/stretchr/testify/assert" + "log" + "net" + "net/http" + "net/http/httptest" + "net/url" + "testing" +) + +var webSocketURLChanTLS = make(chan string) +var websocketURLTLS string + +//var srv Server +var testTLS = &tls.Config{ + InsecureSkipVerify: true, + MinVersion: tls.VersionTLS12, + CipherSuites: []uint16{ + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA, + tls.TLS_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_RSA_WITH_AES_256_CBC_SHA, + }, +} + +func runWebSocketEndPointTLS() { + s := httptest.NewUnstartedServer(http.HandlerFunc(websocketHandler)) + s.TLS = testTLS + s.StartTLS() + log.Println("WebSocket listening on", s.Listener.Addr().Network(), s.Listener.Addr().String(), "(TLS)") + httpServer = s + webSocketURLChanTLS <- s.URL +} + +func runStompBrokerTLS() { + l, err := net.Listen("tcp", ":51582") + if err != nil { + log.Fatalf("failed to listen: %s", err.Error()) + } + defer func() { l.Close() }() + + log.Println("TCP listening on", l.Addr().Network(), l.Addr().String(), "(TLS)") + server.Serve(l) + tcpServer = l +} + +func init() { + go runStompBrokerTLS() + go runWebSocketEndPointTLS() + + websocketURLTLS = <-webSocketURLChanTLS +} + +func TestBrokerConnector_ConnectBroker_Invalid_TLS_Cert(t *testing.T) { + url, _ := url.Parse(websocketURLTLS) + host, port, _ := net.SplitHostPort(url.Host) + testHost := host + ":" + port + + brokerConfig := &BrokerConnectorConfig{ + Username: "guest", + Password: "guest", + UseWS: true, + WebSocketConfig: &WebSocketConfig{ + WSPath: "/fabric", + UseTLS: true, + TLSConfig: testTLS, + CertFile: "nothing", + KeyFile: "nothing", + }, + ServerAddr: testHost, + } + bc := NewBrokerConnector() + _, err := bc.Connect(brokerConfig, true) + + assert.NotNil(t, err) +} + +func TestBrokerConnector_ConnectBroker_TLS(t *testing.T) { + url, _ := url.Parse(websocketURLTLS) + host, port, _ := net.SplitHostPort(url.Host) + testHost := host + ":" + port + + tt := []struct { + test string + config *BrokerConnectorConfig + }{ + { + "Connect via websocket with TLS", + &BrokerConnectorConfig{ + Username: "guest", + Password: "guest", + WebSocketConfig: &WebSocketConfig{ + WSPath: "/", + UseTLS: true, + CertFile: "test_server.crt", + KeyFile: "test_server.key", + TLSConfig: testTLS, + }, + UseWS: true, + STOMPHeader: map[string]string{ + "access-token": "test", + }, + ServerAddr: testHost}, + }, + } + + for _, tc := range tt { + t.Run(tc.test, func(t *testing.T) { + + // connect + bc := NewBrokerConnector() + c, err := bc.Connect(tc.config, true) + + if err != nil { + fmt.Printf("unable to connect, error: %e", err) + } + + assert.NotNil(t, c) + assert.Nil(t, err) + if tc.config.UseWS { + assert.NotNil(t, c.(*connection).wsConn) + } + if !tc.config.UseWS { + assert.NotNil(t, c.(*connection).conn) + } + + m, _ := c.Subscribe("/topic/test-topic") + go func() { + err = c.SendJSONMessage("/topic/test-topic", []byte("{}"), func(frame *frame.Frame) error { + frame.Header.Set("access-token", "test") + return nil + }) + assert.Nil(t, err) + }() + msg := <-m.GetMsgChannel() + b := msg.Payload.([]byte) + assert.EqualValues(t, "happy baby melody!", string(b)) + + + // disconnect + err = c.Disconnect() + assert.Nil(t, err) + if tc.config.UseWS { + assert.Nil(t, c.(*connection).wsConn) + } + if !tc.config.UseWS { + assert.Nil(t, c.(*connection).conn) + } + }) + } +} diff --git a/bridge/connection.go b/bridge/connection.go index 9d24d46..026b80a 100644 --- a/bridge/connection.go +++ b/bridge/connection.go @@ -6,6 +6,7 @@ package bridge import ( "fmt" "github.com/go-stomp/stomp" + "github.com/go-stomp/stomp/frame" "github.com/google/uuid" "github.com/vmware/transport-go/model" "log" @@ -16,7 +17,8 @@ type Connection interface { GetId() *uuid.UUID Subscribe(destination string) (Subscription, error) Disconnect() (err error) - SendMessage(destination string, payload []byte) error + SendJSONMessage(destination string, payload []byte, opts ...func(*frame.Frame) error) error + SendMessage(destination, contentType string, payload []byte, opts ...func(*frame.Frame) error) error } // Connection represents a Connection to a message broker. @@ -135,16 +137,21 @@ func (c *connection) listenTCPFrames(src chan *stomp.Message, dst chan *model.Me } } +// SendJSONMessage sends a []byte payload carrying JSON data to a destination. +func (c *connection) SendJSONMessage(destination string, payload []byte, opts ...func(*frame.Frame) error) error { + return c.SendMessage(destination, "application/json", payload, opts...) +} + // Send a []byte payload to a destination. -func (c *connection) SendMessage(destination string, payload []byte) error { +func (c *connection) SendMessage(destination string, contentType string, payload []byte, opts ...func(*frame.Frame) error) error { c.connLock.Lock() defer c.connLock.Unlock() if c != nil && !c.useWs && c.conn != nil { - c.conn.Send(destination, "application/json", payload, nil) + c.conn.Send(destination, contentType, payload, opts...) return nil } if c != nil && c.useWs && c.wsConn != nil { - c.wsConn.Send(destination, payload) + c.wsConn.Send(destination, contentType, payload, opts...) return nil } return fmt.Errorf("cannot send message, no connection") diff --git a/bridge/example_connector_broker_tcp_test.go b/bridge/example_connector_broker_tcp_test.go index 0a4f04e..6eceb21 100644 --- a/bridge/example_connector_broker_tcp_test.go +++ b/bridge/example_connector_broker_tcp_test.go @@ -19,7 +19,11 @@ func Example_connectUsingBrokerViaTCP() { config := &bridge.BrokerConnectorConfig{ Username: "guest", Password: "guest", - ServerAddr: ":61613"} + ServerAddr: ":61613", + STOMPHeader: map[string]string{ + "access-token": "test", + }, + } // connect to broker. c, err := b.ConnectBroker(config) @@ -59,7 +63,7 @@ func Example_connectUsingBrokerViaTCP() { // send messages var producer = func() { for i := 0; i < 5; i++ { - c.SendMessage("/queue/sample", []byte(fmt.Sprintf("message: %d", i))) + c.SendMessage("/queue/sample", "text/plain", []byte(fmt.Sprintf("message: %d", i))) } } diff --git a/bridge/example_connector_broker_ws_test.go b/bridge/example_connector_broker_ws_test.go index cd2cf8c..a70a1d6 100644 --- a/bridge/example_connector_broker_ws_test.go +++ b/bridge/example_connector_broker_ws_test.go @@ -21,8 +21,12 @@ func Example_connectUsingBrokerViaWebSocket() { Username: "guest", Password: "guest", ServerAddr: "appfabric.vmware.com", - WSPath: "/fabric", - UseWS: true} + WebSocketConfig: &bridge.WebSocketConfig{WSPath: "/fabric"}, + UseWS: true, + STOMPHeader: map[string]string{ + "access-token": "test", + }, + } // connect to broker. c, err := b.ConnectBroker(config) diff --git a/bridge/test_server.crt b/bridge/test_server.crt new file mode 100644 index 0000000..1dc6b14 --- /dev/null +++ b/bridge/test_server.crt @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDczCCAlugAwIBAgIUb+B5lV/qlEARHHaypVf+SPo3BdEwDQYJKoZIhvcNAQEL +BQAwSTELMAkGA1UEBhMCVVMxEzARBgNVBAgMCkNhbGlmb3JuaWExETAPBgNVBAcM +CFNhbiBKb3NlMRIwEAYDVQQDDAlsb2NhbGhvc3QwHhcNMjEwNzI3MDEwODQzWhcN +MjEwODI2MDEwODQzWjBJMQswCQYDVQQGEwJVUzETMBEGA1UECAwKQ2FsaWZvcm5p +YTERMA8GA1UEBwwIU2FuIEpvc2UxEjAQBgNVBAMMCWxvY2FsaG9zdDCCASIwDQYJ +KoZIhvcNAQEBBQADggEPADCCAQoCggEBANTinzC+ed7UlaPnJq3vQdRpCxCzuDXN +8SbNEcZRpdWB6+LE1rii/oYHMtcpk3SZYO+290Sgx78G/qw8wZymXlBSGF2InNwk +zOf/0obd8d2+zqsl6fgPJ/FCFKGYdWKoAnVZUQsZu4J0TPes4vteiQ/epIdP+e0m +rzv6pXB1+s9HYVVxOmDzY+DHWH3szESRVfATfp9753G9z7YsBCkmE7TdwHSY63Xy +ZeARCVZiW/5CARGZGVFRTt8luPPibO8U5sBlcCy59BR8GbhYIFSb6NPcXuKIFC3p +TTKU+ywpExbdVWKpW08ts2CVSNC9zYJ+9cEWxZqVM2S1j6LMXrhuCLkCAwEAAaNT +MFEwHQYDVR0OBBYEFCvZ2XCn8UqbqYS7SCBWRbMbmFFwMB8GA1UdIwQYMBaAFCvZ +2XCn8UqbqYS7SCBWRbMbmFFwMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQEL +BQADggEBAHR72zbdLRSZ6g3vTAvNqmy5OEhlCfpuirRW6/Exdg0nPXCEMDsg/2GX +qqV1OovKathvXKpnc00HMAjxPCemrrry89zg2KdcozaoExjMpIf/iGeJE0HYB8VS +wzgh8a7vKZC0qZVk8QJeW8o4VEYxl8k8LMA5s51prvuQ6ZPqMDkpc0FREu9eCKLE +0mbzzacdJgbxT5xt0a671CMq2XS4QEstIVCIlAMHdtXoR1WdHJxsNyKYv7iFcgdd +6ekylrG/p5r+48GizjbJewqbOhneSbv/t+WdD423lSfBqDzCfKLdbzOzrvWkCNh/ +QOUA62+6Gs7KSAq8g8zju01C4pwhhQY= +-----END CERTIFICATE----- diff --git a/bridge/test_server.key b/bridge/test_server.key new file mode 100644 index 0000000..59f55d1 --- /dev/null +++ b/bridge/test_server.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDU4p8wvnne1JWj +5yat70HUaQsQs7g1zfEmzRHGUaXVgevixNa4ov6GBzLXKZN0mWDvtvdEoMe/Bv6s +PMGcpl5QUhhdiJzcJMzn/9KG3fHdvs6rJen4DyfxQhShmHViqAJ1WVELGbuCdEz3 +rOL7XokP3qSHT/ntJq87+qVwdfrPR2FVcTpg82Pgx1h97MxEkVXwE36fe+dxvc+2 +LAQpJhO03cB0mOt18mXgEQlWYlv+QgERmRlRUU7fJbjz4mzvFObAZXAsufQUfBm4 +WCBUm+jT3F7iiBQt6U0ylPssKRMW3VViqVtPLbNglUjQvc2CfvXBFsWalTNktY+i +zF64bgi5AgMBAAECggEBAI/clbdbAQCUqIsaqgjgppvkYJXo3ZN+mHigHGLoyih+ +o+mbG3+qplXsh27fqcYh4GAeRPsbq+Br2a6LYEt8IkAlOlH7AAXE2LPvLVX2pnwz +dXYzKOl5VIS1IzwQy0LgRskGkr3tpcmCzoCRVNKKEBWpX4YRhdO9UK43s5VMJTHs +znq2EXK8fwmB7v+9l3EyHEME+QW4L/iGUwP+nEQQbBKJHBL9pBTScAAPdkZVSU84 +niLwX5qgJAN+l3/akC0UPHmyRrG4D2FhU5BrZ8DTP8d8X9EXEFanavzf6BJ7hzFS +Psis5xlRMX/q/k/AugzXSgC3liodeZRt36KpbGjHEAECgYEA+zspYNcyuI7zksnI +lAClgfKGDwfu8esd75OGI1IgEisi4VTI6O8chVNIOM8S7PcKqG6fTwIQrjJ+I2RW +rC4Vt4srMYdhX/w8P9CkRtor9G0YNFEjZvG/tTvKO4k0H9SrBQDr1I2YRTWay3Ja +6nlzkBWAb1R797nl4VBPbi+hFq0CgYEA2O0fFfkGTPMdzdUttEuigHJDGbQfJVY3 +2uWBb/36oSal4qlISu279AzJnD2f/Z1mzn0oY2buI22QfeS2ktY+NE57uOOA5VW/ +i49K1MiGJdGzqWFafrpcRCW2MC16uIxoatyttnFZzp+3K6RPl9vPtFRFjq4fr4pl +8kvG10Xp170CgYAMsEpAt9258I5gxYw53WZ+j68xpybsVQGoDf1iX017XW5fjxau +nRUFrtUg+WRtT3TcWOn25ZXOFoLEatbtz04NfRvrXnkOjgDFAFartW0u5SupyMdO +9brG/oQSSbW2Pdl9YTSIAbbFGBX/XIE3AndRuaRI+y1fr6XqoVHBsOoj6QKBgQCx +0ndoELIchDLLV1RW9qVoO8JLoL7jYXD6DKb1gjJgxgi37GLpoUYwwgbHADFVuiDb +ZfekvGy8OAV5XfJYHi3xvOBo6H3yAT+6jKUgFpz5BZUMZVTEi8o0xuE5Tx7Jh7fU +2b/Azdx7p6uEp/XjG3qBbY9zbcNlp8L4QyvlrlcJ1QKBgGpOPsSqGjUAIboQ5rFV +Mi8ZkAmYMg8xW4JdP1h/OpZcrwW8scOC3n8Xk9Ht0b0XnZeOHtmAJ4wRA8zgeUyw +JmZUkarLdnFD23jTFSNf7Xeht2skUQ2YHX2BYjROqnVdEe63ENLozKe7DG5xNR9N +T8MQTaL87ZjC06jHZxeJj5Af +-----END PRIVATE KEY----- diff --git a/bus/channel_test.go b/bus/channel_test.go index 3f9a79c..4125f63 100644 --- a/bus/channel_test.go +++ b/bus/channel_test.go @@ -4,6 +4,7 @@ package bus import ( + "github.com/go-stomp/stomp/frame" "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -263,8 +264,13 @@ func (c *MockBridgeConnection) Disconnect() (err error) { return nil } -func (c *MockBridgeConnection) SendMessage(destination string, payload []byte) error { - args := c.MethodCalled("SendMessage", destination, payload) +func (c *MockBridgeConnection) SendJSONMessage(destination string, payload []byte, opts ...func(frame *frame.Frame) error) error { + args := c.MethodCalled("SendJSONMessage", destination, payload) + return args.Error(0) +} + +func (c *MockBridgeConnection) SendMessage(destination, contentType string, payload []byte, opts ...func(frame *frame.Frame) error) error { + args := c.MethodCalled("SendMessage", destination, contentType, payload) return args.Error(0) } diff --git a/bus/eventbus_test.go b/bus/eventbus_test.go index fdd1a84..e8a023a 100644 --- a/bus/eventbus_test.go +++ b/bus/eventbus_test.go @@ -599,7 +599,9 @@ func TestChannelManager_TestConnectBroker(t *testing.T) { Username: "test", Password: "test", UseWS: true, - WSPath: "/", + WebSocketConfig: &bridge.WebSocketConfig{ + WSPath: "/", + }, ServerAddr: "broker-url"} id := uuid.New() diff --git a/bus/example_galactic_channels_test.go b/bus/example_galactic_channels_test.go index 2fbd36c..99cea86 100644 --- a/bus/example_galactic_channels_test.go +++ b/bus/example_galactic_channels_test.go @@ -57,7 +57,9 @@ func Example_usingGalacticChannels() { Username: "guest", Password: "guest", ServerAddr: "appfabric.vmware.com", - WSPath: "/fabric", + WebSocketConfig: &bridge.WebSocketConfig{ + WSPath: "/fabric", + }, UseWS: true} // connect to broker. diff --git a/bus/store.go b/bus/store.go index af76ba2..74b26c7 100644 --- a/bus/store.go +++ b/bus/store.go @@ -227,7 +227,7 @@ func (store *busStore) sendGalacticRequest(requestCmd string, requestPayload int syncChannelConfig := store.galacticConf.syncChannelConfig // send request. - syncChannelConfig.conn.SendMessage( + syncChannelConfig.conn.SendJSONMessage( syncChannelConfig.pubPrefix + syncChannelConfig.syncChannelName, jsonReq) } diff --git a/bus/store_manager.go b/bus/store_manager.go index 2d871dd..d46d0d9 100644 --- a/bus/store_manager.go +++ b/bus/store_manager.go @@ -5,6 +5,7 @@ package bus import ( "fmt" + "github.com/go-stomp/stomp/frame" "github.com/google/uuid" "github.com/vmware/transport-go/bridge" "reflect" @@ -36,7 +37,8 @@ type StoreManager interface { // Interface which is a subset of the bridge.Connection methods. // Used to mock connection objects during unit testing. type galacticStoreConnection interface { - SendMessage(destination string, payload []byte) error + SendJSONMessage(destination string, payload []byte, opts ...func(frame *frame.Frame) error) error + SendMessage(destination, contentType string, payload []byte, opts ...func(frame *frame.Frame) error) error } type storeSyncChannelConfig struct { diff --git a/bus/store_manager_test.go b/bus/store_manager_test.go index 6117ad1..323b823 100644 --- a/bus/store_manager_test.go +++ b/bus/store_manager_test.go @@ -122,7 +122,8 @@ func TestStoreManager_OpenGalacticStore(t *testing.T) { Id: &subId, } con.On("Subscribe", mock.Anything).Return(sub, nil) - con.On("SendMessage", mock.Anything, mock.Anything).Return(nil) + con.On("SendJSONMessage", mock.Anything, mock.Anything).Return(nil) + con.On("SendMessage", mock.Anything, mock.Anything, mock.Anything).Return(nil) m.ConfigureStoreSyncChannel(con, "/topic-prefix", "/pub-prefix") storeManagerImpl := m.(*storeManager) @@ -174,6 +175,7 @@ func TestStoreManager_OpenGalacticStoreWithType(t *testing.T) { Id: &subId, } con.On("Subscribe", mock.Anything).Return(sub, nil) + con.On("SendJSONMessage", mock.Anything, mock.Anything).Return(nil) con.On("SendMessage", mock.Anything, mock.Anything).Return(nil) m.ConfigureStoreSyncChannel(con, "/topic-prefix", "/pub-prefix") diff --git a/bus/store_test.go b/bus/store_test.go index edcb1bf..2ca9631 100644 --- a/bus/store_test.go +++ b/bus/store_test.go @@ -4,13 +4,14 @@ package bus import ( - "testing" + "encoding/json" + "fmt" + "github.com/go-stomp/stomp/frame" "github.com/stretchr/testify/assert" + "reflect" "sync" "sync/atomic" - "fmt" - "reflect" - "encoding/json" + "testing" ) type testItem struct { @@ -27,13 +28,19 @@ func testStore() BusStore { type mockGalacticStoreConnection struct { messages []map[string]interface{} topics []string + opts []func(fr *frame.Frame) error +} + +func (con *mockGalacticStoreConnection) SendJSONMessage(destination string, payload []byte, opts ...func(*frame.Frame) error) error { + return con.SendMessage(destination, "application/json", payload, opts...) } -func (con *mockGalacticStoreConnection) SendMessage(destination string, payload []byte) error { +func (con *mockGalacticStoreConnection) SendMessage(destination, contentType string, payload []byte, opts ...func(*frame.Frame) error) error { var msgPayload map[string]interface{} json.Unmarshal(payload, &msgPayload) con.messages = append(con.messages, msgPayload) con.topics = append(con.topics, destination) + con.opts = opts return nil } @@ -55,6 +62,7 @@ func testGalacticStore(itemType reflect.Type) (BusStore, *mockGalacticStoreConne conn := &mockGalacticStoreConnection{ messages: make([]map[string]interface{}, 0), topics: make([]string,0), + opts: make([]func(*frame.Frame) error, 0), } conf := &galacticStoreConfig{ diff --git a/go.sum b/go.sum index 1dade13..3d0e39e 100644 --- a/go.sum +++ b/go.sum @@ -13,13 +13,8 @@ github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/mattn/go-colorable v0.1.4 h1:snbPLB8fVfU9iwbbo30TPtbLRzwWu6aJS6Xh4eaaviA= -github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8= github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= -github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= -github.com/mattn/go-isatty v0.0.10 h1:qxFzApOv4WsAL965uUPIsXzAKCZxN2p9UqdhFS4ZW10= -github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84= github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -34,12 +29,10 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/urfave/cli v1.22.1 h1:+mkCCcOFKPnCmVYVcURKps1Xe+3zP90gSYGNfRkjoIY= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= -golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20191008105621-543471e840be h1:QAcqgptGM8IQBC9K/RC4o+O9YmqEm0diQn9QmZw/0mU= -golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/transport.go b/transport.go index 2d3d233..e92561f 100644 --- a/transport.go +++ b/transport.go @@ -157,7 +157,10 @@ func runDemoCal() { Username: "guest", Password: "guest", UseWS: true, - WSPath: "/fabric", + WebSocketConfig: &bridge.WebSocketConfig{ + WSPath: "/fabric", + UseTLS: false, + }, ServerAddr: "appfabric.vmware.com"} // connect to broker. @@ -182,7 +185,7 @@ func runDemoCal() { fmt.Println("Requesting time from calendar service") // send request. - c.SendMessage("/pub/"+channel, m) + c.SendJSONMessage("/pub/"+channel, m) // wait for done signal <-done @@ -249,7 +252,10 @@ func runDemoVmService(ctx *cli.Context) { Username: "guest", Password: "guest", UseWS: true, - WSPath: "/fabric", + WebSocketConfig: &bridge.WebSocketConfig{ + WSPath: "/fabric", + UseTLS: false, + }, ServerAddr: addr} // connect to broker. @@ -272,7 +278,7 @@ func runDemoVmService(ctx *cli.Context) { Payload: payload, } m, _ := json.Marshal(r) - c.SendMessage("/pub/"+channel, m) + c.SendJSONMessage("/pub/"+channel, m) // wait for done signal <-done @@ -344,7 +350,10 @@ func runDemoStore(ctx *cli.Context) { Username: "guest", Password: "guest", UseWS: true, - WSPath: "/fabric", + WebSocketConfig: &bridge.WebSocketConfig{ + WSPath: "/fabric", + UseTLS: false, + }, ServerAddr: addr} // connect to broker. @@ -492,7 +501,10 @@ func runDemoApp(ctx *cli.Context) { Username: "guest", Password: "guest", UseWS: true, - WSPath: "/fabric", + WebSocketConfig: &bridge.WebSocketConfig{ + WSPath: "/fabric", + UseTLS: false, + }, ServerAddr: addr} } @@ -516,7 +528,7 @@ func runDemoApp(ctx *cli.Context) { pl := "ping--" + strconv.Itoa(rand.Intn(10000000)) r := &model.Request{Request: "basic", Payload: pl} m, _ := json.Marshal(r) - c.SendMessage("/pub/"+PongServiceChan, m) + c.SendJSONMessage("/pub/"+PongServiceChan, m) time.Sleep(500 * time.Millisecond) } @@ -570,7 +582,7 @@ func runDemoApp(ctx *cli.Context) { pl := "ping--" + strconv.Itoa(rand.Intn(10000000)) r := &model.Request{Request: "full", Payload: pl} m, _ := json.Marshal(r) - c.SendMessage("/pub/queue/"+PongServiceChan, m) + c.SendJSONMessage("/pub/queue/"+PongServiceChan, m) time.Sleep(500 * time.Millisecond) }