diff --git a/bridge/bridge_client.go b/bridge/bridge_client.go index d03bcb7..55a300d 100644 --- a/bridge/bridge_client.go +++ b/bridge/bridge_client.go @@ -14,7 +14,6 @@ import ( "github.com/gorilla/websocket" "github.com/vmware/transport-go/model" "log" - "net/http" "net/url" "os" "strconv" @@ -61,14 +60,25 @@ func newBridgeWsClient(enableLogging bool) *BridgeClient { } // Connect to broker endpoint. -func (ws *BridgeClient) Connect(url *url.URL, headers http.Header) error { +func (ws *BridgeClient) Connect(url *url.URL, config *BrokerConnectorConfig) error { ws.lock.Lock() defer ws.lock.Unlock() if ws.logger != nil { ws.logger.Printf("connecting to fabric endpoint over %s", url.String()) } - c, _, err := websocket.DefaultDialer.Dial(url.String(), headers) + // set TLS config if broker connector config demands that TLS be used + dialer := websocket.DefaultDialer + if config.WebSocketConfig.UseTLS { + if err := config.WebSocketConfig.LoadX509KeyPairFromFiles( + config.WebSocketConfig.CertFile, + config.WebSocketConfig.KeyFile); err != nil { + return err + } + dialer.TLSClientConfig = config.WebSocketConfig.TLSConfig + } + + c, _, err := dialer.Dial(url.String(), config.HttpHeader) if err != nil { return err } @@ -80,8 +90,21 @@ func (ws *BridgeClient) Connect(url *url.URL, headers http.Header) error { // go listen to the websocket go ws.listenSocket() + stompHeaders := []string{ + frame.AcceptVersion, + string(stomp.V12), + frame.Login, + config.Username, + frame.Passcode, + config.Password, + frame.HeartBeat, + fmt.Sprintf("%d,%d", config.HeartBeatOut.Milliseconds(), config.HeartBeatIn.Milliseconds())} + for key, value := range config.STOMPHeader { + stompHeaders = append(stompHeaders, key, value) + } + // send connect frame. - ws.SendFrame(frame.New(frame.CONNECT, frame.AcceptVersion, string(stomp.V12))) + ws.SendFrame(frame.New(frame.CONNECT, stompHeaders...)) // wait to be connected <-ws.ConnectedChan @@ -125,7 +148,7 @@ func (ws *BridgeClient) Subscribe(destination string) *BridgeClientSub { } // send a payload to a destination -func (ws *BridgeClient) Send(destination string, payload []byte) { +func (ws *BridgeClient) Send(destination, contentType string, payload []byte, opts ...func(fr *frame.Frame) error) { ws.lock.Lock() defer ws.lock.Unlock() @@ -133,8 +156,12 @@ func (ws *BridgeClient) Send(destination string, payload []byte) { sendFrame := frame.New(frame.SEND, frame.Destination, destination, frame.ContentLength, strconv.Itoa(len(payload)), - frame.ContentType, "application/json") + frame.ContentType, contentType) + // apply extra frame options such as adding extra headers + for _, frameOpt := range opts { + _ = frameOpt(sendFrame) + } // add payload sendFrame.Body = payload diff --git a/bridge/broker_connector.go b/bridge/broker_connector.go index 16d0e34..6aa2028 100644 --- a/bridge/broker_connector.go +++ b/bridge/broker_connector.go @@ -13,85 +13,97 @@ import ( // BrokerConnector is used to connect to a message broker over TCP or WebSocket. type BrokerConnector interface { - Connect(config *BrokerConnectorConfig, enableLogging bool) (Connection, error) + Connect(config *BrokerConnectorConfig, enableLogging bool) (Connection, error) } type brokerConnector struct { - c Connection - config *BrokerConnectorConfig - connected bool + c Connection + config *BrokerConnectorConfig + connected bool } // Create a new broker connector func NewBrokerConnector() BrokerConnector { - return &brokerConnector{connected: false} + return &brokerConnector{connected: false} } func checkConfig(config *BrokerConnectorConfig) error { - if config == nil { - return fmt.Errorf("config is nil") - } - if config.ServerAddr == "" { - return fmt.Errorf("config invalid, config missing server address") - } - if config.Username == "" { - return fmt.Errorf("config invalid, config missing username") - } - if config.Password == "" { - return fmt.Errorf("config invalid, config missing password") - } - return nil + if config == nil { + return fmt.Errorf("config is nil") + } + if config.ServerAddr == "" { + return fmt.Errorf("config invalid, config missing server address") + } + if config.Username == "" { + return fmt.Errorf("config invalid, config missing username") + } + if config.Password == "" { + return fmt.Errorf("config invalid, config missing password") + } + return nil } // Connect to broker using supplied connector config. func (bc *brokerConnector) Connect(config *BrokerConnectorConfig, enableLogging bool) (Connection, error) { - err := checkConfig(config) - if err != nil { - return nil, err - } + err := checkConfig(config) + if err != nil { + return nil, err + } - // use different mechanism for WS connections. - if config.UseWS { - return bc.connectWs(config, enableLogging) - } + // use different mechanism for WS connections. + if config.UseWS { + return bc.connectWs(config, enableLogging) + } - return bc.connectTCP(config, err) + return bc.connectTCP(config, err) } func (bc *brokerConnector) connectTCP(config *BrokerConnectorConfig, err error) (Connection, error) { - if config.HostHeader == "" { - config.HostHeader = "/" - } - var options = []func(*stomp.Conn) error{ - stomp.ConnOpt.Login(config.Username, config.Password), - stomp.ConnOpt.Host(config.HostHeader), - } - conn, err := stomp.Dial("tcp", config.ServerAddr, options...) - if err != nil { - return nil, err - } - id := uuid.New() - bcConn := &connection{ - id: &id, - conn: conn, - subscriptions: make(map[string]Subscription), - useWs: false, - connLock: sync.Mutex{}, - disconnectChan: make(chan bool)} - bc.c = bcConn - bc.connected = true - bc.config = config - return bcConn, nil + if config.HostHeader == "" { + config.HostHeader = "/" + } + var options = []func(*stomp.Conn) error{ + stomp.ConnOpt.Login(config.Username, config.Password), + stomp.ConnOpt.Host(config.HostHeader), + stomp.ConnOpt.HeartBeat(config.HeartBeatOut, config.HeartBeatIn), + } + + if config.STOMPHeader != nil { + for key, value := range config.STOMPHeader { + options = append(options, stomp.ConnOpt.Header(key, value)) + } + } + + conn, err := stomp.Dial("tcp", config.ServerAddr, options...) + if err != nil { + return nil, err + } + id := uuid.New() + bcConn := &connection{ + id: &id, + conn: conn, + subscriptions: make(map[string]Subscription), + useWs: false, + connLock: sync.Mutex{}, + disconnectChan: make(chan bool)} + bc.c = bcConn + bc.connected = true + bc.config = config + return bcConn, nil } func (bc *brokerConnector) connectWs(config *BrokerConnectorConfig, enableLogging bool) (Connection, error) { + wsScheme := "ws" + if config.WebSocketConfig.UseTLS { + wsScheme += "s" + } - u := url.URL{Scheme: "ws", Host: config.ServerAddr, Path: config.WSPath} + u := url.URL{Scheme: wsScheme, Host: config.ServerAddr, Path: config.WebSocketConfig.WSPath} c := NewBridgeWsClient(enableLogging) - err := c.Connect(&u, nil) + err := c.Connect(&u, config) if err != nil { - return nil, fmt.Errorf("cannot connect to host '%s' via path '%s', stopping", config.ServerAddr, config.WSPath) + return nil, fmt.Errorf("cannot connect to host '%s' via path '%s', stopping", config.ServerAddr, config.WebSocketConfig.WSPath) } id := uuid.New() bcConn := &connection{ @@ -104,4 +116,4 @@ func (bc *brokerConnector) connectWs(config *BrokerConnectorConfig, enableLoggin bc.c = bcConn bc.connected = true return bcConn, nil -} \ No newline at end of file +} diff --git a/bridge/broker_connector_config.go b/bridge/broker_connector_config.go index 6ae0ebc..02fca78 100644 --- a/bridge/broker_connector_config.go +++ b/bridge/broker_connector_config.go @@ -3,12 +3,47 @@ package bridge +import ( + "crypto/tls" + "net/http" + "time" +) + +type WebSocketConfig struct { + WSPath string // if UseWS is true, set this to your websocket path (e.g. '/fabric') + UseTLS bool // use TLS encryption with WebSocket connection + TLSConfig *tls.Config // TLS config for WebSocket connection + CertFile string // X509 certificate for TLS + KeyFile string // matching key file for the X509 certificate +} + // BrokerConnectorConfig is a configuration used when connecting to a message broker type BrokerConnectorConfig struct { - Username string - Password string - ServerAddr string - WSPath string // if UseWS is true, set this to your websocket path (e.g. '/fabric') - UseWS bool // use WebSocket instead of TCP - HostHeader string + Username string + Password string + ServerAddr string + UseWS bool // use WebSocket instead of TCP + WebSocketConfig *WebSocketConfig // WebSocket configuration for when UseWS is true + HostHeader string + HeartBeatOut time.Duration // outbound heartbeat interval (from client to server) + HeartBeatIn time.Duration // inbound heartbeat interval (from server to client) + STOMPHeader map[string]string // additional STOMP headers for handshake + HttpHeader http.Header // additional HTTP headers for WebSocket Upgrade } + +// LoadX509KeyPairFromFiles loads from paths to x509 cert and its matching key files and initializes +// the Certificates field of the TLS config instance with their contents, only if both Certificates is +// an empty slice and GetCertificate is nil +func (b *WebSocketConfig) LoadX509KeyPairFromFiles(certFile, keyFile string) error { + var err error + config := b.TLSConfig + configHasCert := len(config.Certificates) > 0 || config.GetCertificate != nil + if !configHasCert || certFile != "" || keyFile != "" { + config.Certificates = make([]tls.Certificate, 1) + config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return err + } + } + return err +} \ No newline at end of file diff --git a/bridge/broker_connector_test.go b/bridge/broker_connector_test.go index 10dc6d1..0826472 100644 --- a/bridge/broker_connector_test.go +++ b/bridge/broker_connector_test.go @@ -17,6 +17,7 @@ import ( "net/http/httptest" "net/url" "testing" + "time" ) var upgrader = websocket.Upgrader{} @@ -145,11 +146,31 @@ func TestBrokerConnector_ConnectBroker(t *testing.T) { { "Connect via websocket", &BrokerConnectorConfig{ - Username: "guest", Password: "guest", UseWS: true, WSPath: "/", ServerAddr: testHost}}, + Username: "guest", + Password: "guest", + WebSocketConfig: &WebSocketConfig{WSPath: "/"}, + UseWS: true, + ServerAddr: testHost, + HttpHeader: map[string][]string{ + "Sec-Websocket-Protocol": {"v12.stomp, access-token.something"}, + }, + STOMPHeader: map[string]string{ + "access-token": "token", + }, + }, + }, { "Connect via TCP", &BrokerConnectorConfig{ - Username: "guest", Password: "guest", ServerAddr: testBrokerAddress}}, + Username: "guest", + Password: "guest", + ServerAddr: testBrokerAddress, + HeartBeatOut: 30*time.Second, + HeartBeatIn: 30*time.Second, + STOMPHeader: map[string]string{ + "access-token": "token", + }, + }}, } for _, tc := range tt { @@ -194,7 +215,11 @@ func TestBrokerConnector_ConnectBrokerFail(t *testing.T) { { "Connect via websocket fails with bad address", &BrokerConnectorConfig{ - Username: "guest", Password: "guest", UseWS: true, WSPath: "/", ServerAddr: "nowhere"}}, + Username: "guest", + Password: "guest", + UseWS: true, + WebSocketConfig: &WebSocketConfig{WSPath: "/"}, + ServerAddr: "nowhere"}}, { "Connect via TCP fails with bad address", &BrokerConnectorConfig{ @@ -223,7 +248,11 @@ func TestBrokerConnector_Subscribe(t *testing.T) { { "Subscribe via websocket", &BrokerConnectorConfig{ - Username: "guest", Password: "guest", UseWS: true, WSPath: "/", ServerAddr: testHost}}, + Username: "guest", + Password: "guest", + UseWS: true, + WebSocketConfig: &WebSocketConfig{WSPath: "/"}, + ServerAddr: testHost}}, { "Subscribe via TCP", &BrokerConnectorConfig{ @@ -239,7 +268,7 @@ func TestBrokerConnector_Subscribe(t *testing.T) { s, _ := c.Subscribe("/topic/test") if !tc.config.UseWS { var ping = func() { - c.SendMessage("/topic/test", []byte(`happy baby melody!`)) + c.SendMessage("/topic/test", "text/plain", []byte(`happy baby melody!`)) } go ping() } @@ -292,13 +321,17 @@ func TestBrokerConnector_SendMessageOnWs(t *testing.T) { testHost := host + ":" + port cf := &BrokerConnectorConfig{ - Username: "guest", Password: "guest", UseWS: true, WSPath: "/", ServerAddr: testHost} + Username: "guest", + Password: "guest", + UseWS: true, + WebSocketConfig: &WebSocketConfig{WSPath: "/"}, + ServerAddr: testHost} bc := NewBrokerConnector() c, _ := bc.Connect(cf, true) assert.NotNil(t, c) - e := c.SendMessage("nowhere", []byte("out-there")) + e := c.SendMessage("nowhere", "text/plain", []byte("out-there")) assert.Nil(t, e) // try and send a message on a closed connection @@ -311,7 +344,7 @@ func TestBrokerConnector_SendMessageOnWs(t *testing.T) { c.Disconnect() - e = c.SendMessage("nowhere", []byte("out-there")) + e = c.SendMessage("nowhere", "text/plain", []byte("out-there")) assert.NotNil(t, e) } @@ -328,7 +361,11 @@ func TestBrokerConnector_Unsubscribe(t *testing.T) { { "Unsubscribe via websocket", &BrokerConnectorConfig{ - Username: "guest", Password: "guest", UseWS: true, WSPath: "/", ServerAddr: testHost}}, + Username: "guest", + Password: "guest", + UseWS: true, + WebSocketConfig: &WebSocketConfig{WSPath: "/"}, + ServerAddr: testHost}}, { "Unsubscribe via TCP", &BrokerConnectorConfig{ @@ -344,7 +381,7 @@ func TestBrokerConnector_Unsubscribe(t *testing.T) { s, _ := c.Subscribe("/topic/test") if !tc.config.UseWS { var ping = func() { - c.SendMessage("/topic/test", []byte(`my little song`)) + c.SendMessage("/topic/test", "text/plain", []byte(`my little song`)) } go ping() } diff --git a/bridge/broker_connector_tls_test.go b/bridge/broker_connector_tls_test.go new file mode 100644 index 0000000..2b8d376 --- /dev/null +++ b/bridge/broker_connector_tls_test.go @@ -0,0 +1,160 @@ +// Copyright 2019-2020 VMware, Inc. +// SPDX-License-Identifier: BSD-2-Clause + +package bridge + +import ( + "crypto/tls" + "fmt" + "github.com/go-stomp/stomp/frame" + "github.com/go-stomp/stomp/server" + "github.com/stretchr/testify/assert" + "log" + "net" + "net/http" + "net/http/httptest" + "net/url" + "testing" +) + +var webSocketURLChanTLS = make(chan string) +var websocketURLTLS string + +//var srv Server +var testTLS = &tls.Config{ + InsecureSkipVerify: true, + MinVersion: tls.VersionTLS12, + CipherSuites: []uint16{ + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA, + tls.TLS_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_RSA_WITH_AES_256_CBC_SHA, + }, +} + +func runWebSocketEndPointTLS() { + s := httptest.NewUnstartedServer(http.HandlerFunc(websocketHandler)) + s.TLS = testTLS + s.StartTLS() + log.Println("WebSocket listening on", s.Listener.Addr().Network(), s.Listener.Addr().String(), "(TLS)") + httpServer = s + webSocketURLChanTLS <- s.URL +} + +func runStompBrokerTLS() { + l, err := net.Listen("tcp", ":51582") + if err != nil { + log.Fatalf("failed to listen: %s", err.Error()) + } + defer func() { l.Close() }() + + log.Println("TCP listening on", l.Addr().Network(), l.Addr().String(), "(TLS)") + server.Serve(l) + tcpServer = l +} + +func init() { + go runStompBrokerTLS() + go runWebSocketEndPointTLS() + + websocketURLTLS = <-webSocketURLChanTLS +} + +func TestBrokerConnector_ConnectBroker_Invalid_TLS_Cert(t *testing.T) { + url, _ := url.Parse(websocketURLTLS) + host, port, _ := net.SplitHostPort(url.Host) + testHost := host + ":" + port + + brokerConfig := &BrokerConnectorConfig{ + Username: "guest", + Password: "guest", + UseWS: true, + WebSocketConfig: &WebSocketConfig{ + WSPath: "/fabric", + UseTLS: true, + TLSConfig: testTLS, + CertFile: "nothing", + KeyFile: "nothing", + }, + ServerAddr: testHost, + } + bc := NewBrokerConnector() + _, err := bc.Connect(brokerConfig, true) + + assert.NotNil(t, err) +} + +func TestBrokerConnector_ConnectBroker_TLS(t *testing.T) { + url, _ := url.Parse(websocketURLTLS) + host, port, _ := net.SplitHostPort(url.Host) + testHost := host + ":" + port + + tt := []struct { + test string + config *BrokerConnectorConfig + }{ + { + "Connect via websocket with TLS", + &BrokerConnectorConfig{ + Username: "guest", + Password: "guest", + WebSocketConfig: &WebSocketConfig{ + WSPath: "/", + UseTLS: true, + CertFile: "test_server.crt", + KeyFile: "test_server.key", + TLSConfig: testTLS, + }, + UseWS: true, + STOMPHeader: map[string]string{ + "access-token": "test", + }, + ServerAddr: testHost}, + }, + } + + for _, tc := range tt { + t.Run(tc.test, func(t *testing.T) { + + // connect + bc := NewBrokerConnector() + c, err := bc.Connect(tc.config, true) + + if err != nil { + fmt.Printf("unable to connect, error: %e", err) + } + + assert.NotNil(t, c) + assert.Nil(t, err) + if tc.config.UseWS { + assert.NotNil(t, c.(*connection).wsConn) + } + if !tc.config.UseWS { + assert.NotNil(t, c.(*connection).conn) + } + + m, _ := c.Subscribe("/topic/test-topic") + go func() { + err = c.SendJSONMessage("/topic/test-topic", []byte("{}"), func(frame *frame.Frame) error { + frame.Header.Set("access-token", "test") + return nil + }) + assert.Nil(t, err) + }() + msg := <-m.GetMsgChannel() + b := msg.Payload.([]byte) + assert.EqualValues(t, "happy baby melody!", string(b)) + + + // disconnect + err = c.Disconnect() + assert.Nil(t, err) + if tc.config.UseWS { + assert.Nil(t, c.(*connection).wsConn) + } + if !tc.config.UseWS { + assert.Nil(t, c.(*connection).conn) + } + }) + } +} diff --git a/bridge/connection.go b/bridge/connection.go index 9d24d46..026b80a 100644 --- a/bridge/connection.go +++ b/bridge/connection.go @@ -6,6 +6,7 @@ package bridge import ( "fmt" "github.com/go-stomp/stomp" + "github.com/go-stomp/stomp/frame" "github.com/google/uuid" "github.com/vmware/transport-go/model" "log" @@ -16,7 +17,8 @@ type Connection interface { GetId() *uuid.UUID Subscribe(destination string) (Subscription, error) Disconnect() (err error) - SendMessage(destination string, payload []byte) error + SendJSONMessage(destination string, payload []byte, opts ...func(*frame.Frame) error) error + SendMessage(destination, contentType string, payload []byte, opts ...func(*frame.Frame) error) error } // Connection represents a Connection to a message broker. @@ -135,16 +137,21 @@ func (c *connection) listenTCPFrames(src chan *stomp.Message, dst chan *model.Me } } +// SendJSONMessage sends a []byte payload carrying JSON data to a destination. +func (c *connection) SendJSONMessage(destination string, payload []byte, opts ...func(*frame.Frame) error) error { + return c.SendMessage(destination, "application/json", payload, opts...) +} + // Send a []byte payload to a destination. -func (c *connection) SendMessage(destination string, payload []byte) error { +func (c *connection) SendMessage(destination string, contentType string, payload []byte, opts ...func(*frame.Frame) error) error { c.connLock.Lock() defer c.connLock.Unlock() if c != nil && !c.useWs && c.conn != nil { - c.conn.Send(destination, "application/json", payload, nil) + c.conn.Send(destination, contentType, payload, opts...) return nil } if c != nil && c.useWs && c.wsConn != nil { - c.wsConn.Send(destination, payload) + c.wsConn.Send(destination, contentType, payload, opts...) return nil } return fmt.Errorf("cannot send message, no connection") diff --git a/bridge/example_connector_broker_tcp_test.go b/bridge/example_connector_broker_tcp_test.go index 0a4f04e..6eceb21 100644 --- a/bridge/example_connector_broker_tcp_test.go +++ b/bridge/example_connector_broker_tcp_test.go @@ -19,7 +19,11 @@ func Example_connectUsingBrokerViaTCP() { config := &bridge.BrokerConnectorConfig{ Username: "guest", Password: "guest", - ServerAddr: ":61613"} + ServerAddr: ":61613", + STOMPHeader: map[string]string{ + "access-token": "test", + }, + } // connect to broker. c, err := b.ConnectBroker(config) @@ -59,7 +63,7 @@ func Example_connectUsingBrokerViaTCP() { // send messages var producer = func() { for i := 0; i < 5; i++ { - c.SendMessage("/queue/sample", []byte(fmt.Sprintf("message: %d", i))) + c.SendMessage("/queue/sample", "text/plain", []byte(fmt.Sprintf("message: %d", i))) } } diff --git a/bridge/example_connector_broker_ws_test.go b/bridge/example_connector_broker_ws_test.go index cd2cf8c..a70a1d6 100644 --- a/bridge/example_connector_broker_ws_test.go +++ b/bridge/example_connector_broker_ws_test.go @@ -21,8 +21,12 @@ func Example_connectUsingBrokerViaWebSocket() { Username: "guest", Password: "guest", ServerAddr: "appfabric.vmware.com", - WSPath: "/fabric", - UseWS: true} + WebSocketConfig: &bridge.WebSocketConfig{WSPath: "/fabric"}, + UseWS: true, + STOMPHeader: map[string]string{ + "access-token": "test", + }, + } // connect to broker. c, err := b.ConnectBroker(config) diff --git a/bridge/test_server.crt b/bridge/test_server.crt new file mode 100644 index 0000000..1dc6b14 --- /dev/null +++ b/bridge/test_server.crt @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDczCCAlugAwIBAgIUb+B5lV/qlEARHHaypVf+SPo3BdEwDQYJKoZIhvcNAQEL +BQAwSTELMAkGA1UEBhMCVVMxEzARBgNVBAgMCkNhbGlmb3JuaWExETAPBgNVBAcM +CFNhbiBKb3NlMRIwEAYDVQQDDAlsb2NhbGhvc3QwHhcNMjEwNzI3MDEwODQzWhcN +MjEwODI2MDEwODQzWjBJMQswCQYDVQQGEwJVUzETMBEGA1UECAwKQ2FsaWZvcm5p +YTERMA8GA1UEBwwIU2FuIEpvc2UxEjAQBgNVBAMMCWxvY2FsaG9zdDCCASIwDQYJ +KoZIhvcNAQEBBQADggEPADCCAQoCggEBANTinzC+ed7UlaPnJq3vQdRpCxCzuDXN +8SbNEcZRpdWB6+LE1rii/oYHMtcpk3SZYO+290Sgx78G/qw8wZymXlBSGF2InNwk +zOf/0obd8d2+zqsl6fgPJ/FCFKGYdWKoAnVZUQsZu4J0TPes4vteiQ/epIdP+e0m +rzv6pXB1+s9HYVVxOmDzY+DHWH3szESRVfATfp9753G9z7YsBCkmE7TdwHSY63Xy +ZeARCVZiW/5CARGZGVFRTt8luPPibO8U5sBlcCy59BR8GbhYIFSb6NPcXuKIFC3p +TTKU+ywpExbdVWKpW08ts2CVSNC9zYJ+9cEWxZqVM2S1j6LMXrhuCLkCAwEAAaNT +MFEwHQYDVR0OBBYEFCvZ2XCn8UqbqYS7SCBWRbMbmFFwMB8GA1UdIwQYMBaAFCvZ +2XCn8UqbqYS7SCBWRbMbmFFwMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQEL +BQADggEBAHR72zbdLRSZ6g3vTAvNqmy5OEhlCfpuirRW6/Exdg0nPXCEMDsg/2GX +qqV1OovKathvXKpnc00HMAjxPCemrrry89zg2KdcozaoExjMpIf/iGeJE0HYB8VS +wzgh8a7vKZC0qZVk8QJeW8o4VEYxl8k8LMA5s51prvuQ6ZPqMDkpc0FREu9eCKLE +0mbzzacdJgbxT5xt0a671CMq2XS4QEstIVCIlAMHdtXoR1WdHJxsNyKYv7iFcgdd +6ekylrG/p5r+48GizjbJewqbOhneSbv/t+WdD423lSfBqDzCfKLdbzOzrvWkCNh/ +QOUA62+6Gs7KSAq8g8zju01C4pwhhQY= +-----END CERTIFICATE----- diff --git a/bridge/test_server.key b/bridge/test_server.key new file mode 100644 index 0000000..59f55d1 --- /dev/null +++ b/bridge/test_server.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDU4p8wvnne1JWj +5yat70HUaQsQs7g1zfEmzRHGUaXVgevixNa4ov6GBzLXKZN0mWDvtvdEoMe/Bv6s +PMGcpl5QUhhdiJzcJMzn/9KG3fHdvs6rJen4DyfxQhShmHViqAJ1WVELGbuCdEz3 +rOL7XokP3qSHT/ntJq87+qVwdfrPR2FVcTpg82Pgx1h97MxEkVXwE36fe+dxvc+2 +LAQpJhO03cB0mOt18mXgEQlWYlv+QgERmRlRUU7fJbjz4mzvFObAZXAsufQUfBm4 +WCBUm+jT3F7iiBQt6U0ylPssKRMW3VViqVtPLbNglUjQvc2CfvXBFsWalTNktY+i +zF64bgi5AgMBAAECggEBAI/clbdbAQCUqIsaqgjgppvkYJXo3ZN+mHigHGLoyih+ +o+mbG3+qplXsh27fqcYh4GAeRPsbq+Br2a6LYEt8IkAlOlH7AAXE2LPvLVX2pnwz +dXYzKOl5VIS1IzwQy0LgRskGkr3tpcmCzoCRVNKKEBWpX4YRhdO9UK43s5VMJTHs +znq2EXK8fwmB7v+9l3EyHEME+QW4L/iGUwP+nEQQbBKJHBL9pBTScAAPdkZVSU84 +niLwX5qgJAN+l3/akC0UPHmyRrG4D2FhU5BrZ8DTP8d8X9EXEFanavzf6BJ7hzFS +Psis5xlRMX/q/k/AugzXSgC3liodeZRt36KpbGjHEAECgYEA+zspYNcyuI7zksnI +lAClgfKGDwfu8esd75OGI1IgEisi4VTI6O8chVNIOM8S7PcKqG6fTwIQrjJ+I2RW +rC4Vt4srMYdhX/w8P9CkRtor9G0YNFEjZvG/tTvKO4k0H9SrBQDr1I2YRTWay3Ja +6nlzkBWAb1R797nl4VBPbi+hFq0CgYEA2O0fFfkGTPMdzdUttEuigHJDGbQfJVY3 +2uWBb/36oSal4qlISu279AzJnD2f/Z1mzn0oY2buI22QfeS2ktY+NE57uOOA5VW/ +i49K1MiGJdGzqWFafrpcRCW2MC16uIxoatyttnFZzp+3K6RPl9vPtFRFjq4fr4pl +8kvG10Xp170CgYAMsEpAt9258I5gxYw53WZ+j68xpybsVQGoDf1iX017XW5fjxau +nRUFrtUg+WRtT3TcWOn25ZXOFoLEatbtz04NfRvrXnkOjgDFAFartW0u5SupyMdO +9brG/oQSSbW2Pdl9YTSIAbbFGBX/XIE3AndRuaRI+y1fr6XqoVHBsOoj6QKBgQCx +0ndoELIchDLLV1RW9qVoO8JLoL7jYXD6DKb1gjJgxgi37GLpoUYwwgbHADFVuiDb +ZfekvGy8OAV5XfJYHi3xvOBo6H3yAT+6jKUgFpz5BZUMZVTEi8o0xuE5Tx7Jh7fU +2b/Azdx7p6uEp/XjG3qBbY9zbcNlp8L4QyvlrlcJ1QKBgGpOPsSqGjUAIboQ5rFV +Mi8ZkAmYMg8xW4JdP1h/OpZcrwW8scOC3n8Xk9Ht0b0XnZeOHtmAJ4wRA8zgeUyw +JmZUkarLdnFD23jTFSNf7Xeht2skUQ2YHX2BYjROqnVdEe63ENLozKe7DG5xNR9N +T8MQTaL87ZjC06jHZxeJj5Af +-----END PRIVATE KEY----- diff --git a/bus/channel_test.go b/bus/channel_test.go index 3f9a79c..4125f63 100644 --- a/bus/channel_test.go +++ b/bus/channel_test.go @@ -4,6 +4,7 @@ package bus import ( + "github.com/go-stomp/stomp/frame" "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -263,8 +264,13 @@ func (c *MockBridgeConnection) Disconnect() (err error) { return nil } -func (c *MockBridgeConnection) SendMessage(destination string, payload []byte) error { - args := c.MethodCalled("SendMessage", destination, payload) +func (c *MockBridgeConnection) SendJSONMessage(destination string, payload []byte, opts ...func(frame *frame.Frame) error) error { + args := c.MethodCalled("SendJSONMessage", destination, payload) + return args.Error(0) +} + +func (c *MockBridgeConnection) SendMessage(destination, contentType string, payload []byte, opts ...func(frame *frame.Frame) error) error { + args := c.MethodCalled("SendMessage", destination, contentType, payload) return args.Error(0) } diff --git a/bus/eventbus_test.go b/bus/eventbus_test.go index fdd1a84..e8a023a 100644 --- a/bus/eventbus_test.go +++ b/bus/eventbus_test.go @@ -599,7 +599,9 @@ func TestChannelManager_TestConnectBroker(t *testing.T) { Username: "test", Password: "test", UseWS: true, - WSPath: "/", + WebSocketConfig: &bridge.WebSocketConfig{ + WSPath: "/", + }, ServerAddr: "broker-url"} id := uuid.New() diff --git a/bus/example_galactic_channels_test.go b/bus/example_galactic_channels_test.go index 2fbd36c..99cea86 100644 --- a/bus/example_galactic_channels_test.go +++ b/bus/example_galactic_channels_test.go @@ -57,7 +57,9 @@ func Example_usingGalacticChannels() { Username: "guest", Password: "guest", ServerAddr: "appfabric.vmware.com", - WSPath: "/fabric", + WebSocketConfig: &bridge.WebSocketConfig{ + WSPath: "/fabric", + }, UseWS: true} // connect to broker. diff --git a/bus/store.go b/bus/store.go index af76ba2..74b26c7 100644 --- a/bus/store.go +++ b/bus/store.go @@ -227,7 +227,7 @@ func (store *busStore) sendGalacticRequest(requestCmd string, requestPayload int syncChannelConfig := store.galacticConf.syncChannelConfig // send request. - syncChannelConfig.conn.SendMessage( + syncChannelConfig.conn.SendJSONMessage( syncChannelConfig.pubPrefix + syncChannelConfig.syncChannelName, jsonReq) } diff --git a/bus/store_manager.go b/bus/store_manager.go index 2d871dd..d46d0d9 100644 --- a/bus/store_manager.go +++ b/bus/store_manager.go @@ -5,6 +5,7 @@ package bus import ( "fmt" + "github.com/go-stomp/stomp/frame" "github.com/google/uuid" "github.com/vmware/transport-go/bridge" "reflect" @@ -36,7 +37,8 @@ type StoreManager interface { // Interface which is a subset of the bridge.Connection methods. // Used to mock connection objects during unit testing. type galacticStoreConnection interface { - SendMessage(destination string, payload []byte) error + SendJSONMessage(destination string, payload []byte, opts ...func(frame *frame.Frame) error) error + SendMessage(destination, contentType string, payload []byte, opts ...func(frame *frame.Frame) error) error } type storeSyncChannelConfig struct { diff --git a/bus/store_manager_test.go b/bus/store_manager_test.go index 6117ad1..323b823 100644 --- a/bus/store_manager_test.go +++ b/bus/store_manager_test.go @@ -122,7 +122,8 @@ func TestStoreManager_OpenGalacticStore(t *testing.T) { Id: &subId, } con.On("Subscribe", mock.Anything).Return(sub, nil) - con.On("SendMessage", mock.Anything, mock.Anything).Return(nil) + con.On("SendJSONMessage", mock.Anything, mock.Anything).Return(nil) + con.On("SendMessage", mock.Anything, mock.Anything, mock.Anything).Return(nil) m.ConfigureStoreSyncChannel(con, "/topic-prefix", "/pub-prefix") storeManagerImpl := m.(*storeManager) @@ -174,6 +175,7 @@ func TestStoreManager_OpenGalacticStoreWithType(t *testing.T) { Id: &subId, } con.On("Subscribe", mock.Anything).Return(sub, nil) + con.On("SendJSONMessage", mock.Anything, mock.Anything).Return(nil) con.On("SendMessage", mock.Anything, mock.Anything).Return(nil) m.ConfigureStoreSyncChannel(con, "/topic-prefix", "/pub-prefix") diff --git a/bus/store_test.go b/bus/store_test.go index edcb1bf..2ca9631 100644 --- a/bus/store_test.go +++ b/bus/store_test.go @@ -4,13 +4,14 @@ package bus import ( - "testing" + "encoding/json" + "fmt" + "github.com/go-stomp/stomp/frame" "github.com/stretchr/testify/assert" + "reflect" "sync" "sync/atomic" - "fmt" - "reflect" - "encoding/json" + "testing" ) type testItem struct { @@ -27,13 +28,19 @@ func testStore() BusStore { type mockGalacticStoreConnection struct { messages []map[string]interface{} topics []string + opts []func(fr *frame.Frame) error +} + +func (con *mockGalacticStoreConnection) SendJSONMessage(destination string, payload []byte, opts ...func(*frame.Frame) error) error { + return con.SendMessage(destination, "application/json", payload, opts...) } -func (con *mockGalacticStoreConnection) SendMessage(destination string, payload []byte) error { +func (con *mockGalacticStoreConnection) SendMessage(destination, contentType string, payload []byte, opts ...func(*frame.Frame) error) error { var msgPayload map[string]interface{} json.Unmarshal(payload, &msgPayload) con.messages = append(con.messages, msgPayload) con.topics = append(con.topics, destination) + con.opts = opts return nil } @@ -55,6 +62,7 @@ func testGalacticStore(itemType reflect.Type) (BusStore, *mockGalacticStoreConne conn := &mockGalacticStoreConnection{ messages: make([]map[string]interface{}, 0), topics: make([]string,0), + opts: make([]func(*frame.Frame) error, 0), } conf := &galacticStoreConfig{ diff --git a/go.sum b/go.sum index 1dade13..3d0e39e 100644 --- a/go.sum +++ b/go.sum @@ -13,13 +13,8 @@ github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/mattn/go-colorable v0.1.4 h1:snbPLB8fVfU9iwbbo30TPtbLRzwWu6aJS6Xh4eaaviA= -github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8= github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= -github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= -github.com/mattn/go-isatty v0.0.10 h1:qxFzApOv4WsAL965uUPIsXzAKCZxN2p9UqdhFS4ZW10= -github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84= github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -34,12 +29,10 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/urfave/cli v1.22.1 h1:+mkCCcOFKPnCmVYVcURKps1Xe+3zP90gSYGNfRkjoIY= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= -golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20191008105621-543471e840be h1:QAcqgptGM8IQBC9K/RC4o+O9YmqEm0diQn9QmZw/0mU= -golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/transport.go b/transport.go index 2d3d233..e92561f 100644 --- a/transport.go +++ b/transport.go @@ -157,7 +157,10 @@ func runDemoCal() { Username: "guest", Password: "guest", UseWS: true, - WSPath: "/fabric", + WebSocketConfig: &bridge.WebSocketConfig{ + WSPath: "/fabric", + UseTLS: false, + }, ServerAddr: "appfabric.vmware.com"} // connect to broker. @@ -182,7 +185,7 @@ func runDemoCal() { fmt.Println("Requesting time from calendar service") // send request. - c.SendMessage("/pub/"+channel, m) + c.SendJSONMessage("/pub/"+channel, m) // wait for done signal <-done @@ -249,7 +252,10 @@ func runDemoVmService(ctx *cli.Context) { Username: "guest", Password: "guest", UseWS: true, - WSPath: "/fabric", + WebSocketConfig: &bridge.WebSocketConfig{ + WSPath: "/fabric", + UseTLS: false, + }, ServerAddr: addr} // connect to broker. @@ -272,7 +278,7 @@ func runDemoVmService(ctx *cli.Context) { Payload: payload, } m, _ := json.Marshal(r) - c.SendMessage("/pub/"+channel, m) + c.SendJSONMessage("/pub/"+channel, m) // wait for done signal <-done @@ -344,7 +350,10 @@ func runDemoStore(ctx *cli.Context) { Username: "guest", Password: "guest", UseWS: true, - WSPath: "/fabric", + WebSocketConfig: &bridge.WebSocketConfig{ + WSPath: "/fabric", + UseTLS: false, + }, ServerAddr: addr} // connect to broker. @@ -492,7 +501,10 @@ func runDemoApp(ctx *cli.Context) { Username: "guest", Password: "guest", UseWS: true, - WSPath: "/fabric", + WebSocketConfig: &bridge.WebSocketConfig{ + WSPath: "/fabric", + UseTLS: false, + }, ServerAddr: addr} } @@ -516,7 +528,7 @@ func runDemoApp(ctx *cli.Context) { pl := "ping--" + strconv.Itoa(rand.Intn(10000000)) r := &model.Request{Request: "basic", Payload: pl} m, _ := json.Marshal(r) - c.SendMessage("/pub/"+PongServiceChan, m) + c.SendJSONMessage("/pub/"+PongServiceChan, m) time.Sleep(500 * time.Millisecond) } @@ -570,7 +582,7 @@ func runDemoApp(ctx *cli.Context) { pl := "ping--" + strconv.Itoa(rand.Intn(10000000)) r := &model.Request{Request: "full", Payload: pl} m, _ := json.Marshal(r) - c.SendMessage("/pub/queue/"+PongServiceChan, m) + c.SendJSONMessage("/pub/queue/"+PongServiceChan, m) time.Sleep(500 * time.Millisecond) }