Skip to content
This repository has been archived by the owner on Apr 12, 2024. It is now read-only.

Commit

Permalink
[fix] Add missing STOMP over TCP option for Plank (#42)
Browse files Browse the repository at this point in the history
* [fix] Add missing STOMP over TCP option for Plank

Signed-off-by: Josh Kim <kjosh@vmware.com>

* [fix] incorrect outbound send call for incoming msg

Signed-off-by: Josh Kim <kjosh@vmware.com>

* (new) broker_sample example talking to another Plank via TCP

Signed-off-by: Josh Kim <kjosh@vmware.com>

* fix: handle server start errors properly

Signed-off-by: Josh Kim <kjosh@vmware.com>

* test: increase test coverage

Signed-off-by: Josh Kim <kjosh@vmware.com>

* change: simplify sample code using CastPayloadToType

Signed-off-by: Josh Kim <kjosh@vmware.com>
  • Loading branch information
jooskim authored Dec 1, 2021
1 parent 27e75b6 commit 553eea5
Show file tree
Hide file tree
Showing 15 changed files with 425 additions and 36 deletions.
5 changes: 4 additions & 1 deletion plank/cmd/broker_sample/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

// main app to test drive transport's broker APIs. provide LISTEN_METHOD and PRODUCE_MESSAGE_ON_RABBITMQ as
// environment variables to control the behavior of this app.
// LISTEN_METHOD: decides how/where the demo app should receive possible values from (plank_ws, rbmq_amqp, rbmq_stomp)
// LISTEN_METHOD: decides how/where the demo app should receive messages from (plank_ws, plank_stomp, rbmq_amqp, rbmq_stomp)
// PRODUCE_MESSAGE_ON_RABBITMQ: when set to 1 it sends a dummy message every two seconds
// WS_USE_TLS: when set to 1, connection to Plank WebSocket will be made through wss protocol instead of ws
func main() {
Expand Down Expand Up @@ -57,6 +57,9 @@ func main() {
// need to have a Plank instance running at default port (30080)
plank.ListenViaWS(c, wsUseTls)
break
case "plank_stomp":
// listen to the sample channel on Plank through STOMP exposed at TCP port 61613
plank.ListenViaStomp(c)
case "rbmq_amqp":
// listen to the sample channel through AMQP
// need to have a RabbitMQ instance running at default port
Expand Down
72 changes: 72 additions & 0 deletions plank/cmd/broker_sample/plank/over_tcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package plank

import (
"fmt"
"github.com/vmware/transport-go/bridge"
"github.com/vmware/transport-go/bus"
"os"
"syscall"
"time"
)

// ListenViaStomp listens to another Plank instance via TCP
func ListenViaStomp(c chan os.Signal) {
brokerConn, err := connectToBroker()
if err != nil {
fmt.Println(fmt.Errorf("broker connection failed: %w", err))
os.Exit(1)
}

// subscribe to topic simple-stream which keeps sending a random word at an interval
sub, err := brokerConn.Subscribe("/topic/simple-stream")
if err != nil {
fmt.Println(fmt.Errorf("subscription failed: %w", err))
os.Exit(1)
}

// let's disconnect after 10 seconds for the sake of an example
disconnectInTime := 10 * time.Second
tenSecTimer := time.NewTimer(disconnectInTime)

go func() {
for {
select {
case msg := <-sub.GetMsgChannel():
// extract payload (of string type) from the Message object
var payload string
if err := msg.CastPayloadToType(&payload); err != nil {
fmt.Printf("failed to cast payload: %s\n", err.Error())
continue
}
fmt.Printf("msg body: %s\n", payload)
break
case <-tenSecTimer.C:
fmt.Println(disconnectInTime.String() + " elapsed. Disconnecting")
tenSecTimer.Stop()
c <- syscall.SIGINT
break
}
}
}()

<-c
}

// connectToBroker wraps the connection logic to the broker and returns the bridge connection object and an error
func connectToBroker() (bridge.Connection, error) {
b := bus.GetBus()
brokerConfig := &bridge.BrokerConnectorConfig{
Username: "guest",
Password: "guest",
ServerAddr: ":61613",
HeartBeatOut: 30 * time.Second,
}

brokerConn, err := b.ConnectBroker(brokerConfig)
if err != nil {
return nil, err
}

fmt.Println("broker connected. broker ID:", brokerConn.GetId())
return brokerConn, nil
}
2 changes: 2 additions & 0 deletions plank/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
"rest_bridge_timeout_in_minutes": 1,
"fabric_config": {
"fabric_endpoint": "/ws",
"use_tcp": false,
"tcp_port": 61613,
"endpoint_config": {
"TopicPrefix": "/topic",
"UserQueuePrefix": "/queue",
Expand Down
4 changes: 4 additions & 0 deletions plank/pkg/server/banner.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (ps *platformServer) printBanner() {

if ps.serverConfig.FabricConfig != nil {
utils.InfoFprintf(ps.out, "Fabric endpoint\t\t")
if ps.serverConfig.FabricConfig.UseTCP {
_, _ = fmt.Fprintln(ps.out, fmt.Sprintf(":%d (TCP)", ps.serverConfig.FabricConfig.TCPPort))
return
}
_, _ = fmt.Fprintln(ps.out, ps.serverConfig.FabricConfig.FabricEndpoint)
}

Expand Down
70 changes: 70 additions & 0 deletions plank/pkg/server/banner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,43 @@ func TestPrintBanner_FabricConfig(t *testing.T) {
assert.Contains(t, string(logContents), "Fabric endpoint\t\t/ws")
}

func TestPrintBanner_FabricConfig_TCP(t *testing.T) {
testRoot := filepath.Join(os.TempDir(), "plank-tests")
_ = os.MkdirAll(testRoot, 0755)
testLogFile := filepath.Join(testRoot, "testlog.log")
defer os.RemoveAll(testRoot)

cfg := GetBasicTestServerConfig(testRoot, testLogFile, testLogFile, testLogFile, 9981, false)
cfg.FabricConfig = &FabricBrokerConfig{
FabricEndpoint: "/ws",
UseTCP: true,
TCPPort: 61613,
EndpointConfig: &bus.EndpointConfig{
TopicPrefix: "/topic",
UserQueuePrefix: "/queue",
AppRequestPrefix: "/pub/topic",
AppRequestQueuePrefix: "/pub/queue",
Heartbeat: 30000,
},
}
_, _, testServerInterface := CreateTestServer(cfg)
testServer := testServerInterface.(*platformServer)

// act
testServer.printBanner()

// assert
logContents, err := ioutil.ReadFile(testLogFile)
if err != nil {
assert.Fail(t, err.Error())
}

assert.FileExists(t, testLogFile)
assert.Contains(t, string(logContents), "Host\t\t\tlocalhost")
assert.Contains(t, string(logContents), "Port\t\t\t9981")
assert.Contains(t, string(logContents), "Fabric endpoint\t\t:61613 (TCP)")
}

func TestPrintBanner_SpaConfig(t *testing.T) {
testRoot := filepath.Join(os.TempDir(), "plank-tests")
_ = os.MkdirAll(testRoot, 0755)
Expand Down Expand Up @@ -141,6 +178,39 @@ func TestPrintBanner_SpaConfig(t *testing.T) {
assert.Contains(t, string(logContents), "SPA static assets\t/a, /b, /public/c")
}

func TestPrintBanner_SpaConfig_NoStaticAssets(t *testing.T) {
testRoot := filepath.Join(os.TempDir(), "plank-tests")
_ = os.MkdirAll(testRoot, 0755)
testLogFile := filepath.Join(testRoot, "testlog.log")
defer os.RemoveAll(testRoot)

cfg := GetBasicTestServerConfig(testRoot, testLogFile, testLogFile, testLogFile, 9981, false)
cfg.SpaConfig = &SpaConfig{
RootFolder: testRoot,
BaseUri: "/",
StaticAssets: make([]string, 0),
CacheControlRules: nil,
}

_, _, testServerInterface := CreateTestServer(cfg)
testServer := testServerInterface.(*platformServer)

// act
testServer.printBanner()

// assert
logContents, err := ioutil.ReadFile(testLogFile)
if err != nil {
assert.Fail(t, err.Error())
}

assert.FileExists(t, testLogFile)
assert.Contains(t, string(logContents), "Host\t\t\tlocalhost")
assert.Contains(t, string(logContents), "Port\t\t\t9981")
assert.Contains(t, string(logContents), "SPA endpoint\t\t/")
assert.Contains(t, string(logContents), "SPA static assets\t-")
}

func TestPrintBanner_Prometheus(t *testing.T) {
testRoot := filepath.Join(os.TempDir(), "plank-tests")
_ = os.MkdirAll(testRoot, 0755)
Expand Down
36 changes: 36 additions & 0 deletions plank/pkg/server/base_error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package server

import "fmt"

var (
errServerInit = &baseError{message: "Server initialization failed"}
errHttp = &baseError{message: "HTTP error"}
errInternal = &baseError{message: "Internal error"}
errUndefined = &baseError{message: "Undefined error"}
)

type baseError struct {
wrappedErr error
baseErr *baseError
message string
}

func (e baseError) Is(err error) bool {
return e.baseErr == err
}

func (e baseError) Error() string {
return fmt.Sprintf("[plank] Error: %s: %s\n", e.baseErr.message, e.wrappedErr.Error())
}

func wrapError(baseType error, err error) error {
switch baseType {
case errServerInit:
return &baseError{baseErr: errServerInit, wrappedErr: err}
case errInternal:
return &baseError{baseErr: errInternal, wrappedErr: err}
case errHttp:
return &baseError{baseErr: errHttp, wrappedErr: err}
}
return &baseError{baseErr: errUndefined, wrappedErr: err}
}
47 changes: 47 additions & 0 deletions plank/pkg/server/base_error_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package server

import (
"errors"
"github.com/stretchr/testify/assert"
"testing"
)

func TestBaseError_Is_errServerInit(t *testing.T) {
e := wrapError(errServerInit, errors.New("some init fail"))
assert.True(t, errors.Is(e, errServerInit))
}

func TestBaseError_Error_errServerInit(t *testing.T) {
e := wrapError(errServerInit, errors.New("some init fail"))
assert.EqualValues(t, "[plank] Error: Server initialization failed: some init fail\n", e.Error())
}

func TestBaseError_Is_errInternal(t *testing.T) {
e := wrapError(errInternal, errors.New("internal server error"))
assert.True(t, errors.Is(e, errInternal))
}

func TestBaseError_Error_errInternal(t *testing.T) {
e := wrapError(errInternal, errors.New("internal server error"))
assert.EqualValues(t, "[plank] Error: Internal error: internal server error\n", e.Error())
}

func TestBaseError_Is_errHttp(t *testing.T) {
e := wrapError(errHttp, errors.New("404"))
assert.True(t, errors.Is(e, errHttp))
}

func TestBaseError_Error_errHttp(t *testing.T) {
e := wrapError(errHttp, errors.New("404"))
assert.EqualValues(t, "[plank] Error: HTTP error: 404\n", e.Error())
}

func TestBaseError_Is_undefined(t *testing.T) {
e := wrapError(errors.New("some random stuff"), errors.New("?"))
assert.True(t, errors.Is(e, errUndefined))
}

func TestBaseError_Error_undefined(t *testing.T) {
e := wrapError(errors.New("some random stuff"), errors.New("?"))
assert.EqualValues(t, "[plank] Error: Undefined error: ?\n", e.Error())
}
2 changes: 2 additions & 0 deletions plank/pkg/server/core_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type TLSCertConfig struct {
// FabricBrokerConfig defines the endpoint for WebSocket as well as detailed endpoint configuration
type FabricBrokerConfig struct {
FabricEndpoint string `json:"fabric_endpoint"` // URI to WebSocket endpoint
UseTCP bool `json:"use_tcp"` // Use TCP instead of WebSocket
TCPPort int `json:"tcp_port"` // TCP port to use if UseTCP is true
EndpointConfig *bus.EndpointConfig `json:"endpoint_config"` // STOMP configuration
}

Expand Down
15 changes: 9 additions & 6 deletions plank/pkg/server/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,15 @@ func (ps *platformServer) configureFabric() {
}

var err error
// TODO: consider tightening access by allowing configuring allowedOrigins
ps.fabricConn, err = stompserver.NewWebSocketConnectionFromExistingHttpServer(
ps.HttpServer,
ps.router,
ps.serverConfig.FabricConfig.FabricEndpoint,
nil)
if ps.serverConfig.FabricConfig.UseTCP {
ps.fabricConn, err = stompserver.NewTcpConnectionListener(fmt.Sprintf(":%d", ps.serverConfig.FabricConfig.TCPPort))
} else {
ps.fabricConn, err = stompserver.NewWebSocketConnectionFromExistingHttpServer(
ps.HttpServer,
ps.router,
ps.serverConfig.FabricConfig.FabricEndpoint,
nil) // TODO: consider tightening access by allowing configuring allowedOrigins
}

// if creation of listener fails, crash and burn
if err != nil {
Expand Down
35 changes: 26 additions & 9 deletions plank/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/vmware/transport-go/model"
"io/ioutil"
"net"
"net/http"
Expand All @@ -24,6 +21,10 @@ import (
"syscall"
"time"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/vmware/transport-go/model"

"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/vmware/transport-go/bus"
Expand Down Expand Up @@ -156,21 +157,37 @@ func (ps *platformServer) StartServer(syschan chan os.Signal) {
ps.ServerAvailability.Http = true
if ps.serverConfig.TLSCertConfig != nil {
utils.Log.Infof("[plank] Starting HTTP server at %s:%d with TLS", ps.serverConfig.Host, ps.serverConfig.Port)
_ = ps.HttpServer.ListenAndServeTLS(ps.serverConfig.TLSCertConfig.CertFile, ps.serverConfig.TLSCertConfig.KeyFile)
if err := ps.HttpServer.ListenAndServeTLS(ps.serverConfig.TLSCertConfig.CertFile, ps.serverConfig.TLSCertConfig.KeyFile); err != nil {
if !errors.Is(err, http.ErrServerClosed) {
utils.Log.Fatalln(wrapError(errServerInit, err))
}
}
} else {
utils.Log.Infof("[plank] Starting HTTP server at %s:%d", ps.serverConfig.Host, ps.serverConfig.Port)
_ = ps.HttpServer.ListenAndServe()
if err := ps.HttpServer.ListenAndServe(); err != nil {
if !errors.Is(err, http.ErrServerClosed) {
utils.Log.Fatalln(wrapError(errServerInit, err))
}
}
}
}()

// if Fabric broker configuration is found, start the broker
if ps.serverConfig.FabricConfig != nil {
go func() {
utils.Log.Infof("[plank] Starting Transport broker at %s:%d%s",
ps.serverConfig.Host, ps.serverConfig.Port, ps.serverConfig.FabricConfig.FabricEndpoint)
fabricPort := ps.serverConfig.Port
fabricEndpoint := ps.serverConfig.FabricConfig.FabricEndpoint
if ps.serverConfig.FabricConfig.UseTCP {
// if using TCP adjust port accordingly and drop endpoint
fabricPort = ps.serverConfig.FabricConfig.TCPPort
fabricEndpoint = ""
}
brokerLocation := fmt.Sprintf("%s:%d%s", ps.serverConfig.Host, fabricPort, fabricEndpoint)
utils.Log.Infof("[plank] Starting Transport broker at %s", brokerLocation)
ps.ServerAvailability.Fabric = true

if err := ps.eventbus.StartFabricEndpoint(ps.fabricConn, *ps.serverConfig.FabricConfig.EndpointConfig); err != nil {
panic(err)
utils.Log.Fatalln(wrapError(errServerInit, err))
}
}()
}
Expand Down Expand Up @@ -246,7 +263,7 @@ func (ps *platformServer) StopServer() {
}

if ps.fabricConn != nil {
err = ps.fabricConn.Close()
err = ps.eventbus.StopFabricEndpoint()
if err != nil {
utils.Log.Errorln(err)
}
Expand Down
Loading

0 comments on commit 553eea5

Please sign in to comment.