diff --git a/plank/cmd/broker_sample/app.go b/plank/cmd/broker_sample/app.go index 133b98a..ad3794e 100644 --- a/plank/cmd/broker_sample/app.go +++ b/plank/cmd/broker_sample/app.go @@ -16,7 +16,7 @@ import ( // main app to test drive transport's broker APIs. provide LISTEN_METHOD and PRODUCE_MESSAGE_ON_RABBITMQ as // environment variables to control the behavior of this app. -// LISTEN_METHOD: decides how/where the demo app should receive possible values from (plank_ws, rbmq_amqp, rbmq_stomp) +// LISTEN_METHOD: decides how/where the demo app should receive messages from (plank_ws, plank_stomp, rbmq_amqp, rbmq_stomp) // PRODUCE_MESSAGE_ON_RABBITMQ: when set to 1 it sends a dummy message every two seconds // WS_USE_TLS: when set to 1, connection to Plank WebSocket will be made through wss protocol instead of ws func main() { @@ -57,6 +57,9 @@ func main() { // need to have a Plank instance running at default port (30080) plank.ListenViaWS(c, wsUseTls) break + case "plank_stomp": + // listen to the sample channel on Plank through STOMP exposed at TCP port 61613 + plank.ListenViaStomp(c) case "rbmq_amqp": // listen to the sample channel through AMQP // need to have a RabbitMQ instance running at default port diff --git a/plank/cmd/broker_sample/plank/over_tcp.go b/plank/cmd/broker_sample/plank/over_tcp.go new file mode 100644 index 0000000..c24f465 --- /dev/null +++ b/plank/cmd/broker_sample/plank/over_tcp.go @@ -0,0 +1,72 @@ +package plank + +import ( + "fmt" + "github.com/vmware/transport-go/bridge" + "github.com/vmware/transport-go/bus" + "os" + "syscall" + "time" +) + +// ListenViaStomp listens to another Plank instance via TCP +func ListenViaStomp(c chan os.Signal) { + brokerConn, err := connectToBroker() + if err != nil { + fmt.Println(fmt.Errorf("broker connection failed: %w", err)) + os.Exit(1) + } + + // subscribe to topic simple-stream which keeps sending a random word at an interval + sub, err := brokerConn.Subscribe("/topic/simple-stream") + if err != nil { + fmt.Println(fmt.Errorf("subscription failed: %w", err)) + os.Exit(1) + } + + // let's disconnect after 10 seconds for the sake of an example + disconnectInTime := 10 * time.Second + tenSecTimer := time.NewTimer(disconnectInTime) + + go func() { + for { + select { + case msg := <-sub.GetMsgChannel(): + // extract payload (of string type) from the Message object + var payload string + if err := msg.CastPayloadToType(&payload); err != nil { + fmt.Printf("failed to cast payload: %s\n", err.Error()) + continue + } + fmt.Printf("msg body: %s\n", payload) + break + case <-tenSecTimer.C: + fmt.Println(disconnectInTime.String() + " elapsed. Disconnecting") + tenSecTimer.Stop() + c <- syscall.SIGINT + break + } + } + }() + + <-c +} + +// connectToBroker wraps the connection logic to the broker and returns the bridge connection object and an error +func connectToBroker() (bridge.Connection, error) { + b := bus.GetBus() + brokerConfig := &bridge.BrokerConnectorConfig{ + Username: "guest", + Password: "guest", + ServerAddr: ":61613", + HeartBeatOut: 30 * time.Second, + } + + brokerConn, err := b.ConnectBroker(brokerConfig) + if err != nil { + return nil, err + } + + fmt.Println("broker connected. broker ID:", brokerConn.GetId()) + return brokerConn, nil +} diff --git a/plank/config.json b/plank/config.json index 4b507d0..87cdaf9 100644 --- a/plank/config.json +++ b/plank/config.json @@ -40,6 +40,8 @@ "rest_bridge_timeout_in_minutes": 1, "fabric_config": { "fabric_endpoint": "/ws", + "use_tcp": false, + "tcp_port": 61613, "endpoint_config": { "TopicPrefix": "/topic", "UserQueuePrefix": "/queue", diff --git a/plank/pkg/server/banner.go b/plank/pkg/server/banner.go index 8461e82..4548000 100644 --- a/plank/pkg/server/banner.go +++ b/plank/pkg/server/banner.go @@ -50,6 +50,10 @@ func (ps *platformServer) printBanner() { if ps.serverConfig.FabricConfig != nil { utils.InfoFprintf(ps.out, "Fabric endpoint\t\t") + if ps.serverConfig.FabricConfig.UseTCP { + _, _ = fmt.Fprintln(ps.out, fmt.Sprintf(":%d (TCP)", ps.serverConfig.FabricConfig.TCPPort)) + return + } _, _ = fmt.Fprintln(ps.out, ps.serverConfig.FabricConfig.FabricEndpoint) } diff --git a/plank/pkg/server/banner_test.go b/plank/pkg/server/banner_test.go index 36e3d9a..cff4413 100644 --- a/plank/pkg/server/banner_test.go +++ b/plank/pkg/server/banner_test.go @@ -109,6 +109,43 @@ func TestPrintBanner_FabricConfig(t *testing.T) { assert.Contains(t, string(logContents), "Fabric endpoint\t\t/ws") } +func TestPrintBanner_FabricConfig_TCP(t *testing.T) { + testRoot := filepath.Join(os.TempDir(), "plank-tests") + _ = os.MkdirAll(testRoot, 0755) + testLogFile := filepath.Join(testRoot, "testlog.log") + defer os.RemoveAll(testRoot) + + cfg := GetBasicTestServerConfig(testRoot, testLogFile, testLogFile, testLogFile, 9981, false) + cfg.FabricConfig = &FabricBrokerConfig{ + FabricEndpoint: "/ws", + UseTCP: true, + TCPPort: 61613, + EndpointConfig: &bus.EndpointConfig{ + TopicPrefix: "/topic", + UserQueuePrefix: "/queue", + AppRequestPrefix: "/pub/topic", + AppRequestQueuePrefix: "/pub/queue", + Heartbeat: 30000, + }, + } + _, _, testServerInterface := CreateTestServer(cfg) + testServer := testServerInterface.(*platformServer) + + // act + testServer.printBanner() + + // assert + logContents, err := ioutil.ReadFile(testLogFile) + if err != nil { + assert.Fail(t, err.Error()) + } + + assert.FileExists(t, testLogFile) + assert.Contains(t, string(logContents), "Host\t\t\tlocalhost") + assert.Contains(t, string(logContents), "Port\t\t\t9981") + assert.Contains(t, string(logContents), "Fabric endpoint\t\t:61613 (TCP)") +} + func TestPrintBanner_SpaConfig(t *testing.T) { testRoot := filepath.Join(os.TempDir(), "plank-tests") _ = os.MkdirAll(testRoot, 0755) @@ -141,6 +178,39 @@ func TestPrintBanner_SpaConfig(t *testing.T) { assert.Contains(t, string(logContents), "SPA static assets\t/a, /b, /public/c") } +func TestPrintBanner_SpaConfig_NoStaticAssets(t *testing.T) { + testRoot := filepath.Join(os.TempDir(), "plank-tests") + _ = os.MkdirAll(testRoot, 0755) + testLogFile := filepath.Join(testRoot, "testlog.log") + defer os.RemoveAll(testRoot) + + cfg := GetBasicTestServerConfig(testRoot, testLogFile, testLogFile, testLogFile, 9981, false) + cfg.SpaConfig = &SpaConfig{ + RootFolder: testRoot, + BaseUri: "/", + StaticAssets: make([]string, 0), + CacheControlRules: nil, + } + + _, _, testServerInterface := CreateTestServer(cfg) + testServer := testServerInterface.(*platformServer) + + // act + testServer.printBanner() + + // assert + logContents, err := ioutil.ReadFile(testLogFile) + if err != nil { + assert.Fail(t, err.Error()) + } + + assert.FileExists(t, testLogFile) + assert.Contains(t, string(logContents), "Host\t\t\tlocalhost") + assert.Contains(t, string(logContents), "Port\t\t\t9981") + assert.Contains(t, string(logContents), "SPA endpoint\t\t/") + assert.Contains(t, string(logContents), "SPA static assets\t-") +} + func TestPrintBanner_Prometheus(t *testing.T) { testRoot := filepath.Join(os.TempDir(), "plank-tests") _ = os.MkdirAll(testRoot, 0755) diff --git a/plank/pkg/server/base_error.go b/plank/pkg/server/base_error.go new file mode 100644 index 0000000..d4646ae --- /dev/null +++ b/plank/pkg/server/base_error.go @@ -0,0 +1,36 @@ +package server + +import "fmt" + +var ( + errServerInit = &baseError{message: "Server initialization failed"} + errHttp = &baseError{message: "HTTP error"} + errInternal = &baseError{message: "Internal error"} + errUndefined = &baseError{message: "Undefined error"} +) + +type baseError struct { + wrappedErr error + baseErr *baseError + message string +} + +func (e baseError) Is(err error) bool { + return e.baseErr == err +} + +func (e baseError) Error() string { + return fmt.Sprintf("[plank] Error: %s: %s\n", e.baseErr.message, e.wrappedErr.Error()) +} + +func wrapError(baseType error, err error) error { + switch baseType { + case errServerInit: + return &baseError{baseErr: errServerInit, wrappedErr: err} + case errInternal: + return &baseError{baseErr: errInternal, wrappedErr: err} + case errHttp: + return &baseError{baseErr: errHttp, wrappedErr: err} + } + return &baseError{baseErr: errUndefined, wrappedErr: err} +} diff --git a/plank/pkg/server/base_error_test.go b/plank/pkg/server/base_error_test.go new file mode 100644 index 0000000..1e0d553 --- /dev/null +++ b/plank/pkg/server/base_error_test.go @@ -0,0 +1,47 @@ +package server + +import ( + "errors" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestBaseError_Is_errServerInit(t *testing.T) { + e := wrapError(errServerInit, errors.New("some init fail")) + assert.True(t, errors.Is(e, errServerInit)) +} + +func TestBaseError_Error_errServerInit(t *testing.T) { + e := wrapError(errServerInit, errors.New("some init fail")) + assert.EqualValues(t, "[plank] Error: Server initialization failed: some init fail\n", e.Error()) +} + +func TestBaseError_Is_errInternal(t *testing.T) { + e := wrapError(errInternal, errors.New("internal server error")) + assert.True(t, errors.Is(e, errInternal)) +} + +func TestBaseError_Error_errInternal(t *testing.T) { + e := wrapError(errInternal, errors.New("internal server error")) + assert.EqualValues(t, "[plank] Error: Internal error: internal server error\n", e.Error()) +} + +func TestBaseError_Is_errHttp(t *testing.T) { + e := wrapError(errHttp, errors.New("404")) + assert.True(t, errors.Is(e, errHttp)) +} + +func TestBaseError_Error_errHttp(t *testing.T) { + e := wrapError(errHttp, errors.New("404")) + assert.EqualValues(t, "[plank] Error: HTTP error: 404\n", e.Error()) +} + +func TestBaseError_Is_undefined(t *testing.T) { + e := wrapError(errors.New("some random stuff"), errors.New("?")) + assert.True(t, errors.Is(e, errUndefined)) +} + +func TestBaseError_Error_undefined(t *testing.T) { + e := wrapError(errors.New("some random stuff"), errors.New("?")) + assert.EqualValues(t, "[plank] Error: Undefined error: ?\n", e.Error()) +} diff --git a/plank/pkg/server/core_models.go b/plank/pkg/server/core_models.go index 2e5bd2f..3c927fa 100644 --- a/plank/pkg/server/core_models.go +++ b/plank/pkg/server/core_models.go @@ -46,6 +46,8 @@ type TLSCertConfig struct { // FabricBrokerConfig defines the endpoint for WebSocket as well as detailed endpoint configuration type FabricBrokerConfig struct { FabricEndpoint string `json:"fabric_endpoint"` // URI to WebSocket endpoint + UseTCP bool `json:"use_tcp"` // Use TCP instead of WebSocket + TCPPort int `json:"tcp_port"` // TCP port to use if UseTCP is true EndpointConfig *bus.EndpointConfig `json:"endpoint_config"` // STOMP configuration } diff --git a/plank/pkg/server/initialize.go b/plank/pkg/server/initialize.go index 276af09..da5d2bb 100644 --- a/plank/pkg/server/initialize.go +++ b/plank/pkg/server/initialize.go @@ -163,12 +163,15 @@ func (ps *platformServer) configureFabric() { } var err error - // TODO: consider tightening access by allowing configuring allowedOrigins - ps.fabricConn, err = stompserver.NewWebSocketConnectionFromExistingHttpServer( - ps.HttpServer, - ps.router, - ps.serverConfig.FabricConfig.FabricEndpoint, - nil) + if ps.serverConfig.FabricConfig.UseTCP { + ps.fabricConn, err = stompserver.NewTcpConnectionListener(fmt.Sprintf(":%d", ps.serverConfig.FabricConfig.TCPPort)) + } else { + ps.fabricConn, err = stompserver.NewWebSocketConnectionFromExistingHttpServer( + ps.HttpServer, + ps.router, + ps.serverConfig.FabricConfig.FabricEndpoint, + nil) // TODO: consider tightening access by allowing configuring allowedOrigins + } // if creation of listener fails, crash and burn if err != nil { diff --git a/plank/pkg/server/server.go b/plank/pkg/server/server.go index 82f02a3..9707fa9 100644 --- a/plank/pkg/server/server.go +++ b/plank/pkg/server/server.go @@ -9,9 +9,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/spf13/cobra" - "github.com/spf13/pflag" - "github.com/vmware/transport-go/model" "io/ioutil" "net" "net/http" @@ -24,6 +21,10 @@ import ( "syscall" "time" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "github.com/vmware/transport-go/model" + "github.com/gorilla/handlers" "github.com/gorilla/mux" "github.com/vmware/transport-go/bus" @@ -156,21 +157,37 @@ func (ps *platformServer) StartServer(syschan chan os.Signal) { ps.ServerAvailability.Http = true if ps.serverConfig.TLSCertConfig != nil { utils.Log.Infof("[plank] Starting HTTP server at %s:%d with TLS", ps.serverConfig.Host, ps.serverConfig.Port) - _ = ps.HttpServer.ListenAndServeTLS(ps.serverConfig.TLSCertConfig.CertFile, ps.serverConfig.TLSCertConfig.KeyFile) + if err := ps.HttpServer.ListenAndServeTLS(ps.serverConfig.TLSCertConfig.CertFile, ps.serverConfig.TLSCertConfig.KeyFile); err != nil { + if !errors.Is(err, http.ErrServerClosed) { + utils.Log.Fatalln(wrapError(errServerInit, err)) + } + } } else { utils.Log.Infof("[plank] Starting HTTP server at %s:%d", ps.serverConfig.Host, ps.serverConfig.Port) - _ = ps.HttpServer.ListenAndServe() + if err := ps.HttpServer.ListenAndServe(); err != nil { + if !errors.Is(err, http.ErrServerClosed) { + utils.Log.Fatalln(wrapError(errServerInit, err)) + } + } } }() // if Fabric broker configuration is found, start the broker if ps.serverConfig.FabricConfig != nil { go func() { - utils.Log.Infof("[plank] Starting Transport broker at %s:%d%s", - ps.serverConfig.Host, ps.serverConfig.Port, ps.serverConfig.FabricConfig.FabricEndpoint) + fabricPort := ps.serverConfig.Port + fabricEndpoint := ps.serverConfig.FabricConfig.FabricEndpoint + if ps.serverConfig.FabricConfig.UseTCP { + // if using TCP adjust port accordingly and drop endpoint + fabricPort = ps.serverConfig.FabricConfig.TCPPort + fabricEndpoint = "" + } + brokerLocation := fmt.Sprintf("%s:%d%s", ps.serverConfig.Host, fabricPort, fabricEndpoint) + utils.Log.Infof("[plank] Starting Transport broker at %s", brokerLocation) ps.ServerAvailability.Fabric = true + if err := ps.eventbus.StartFabricEndpoint(ps.fabricConn, *ps.serverConfig.FabricConfig.EndpointConfig); err != nil { - panic(err) + utils.Log.Fatalln(wrapError(errServerInit, err)) } }() } @@ -246,7 +263,7 @@ func (ps *platformServer) StopServer() { } if ps.fabricConn != nil { - err = ps.fabricConn.Close() + err = ps.eventbus.StopFabricEndpoint() if err != nil { utils.Log.Errorln(err) } diff --git a/plank/pkg/server/server_smoke_test.go b/plank/pkg/server/server_smoke_test.go index ed65144..f4def17 100644 --- a/plank/pkg/server/server_smoke_test.go +++ b/plank/pkg/server/server_smoke_test.go @@ -1,6 +1,7 @@ package server import ( + "crypto/tls" "fmt" "github.com/stretchr/testify/assert" "github.com/vmware/transport-go/bus" @@ -13,6 +14,58 @@ import ( "testing" ) +// TestSmokeTests_TLS tests if Plank starts with TLS enabled +func TestSmokeTests_TLS(t *testing.T) { + // pre-arrange + newBus := bus.ResetBus() + service.ResetServiceRegistry() + testRoot := filepath.Join(os.TempDir(), "plank-tests") + _ = os.MkdirAll(testRoot, 0755) + defer os.RemoveAll(testRoot) + + // arrange + port := GetTestPort() + cfg := GetBasicTestServerConfig(testRoot, "stdout", "null", "stderr", port, true) + cfg.FabricConfig = GetTestFabricBrokerConfig() + cfg.TLSCertConfig = GetTestTLSCertConfig(testRoot) + + // act + var wg sync.WaitGroup + sigChan := make(chan os.Signal) + baseUrl, _, testServer := CreateTestServer(cfg) + testServerInternal := testServer.(*platformServer) + testServerInternal.setEventBusRef(newBus) + + // assert to make sure the server was created with the correct test arguments + assert.EqualValues(t, fmt.Sprintf("https://localhost:%d", port), baseUrl) + + wg.Add(1) + go testServer.StartServer(sigChan) + + originalTransport := http.DefaultTransport + originalTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + + RunWhenServerReady(t, newBus, func(t *testing.T) { + resp, err := http.Get(baseUrl) + if err != nil { + defer func() { + testServer.StopServer() + wg.Done() + }() + t.Fatal(err) + } + assert.EqualValues(t, http.StatusNotFound, resp.StatusCode) + testServer.StopServer() + wg.Done() + }) + wg.Wait() +} + +// TestSmokeTests_TLS_InvalidCert tests if Plank fails to start because of an invalid cert +func TestSmokeTests_TLS_InvalidCert(t *testing.T) { + // TODO: make StartServer return an error object so it's easier to test +} + func TestSmokeTests(t *testing.T) { newBus := bus.ResetBus() service.ResetServiceRegistry() @@ -24,16 +77,8 @@ func TestSmokeTests(t *testing.T) { port := GetTestPort() cfg := GetBasicTestServerConfig(testRoot, "stdout", "stdout", "stderr", port, true) cfg.NoBanner = true - cfg.FabricConfig = &FabricBrokerConfig{ - FabricEndpoint: "/ws", - EndpointConfig: &bus.EndpointConfig{ - TopicPrefix: "/topic", - UserQueuePrefix: "/queue", - AppRequestPrefix: "/pub/topic", - AppRequestQueuePrefix: "/pub/queue", - Heartbeat: 30000, - }, - } + cfg.FabricConfig = GetTestFabricBrokerConfig() + baseUrl, _, testServer := CreateTestServer(cfg) testServer.(*platformServer).eventbus = newBus diff --git a/plank/pkg/server/test_server.crt.go b/plank/pkg/server/test_server.crt.go new file mode 100644 index 0000000..9a574d9 --- /dev/null +++ b/plank/pkg/server/test_server.crt.go @@ -0,0 +1,23 @@ +package server + +var testServerCertTmpl = `-----BEGIN CERTIFICATE----- +MIIDUDCCAjigAwIBAgIJAMxhMuXHHR1fMA0GCSqGSIb3DQEBBQUAMFAxCzAJBgNV +BAYTAkNOMQswCQYDVQQIDAJHRDELMAkGA1UEBwwCU1oxEzARBgNVBAoMCkFjbWUs +IEluYy4xEjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0yMTExMjMwMDU5NTFaFw0zMTEx +MjEwMDU5NTFaMFAxCzAJBgNVBAYTAkNOMQswCQYDVQQIDAJHRDELMAkGA1UEBwwC +U1oxEzARBgNVBAoMCkFjbWUsIEluYy4xEjAQBgNVBAMMCWxvY2FsaG9zdDCCASIw +DQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAK9jHa4JqrSlOp3rvDCoZtHdmu4V +PxxHkujRDMhZb4kZXEotKdXyFJ9k93vQE72z5VmXvtH29EN+JHcMN3CGn+6F1fjh +kN6R4IfNpo7UCUXa55MsVuw05FZ5ycoJfpP/k9rkqD9Rrh1pNjhE9jY1Q1GNf1Sp +u4QNBqFT9bNgQ2u8AJrsYwrDOJuGDuL8Ub4icWL2ggXMsdrlikNb53g8z6Hc62V2 +ZkYxJNmc+khtipsHeQT4k+tfKFo8cB7f8bjX/B+R7BXmGDecJxZ/BvDywpj2A1Kd +7uh19rsFjCul0iOprDyaTk0jLk6OU81IxjbhyVvFg38IeUL1waLVEgeRX70CAwEA +AaMtMCswEwYDVR0lBAwwCgYIKwYBBQUHAwEwFAYDVR0RBA0wC4IJbG9jYWxob3N0 +MA0GCSqGSIb3DQEBBQUAA4IBAQCTmw+KFugNbGyLUUaMfiu15P/HHGPLq4jV1VPR +h1WvE9NVVZamfqJ/5wYodg724rgnVhokZB7ycUTet6DjfdeLXPU2HQAB1fZGyMmV +puSkj14qAVM+o3WZpIrRMsp2sDj+lIZ5yDujbEzR3AHe7cd5dAQSAhinsfKl90mX +wzB9Yc3CgHWQYu1cLl90XbrgdOhyLGrPBxUEk1UZPwDnLQ+yzQgHfL4acK/nnf/8 +fsAm/eRg3J+vaaBgWoHd4Flyq90/MJpZUsdC2ltetjNns3yFPeR8KpeeOc7qmmHJ +lU8ALVAEmXlMuC1QZPs4aMRFKBsugNcDwcg+s2N8PJO9UoMd +-----END CERTIFICATE----- +` diff --git a/plank/pkg/server/test_server.key.go b/plank/pkg/server/test_server.key.go new file mode 100644 index 0000000..65ba2b7 --- /dev/null +++ b/plank/pkg/server/test_server.key.go @@ -0,0 +1,31 @@ +package server + +var testServerKeyTmpl = `-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCvYx2uCaq0pTqd +67wwqGbR3ZruFT8cR5Lo0QzIWW+JGVxKLSnV8hSfZPd70BO9s+VZl77R9vRDfiR3 +DDdwhp/uhdX44ZDekeCHzaaO1AlF2ueTLFbsNORWecnKCX6T/5Pa5Kg/Ua4daTY4 +RPY2NUNRjX9UqbuEDQahU/WzYENrvACa7GMKwzibhg7i/FG+InFi9oIFzLHa5YpD +W+d4PM+h3OtldmZGMSTZnPpIbYqbB3kE+JPrXyhaPHAe3/G41/wfkewV5hg3nCcW +fwbw8sKY9gNSne7odfa7BYwrpdIjqaw8mk5NIy5OjlPNSMY24clbxYN/CHlC9cGi +1RIHkV+9AgMBAAECggEAG6kmJSyhH5xvgjiLlaD5ll+874+qRGuFX/KYcYCpdpXp +WEmHL5beacUiwwQuGN0mbtrf3X11SK+8UIo8tCKHVrcE6BurHW8kWka03hHS5JGB +tFRp4Rb2bC0JBjQ5Il5Mk1k8r7/SFhFh274ElsgPtez7B/8887aaNRdoyMGF4Jej +I1sadaX7URUjkVQNmeBGEhZdFUN9S+jiItZ7iULnYJC+3NfyfPI7tTzi3wTnA97Z +M3xETaqRU6rodm65ngK7fJ1+FxTM8iikKc64vNUMJV1axh+ha5D77GM+jCjBgeME +2dnfFHsNKtvo/yDCMcX/iJbq+7Cf2vqGsK8fmhvMAQKBgQDaCdb3BIHqmZYR7KNM +hKuTZLoUH9ChnDIsyedpRp04rW0ryGxwWu0V8/tP+fgXrXHRly9xhevuJy7L3Y5W +pv7vH9ZUODtIAuJTAgxgZSDu+dHegqfkwKY9tBmyqUrCFdBXLkBo7u6Y1eVXKjKH +FiweIfUwuvMCtBFsHAktKEWsKQKBgQDN7EXSpMISyUHnZxmkL5weh3Bc92gvkPmO +Us1ND5TfyQpQUS4KAiIo3eOk3CLfSvZriDOKxPuTAsy/K1hXYsfGefZf4F6gerrc +V1k5rY8HRu4+4YlFUieDMoiB8e1Zf93ruMkH4rW7XkfNNK2YOqlaBb2cA5kj1/KP +G0JgSNJJdQKBgQDLnN67BOGRxGWJTZPdtBNJ7Il8m41ILkYI32+UN1ZBBGtrtJnX +foHiu1oYAJY2/kjI8kdi2y5M0xh3VupCb2aoxfFv6qcpg0/5NnN6XVYY9QCQqzDA +hUt5WcOZvVBL2PUbRNzvYRk9bpniAUz4K7N9XDbNj/e5sZCHhdLn8bDGqQKBgQCP +nyQPx4PocGA70efLYL1leZc/7/rFejrkLhIslhqCfohn/ka7CkPi9ueIG1VjIbh+ +xW8VcU/5d/Fvv/6MbEPjxTq2Iho9mXvspCuCE+/25lzSlKEWLTeLxNn6r86YJpuU +hbwIePHBbf9sOzp8OyXUuL5HKydLHZl6gEuqNgAc5QKBgGii1ruy9xzcYg8h0kqH +6qpkKqJQsRV5j0hpPjocyVzqJBsLIih2apeNAfBi5dK8U1qR69iSdcD3WoOWFVYN +stvkvCQ+UUeKnAHHziThZZ7A70a/mjB1n6nmORD6PJc7TdjHzpR6h/CiWzMtYCME +IdLxe21GRIXgkfoQnLWCMs3A +-----END PRIVATE KEY----- +` diff --git a/plank/pkg/server/test_suite_harness.go b/plank/pkg/server/test_suite_harness.go index f839131..06ed1d0 100644 --- a/plank/pkg/server/test_suite_harness.go +++ b/plank/pkg/server/test_suite_harness.go @@ -174,6 +174,35 @@ func GetTestPort() int { return testSuitePortMap[fr.File] } +// GetTestTLSCertConfig returns a new &TLSCertConfig for testing. +func GetTestTLSCertConfig(testRootPath string) *TLSCertConfig { + crtFile := filepath.Join(testRootPath, "test_server.crt") + keyFile := filepath.Join(testRootPath, "test_server.key") + _ = ioutil.WriteFile(crtFile, []byte(testServerCertTmpl), 0700) + _ = ioutil.WriteFile(keyFile, []byte(testServerKeyTmpl), 0700) + return &TLSCertConfig{ + CertFile: crtFile, + KeyFile: keyFile, + SkipCertificateValidation: true, + } +} + +// GetTestFabricBrokerConfig returns a basic fabric broker config. +func GetTestFabricBrokerConfig() *FabricBrokerConfig { + return &FabricBrokerConfig{ + FabricEndpoint: "/ws", + UseTCP: false, + TCPPort: 61613, + EndpointConfig: &bus.EndpointConfig{ + TopicPrefix: "/topic", + UserQueuePrefix: "/queue", + AppRequestPrefix: "/pub", + AppRequestQueuePrefix: "/pub/queue", + Heartbeat: 30000, + }, + } +} + // CreateConfigJsonForTest creates and returns the path to a file containing the plank configuration in JSON format func CreateConfigJsonForTest() (string, error) { configJsonContent := `{ diff --git a/stompserver/server.go b/stompserver/server.go index e520f70..ec9a249 100644 --- a/stompserver/server.go +++ b/stompserver/server.go @@ -198,17 +198,24 @@ func (s *stompServer) Stop() { func (s *stompServer) waitForConnections() { for { + if !s.running { + return + } + rawConn, err := s.connectionListener.Accept() if err != nil { - log.Println("Failed to establish client connection:", err) - } else { - c := NewStompConn(rawConn, s.config, s.connectionEvents) - - s.connectionEvents <- &ConnEvent{ - ConnId: c.GetId(), - conn: c, - eventType: ConnectionStarting, + if s.running { + log.Println("Failed to establish client connection:", err) } + continue + } + + c := NewStompConn(rawConn, s.config, s.connectionEvents) + + s.connectionEvents <- &ConnEvent{ + ConnId: c.GetId(), + conn: c, + eventType: ConnectionStarting, } } } @@ -310,8 +317,6 @@ func (s *stompServer) handleConnectionEvent(e *ConnEvent) { } case IncomingMessage: - s.sendFrame(e.destination, e.frame) - if s.config.IsAppRequestDestination(e.destination) && e.conn != nil { // notify app listeners for _, callback := range s.applicationRequestCallbacks {