diff --git a/README.md b/README.md index ab77f86..d81f5f2 100644 --- a/README.md +++ b/README.md @@ -132,96 +132,105 @@ Ping: Woo! Pong: Woo! ``` -## Example connecting to message broker and using galactic channels +## Example connecting to a message broker and using galactic channels -If you would like to connect the bus to a broker and start streaming stuff, it's quite simple. Here is an example -that connects to `appfabric.vmware.com` and starts streaming over a local channel that is mapped to the live -sample service that it broadcasting every few hundred milliseconds on `/topic/simple-stream` +If you would like to connect the bus to a broker and start streaming stuff, you can run the local demo broker +by first building using `./build-transport.sh` and then starting the local broker (and a bunch of demo services) via ` +./transport-go service` + +Once running, this example will connect to the broker and starts streaming over a local channel that is mapped to the live +sample service that is broadcasting every few hundred milliseconds on `/topic/simple-stream` ```go +package main + import ( - "bifrost/bridge" - "bifrost/bus" - "bifrost/model" - "encoding/json" - "fmt" - "log" + "encoding/json" + "fmt" + "github.com/vmware/transport-go/bridge" + "github.com/vmware/transport-go/bus" + "github.com/vmware/transport-go/model" + "log" ) +func main() { + usingGalacticChannels() +} + func usingGalacticChannels() { - // get a pointer to the bus. - b := bus.GetBus() - - // get a pointer to the channel manager - cm := b.GetChannelManager() - - channel := "my-stream" - cm.CreateChannel(channel) - - // create done signal - var done = make(chan bool) - - // listen to stream of messages coming in on channel. - h, err := b.ListenStream(channel) - - if err != nil { - log.Panicf("unable to listen to channel stream, error: %e", err) - } - - count := 0 - - // listen for five messages and then exit, send a completed signal on channel. - h.Handle( - func(msg *model.Message) { - - // unmarshal the payload into a Response object (used by fabric services) - r := &model.Response{} - d := msg.Payload.([]byte) - json.Unmarshal(d, &r) - fmt.Printf("Stream Ticked: %s\n", r.Payload.(string)) - count++ - if count >=5 { - done <- true - } - }, - func(err error) { - log.Panicf("error received on channel %e", err) - }) - - // create a broker connector config, in this case, we will connect to the application fabric demo endpoint. - config := &bridge.BrokerConnectorConfig{ - Username: "guest", - Password: "guest", - ServerAddr: "appfabric.vmware.com", - WSPath: "/fabric", - UseWS: true} - - // connect to broker. - c, err := b.ConnectBroker(config) - if err != nil { - log.Panicf("unable to connect to fabric, error: %e", err) - } - - // mark our local channel as galactic and map it to our connection and the /topic/simple-stream service - // running on appfabric.vmware.com - err = cm.MarkChannelAsGalactic(channel, "/topic/simple-stream", c) - if err != nil { - log.Panicf("unable to map local channel to broker destination: %e", err) - } - - // wait for done signal - <-done - - // mark channel as local (unsubscribe from all mappings) - err = cm.MarkChannelAsLocal(channel) - if err != nil { - log.Panicf("unable to unsubscribe, error: %e", err) - } - err = c.Disconnect() - if err != nil { - log.Panicf("unable to disconnect, error: %e", err) - } + // get a pointer to the bus. + b := bus.GetBus() + + // get a pointer to the channel manager + cm := b.GetChannelManager() + + channel := "my-stream" + cm.CreateChannel(channel) + + // create done signal + var done = make(chan bool) + + // listen to stream of messages coming in on channel. + h, err := b.ListenStream(channel) + + if err != nil { + log.Panicf("unable to listen to channel stream, error: %e", err) + } + + count := 0 + + // listen for ten messages and then exit, send a completed signal on channel. + h.Handle( + func(msg *model.Message) { + + // unmarshal the payload into a Response object (used by fabric services) + r := &model.Response{} + d := msg.Payload.([]byte) + json.Unmarshal(d, &r) + fmt.Printf("Stream Ticked: %s\n", r.Payload.(string)) + count++ + if count >=10 { + done <- true + } + }, + func(err error) { + log.Panicf("error received on channel %e", err) + }) + + // create a broker connector config, in this case, we will connect to the demo endpoint. + config := &bridge.BrokerConnectorConfig{ + Username: "guest", + Password: "guest", + ServerAddr: "localhost:8090", + WSPath: "/fabric", + UseWS: true} + + // connect to broker. + c, err := b.ConnectBroker(config) + if err != nil { + log.Panicf("unable to connect to fabric, error: %e", err) + } + + // mark our local channel as galactic and map it to our connection and the /topic/simple-stream service + // running on localhost:8090 + err = cm.MarkChannelAsGalactic(channel, "/topic/simple-stream", c) + if err != nil { + log.Panicf("unable to map local channel to broker destination: %e", err) + } + + // wait for done signal + <-done + + // mark channel as local (unsubscribe from all mappings) + err = cm.MarkChannelAsLocal(channel) + if err != nil { + log.Panicf("unable to unsubscribe, error: %e", err) + } + err = c.Disconnect() + if err != nil { + log.Panicf("unable to disconnect, error: %e", err) + } } ``` diff --git a/service/fabric_core.go b/service/fabric_core.go index 884ad40..8329b8f 100644 --- a/service/fabric_core.go +++ b/service/fabric_core.go @@ -1,4 +1,4 @@ -// Copyright 2019-2020 VMware, Inc. +// Copyright 2019-2021 VMware, Inc. // SPDX-License-Identifier: BSD-2-Clause package service @@ -10,31 +10,44 @@ import ( "github.com/vmware/transport-go/model" ) -// Interface providing base functionality to fabric services. +// FabricServiceCore is the interface providing base functionality to fabric services. type FabricServiceCore interface { - // Returns the EventBus instance. + // Bus Returns the EventBus instance. Bus() bus.EventBus - // Uses the "responsePayload" and "request" params to build and send model.Response object + + // SendResponse Uses the "responsePayload" and "request" params to build and send model.Response object // on the service channel. SendResponse(request *model.Request, responsePayload interface{}) - // Same as SendResponse, but include headers. Useful for HTTP REST interfaces - these headers will be + + // SendResponseWithHeaders is the same as SendResponse, but include headers. Useful for HTTP REST interfaces - these headers will be // set as HTTP response headers. Great for custom mime-types, binary stuff and more. SendResponseWithHeaders(request *model.Request, responsePayload interface{}, headers map[string]string) - // Builds an error model.Response object and sends it on the service channel as - // response to the "request" param. + + // SendErrorResponse builds an error model.Response object and sends it on the service channel as response to the "request" param. SendErrorResponse(request *model.Request, responseErrorCode int, responseErrorMessage string) + + // SendErrorResponseWithPayload is the same as SendErrorResponse, but adds a payload SendErrorResponseWithPayload(request *model.Request, responseErrorCode int, responseErrorMessage string, payload interface{}) - // Handles unknown/unsupported request. + + // SendErrorResponseWithHeaders is the same as SendErrorResponse, but adds headers as well. + SendErrorResponseWithHeaders(request *model.Request, responseErrorCode int, responseErrorMessage string, headers map[string]string) + + // SendErrorResponseWithHeadersAndPayload is the same as SendErrorResponseWithPayload, but adds headers as well. + SendErrorResponseWithHeadersAndPayload(request *model.Request, responseErrorCode int, responseErrorMessage string, payload interface{}, headers map[string]string) + + // HandleUnknownRequest handles unknown/unsupported/un-implemented requests, HandleUnknownRequest(request *model.Request) - // Make a new RestService call. + + // RestServiceRequest will make a new RestService call. RestServiceRequest(restRequest *RestServiceRequest, successHandler model.ResponseHandlerFunction, errorHandler model.ResponseHandlerFunction) - // Set global headers for a given fabric service (each service has its own set of global headers). + + // SetHeaders Set global headers for a given fabric service (each service has its own set of global headers). // The headers will be applied to all requests made by this instance's RestServiceRequest method. // Global header values can be overridden per request via the RestServiceRequest.Headers property. SetHeaders(headers map[string]string) - // Automatically ready to go map with json headers. + // GenerateJSONHeaders Automatically ready to go map with json headers. GenerateJSONHeaders() map[string]string } @@ -90,6 +103,39 @@ func (core *fabricCore) SendErrorResponseWithPayload( core.bus.SendResponseMessage(core.channelName, response, request.Id) } +func (core *fabricCore) SendErrorResponseWithHeaders( + request *model.Request, + responseErrorCode int, responseErrorMessage string, headers map[string]string) { + + response := &model.Response{ + Id: request.Id, + Destination: core.channelName, + Headers: headers, + Error: true, + ErrorCode: responseErrorCode, + ErrorMessage: responseErrorMessage, + BrokerDestination: request.BrokerDestination, + } + core.bus.SendResponseMessage(core.channelName, response, request.Id) +} + +func (core *fabricCore) SendErrorResponseWithHeadersAndPayload( + request *model.Request, + responseErrorCode int, responseErrorMessage string, payload interface{}, headers map[string]string) { + + response := &model.Response{ + Id: request.Id, + Destination: core.channelName, + Payload: payload, + Headers: headers, + Error: true, + ErrorCode: responseErrorCode, + ErrorMessage: responseErrorMessage, + BrokerDestination: request.BrokerDestination, + } + core.bus.SendResponseMessage(core.channelName, response, request.Id) +} + func (core *fabricCore) HandleUnknownRequest(request *model.Request) { errorMsg := fmt.Sprintf("unsupported request for \"%s\": %s", core.channelName, request.Request) core.SendErrorResponse(request, 403, errorMsg) diff --git a/service/fabric_core_test.go b/service/fabric_core_test.go index 214ec38..505e6c2 100644 --- a/service/fabric_core_test.go +++ b/service/fabric_core_test.go @@ -97,12 +97,46 @@ func TestFabricCore_SendMethods(t *testing.T) { assert.Equal(t, response.ErrorMessage, "test-error") wg.Add(1) - core.HandleUnknownRequest(&req) + + h = make(map[string]string) + h["chicken"] = "nugget" + core.SendErrorResponseWithHeaders(&req, 422, "test-header-error", h) wg.Wait() assert.Equal(t, count, 4) response = lastMessage.Payload.(*model.Response) + assert.Equal(t, response.Id, req.Id) + assert.Equal(t, response.Headers["chicken"], "nugget") + assert.Nil(t, response.Payload) + assert.True(t, response.Error) + assert.Equal(t, response.ErrorCode, 422) + assert.Equal(t, response.ErrorMessage, "test-header-error") + + wg.Add(1) + + h = make(map[string]string) + h["potato"] = "dog" + core.SendErrorResponseWithHeadersAndPayload(&req, 500, "test-header-payload-error", "oh my!", h) + wg.Wait() + + assert.Equal(t, count, 5) + response = lastMessage.Payload.(*model.Response) + + assert.Equal(t, response.Id, req.Id) + assert.Equal(t, "dog", response.Headers["potato"]) + assert.Equal(t, "oh my!", response.Payload.(string)) + assert.True(t, response.Error) + assert.Equal(t, response.ErrorCode, 500) + assert.Equal(t, response.ErrorMessage, "test-header-payload-error") + + wg.Add(1) + core.HandleUnknownRequest(&req) + wg.Wait() + + assert.Equal(t, count, 6) + response = lastMessage.Payload.(*model.Response) + assert.Equal(t, response.Id, req.Id) assert.True(t, response.Error) assert.Equal(t, 403, response.ErrorCode) diff --git a/service/service_registry.go b/service/service_registry.go index 2f4b0ac..e6e5834 100644 --- a/service/service_registry.go +++ b/service/service_registry.go @@ -1,153 +1,156 @@ -// Copyright 2019-2020 VMware, Inc. +// Copyright 2019-2021 VMware, Inc. // SPDX-License-Identifier: BSD-2-Clause package service import ( - "github.com/vmware/transport-go/bus" - "sync" - "fmt" - "github.com/vmware/transport-go/model" - "log" + "fmt" + "github.com/vmware/transport-go/bus" + "github.com/vmware/transport-go/model" + "log" + "sync" ) -// Registry with all local fabric services. +// ServiceRegistry is the registry for all local fabric services. type ServiceRegistry interface { - // Registers a new fabric service and associates it with a given EventBus channel. - // Only one fabric service can be associated with a given channel. - // If the fabric service implements the FabricInitializableService interface - // its Init method will be called during the registration process. - RegisterService(service FabricService, serviceChannelName string) error - // Unregisters the fabric service associated with the given channel. - UnregisterService(serviceChannelName string) error - // Set global base host or host:port to be used by the restService - SetGlobalRestServiceBaseHost(host string) + + // RegisterService registers a new fabric service and associates it with a given EventBus channel. + // Only one fabric service can be associated with a given channel. + // If the fabric service implements the FabricInitializableService interface + // its Init method will be called during the registration process. + RegisterService(service FabricService, serviceChannelName string) error + + // UnregisterService unregisters the fabric service associated with the given channel. + UnregisterService(serviceChannelName string) error + + // SetGlobalRestServiceBaseHost sets the global base host or host:port to be used by the restService + SetGlobalRestServiceBaseHost(host string) } type serviceRegistry struct { - lock sync.Mutex - services map[string]*fabricServiceWrapper - bus bus.EventBus + lock sync.Mutex + services map[string]*fabricServiceWrapper + bus bus.EventBus } var once sync.Once var registry ServiceRegistry func GetServiceRegistry() ServiceRegistry { - once.Do(func() { - registry = NewServiceRegistry(bus.GetBus()) - }) - return registry + once.Do(func() { + registry = NewServiceRegistry(bus.GetBus()) + }) + return registry } func NewServiceRegistry(bus bus.EventBus) ServiceRegistry { - registry := &serviceRegistry{ - bus: bus, - services: make(map[string]*fabricServiceWrapper), - } - // auto-register the restService - registry.RegisterService(&restService{}, restServiceChannel) - return registry + registry := &serviceRegistry{ + bus: bus, + services: make(map[string]*fabricServiceWrapper), + } + // auto-register the restService + registry.RegisterService(&restService{}, restServiceChannel) + return registry } func (r *serviceRegistry) SetGlobalRestServiceBaseHost(host string) { - r.services[restServiceChannel].service.(*restService).setBaseHost(host) + r.services[restServiceChannel].service.(*restService).setBaseHost(host) } func (r *serviceRegistry) RegisterService(service FabricService, serviceChannelName string) error { - r.lock.Lock() - defer r.lock.Unlock() + r.lock.Lock() + defer r.lock.Unlock() - if service == nil { - return fmt.Errorf("unable to register service: nil service") - } + if service == nil { + return fmt.Errorf("unable to register service: nil service") + } - if _, ok := r.services[serviceChannelName]; ok { - return fmt.Errorf("unable to register service: service channel name is already used: %s", serviceChannelName) - } + if _, ok := r.services[serviceChannelName]; ok { + return fmt.Errorf("unable to register service: service channel name is already used: %s", serviceChannelName) + } - sw := newServiceWrapper(r.bus, service, serviceChannelName) - err := sw.init() - if err != nil { - return err - } + sw := newServiceWrapper(r.bus, service, serviceChannelName) + err := sw.init() + if err != nil { + return err + } - r.services[serviceChannelName] = sw - return nil + r.services[serviceChannelName] = sw + return nil } func (r *serviceRegistry) UnregisterService(serviceChannelName string) error { - r.lock.Lock() - defer r.lock.Unlock() - sw, ok := r.services[serviceChannelName] - if !ok { - return fmt.Errorf("unable to unregister service: no service is registered for channel \"%s\"", serviceChannelName) - } - sw.unregister() - delete(r.services, serviceChannelName) - return nil + r.lock.Lock() + defer r.lock.Unlock() + sw, ok := r.services[serviceChannelName] + if !ok { + return fmt.Errorf("unable to unregister service: no service is registered for channel \"%s\"", serviceChannelName) + } + sw.unregister() + delete(r.services, serviceChannelName) + return nil } type fabricServiceWrapper struct { - service FabricService - fabricCore *fabricCore - requestMsgHandler bus.MessageHandler + service FabricService + fabricCore *fabricCore + requestMsgHandler bus.MessageHandler } func newServiceWrapper( - bus bus.EventBus, service FabricService, serviceChannelName string) *fabricServiceWrapper { - - return &fabricServiceWrapper{ - service: service, - fabricCore: &fabricCore{ - bus: bus, - channelName: serviceChannelName, - }, - } + bus bus.EventBus, service FabricService, serviceChannelName string) *fabricServiceWrapper { + + return &fabricServiceWrapper{ + service: service, + fabricCore: &fabricCore{ + bus: bus, + channelName: serviceChannelName, + }, + } } func (sw *fabricServiceWrapper) init() error { - sw.fabricCore.bus.GetChannelManager().CreateChannel(sw.fabricCore.channelName) - - initializationService, ok := sw.service.(FabricInitializableService) - if ok { - initializationErr := initializationService.Init(sw.fabricCore) - if initializationErr != nil { - return initializationErr - } - } - - mh, err := sw.fabricCore.bus.ListenRequestStream(sw.fabricCore.channelName) - if err != nil { - return err - } - - sw.requestMsgHandler = mh - mh.Handle( - func(message *model.Message) { - requestPtr, ok := message.Payload.(*model.Request) - if !ok { - request, ok := message.Payload.(model.Request) - if !ok { - log.Println("cannot cast service request payload to model.Request") - return - } - requestPtr = &request - } - - if message.DestinationId != nil { - requestPtr.Id = message.DestinationId - } - - sw.service.HandleServiceRequest(requestPtr, sw.fabricCore) - }, - func(e error) {}) - - return nil + sw.fabricCore.bus.GetChannelManager().CreateChannel(sw.fabricCore.channelName) + + initializationService, ok := sw.service.(FabricInitializableService) + if ok { + initializationErr := initializationService.Init(sw.fabricCore) + if initializationErr != nil { + return initializationErr + } + } + + mh, err := sw.fabricCore.bus.ListenRequestStream(sw.fabricCore.channelName) + if err != nil { + return err + } + + sw.requestMsgHandler = mh + mh.Handle( + func(message *model.Message) { + requestPtr, ok := message.Payload.(*model.Request) + if !ok { + request, ok := message.Payload.(model.Request) + if !ok { + log.Println("cannot cast service request payload to model.Request") + return + } + requestPtr = &request + } + + if message.DestinationId != nil { + requestPtr.Id = message.DestinationId + } + + sw.service.HandleServiceRequest(requestPtr, sw.fabricCore) + }, + func(e error) {}) + + return nil } func (sw *fabricServiceWrapper) unregister() { - if sw.requestMsgHandler != nil { - sw.requestMsgHandler.Close() - } + if sw.requestMsgHandler != nil { + sw.requestMsgHandler.Close() + } } diff --git a/transport.go b/transport.go index 668c583..2d3d233 100644 --- a/transport.go +++ b/transport.go @@ -1,4 +1,4 @@ -// Copyright 2019-2020 VMware, Inc. +// Copyright 2019-2021 VMware, Inc. // SPDX-License-Identifier: BSD-2-Clause package main @@ -45,28 +45,28 @@ func main() { return nil }, }, - { - Name: "cal", - Usage: "Call Calendar service for the time on appfabric.vmware.com", - Action: func(c *cli.Context) error { - runDemoCal() - return nil - }, - }, - { - Name: "vm-service", - Usage: "Call VmService to create and Power on a new VM on appfabric.vmware.com", - Action: func(c *cli.Context) error { - runDemoVmService(c) - return nil - }, - Flags: []cli.Flag{ - &cli.BoolFlag{ - Name: "localhost", - Usage: "Connect to localhost:8090 instead of appfabric.vmware.com", - }, - }, - }, + //{ + // Name: "cal", + // Usage: "Call Calendar service for the time on appfabric.vmware.com", + // Action: func(c *cli.Context) error { + // runDemoCal() + // return nil + // }, + //}, + //{ + // Name: "vm-service", + // Usage: "Call VmService to create and Power on a new VM on appfabric.vmware.com", + // Action: func(c *cli.Context) error { + // runDemoVmService(c) + // return nil + // }, + // Flags: []cli.Flag{ + // &cli.BoolFlag{ + // Name: "localhost", + // Usage: "Connect to localhost:8090 instead of appfabric.vmware.com", + // }, + // }, + //}, { Name: "service", Usage: "Run Service - Run local service", @@ -81,27 +81,27 @@ func main() { return nil }, }, - { - Name: "store", - Usage: "Open galactic store from appfabric.vmware.com", - Action: func(c *cli.Context) error { - runDemoStore(c) - return nil - }, - Flags: []cli.Flag{ - &cli.BoolFlag{ - Name: "localhost", - Usage: "Connect to localhost:8090 instead of appfabric.vmware.com", - }, - }, - }, + //{ + // Name: "store", + // Usage: "Open galactic store from appfabric.vmware.com", + // Action: func(c *cli.Context) error { + // runDemoStore(c) + // return nil + // }, + // Flags: []cli.Flag{ + // &cli.BoolFlag{ + // Name: "localhost", + // Usage: "Connect to localhost:8090 instead of appfabric.vmware.com", + // }, + // }, + //}, { Name: "fabric-services", Usage: "Starts a couple of demo fabric services locally", Flags: []cli.Flag{ &cli.BoolFlag{ Name: "localhost", - Usage: "Use localhost Bifrost broker", + Usage: "Use localhost transport broker", }, }, Action: func(c *cli.Context) error {