From 37278cf0299d32779b6b16afc91107e85f2a56cd Mon Sep 17 00:00:00 2001 From: Dave Shanley Date: Thu, 6 May 2021 21:12:53 -0400 Subject: [PATCH] Added a couple of new convenience methods to core, cleaned up some docs and formatting. (#10) * Added a couple of new convenience methods to core core fabric methods for errors added to allow payload and headers to be submitted. Also cleaned up some of the godoc in the service package. Signed-off-by: Dave Shanley * Disabled some of the demo functions Cleaner documentation and demo code is required to make these useful. demo needs to ne converted to connecting to local broker instead of cloud. Signed-off-by: Dave Shanley * Updated README to include better instructions. More examples and samples are required, however the galactic channels sample code has been updated to work against the local broker and not a non-public endpoint. Signed-off-by: Dave Shanley --- README.md | 173 ++++++++++++++-------------- service/fabric_core.go | 68 +++++++++-- service/fabric_core_test.go | 36 +++++- service/service_registry.go | 217 ++++++++++++++++++------------------ transport.go | 76 ++++++------- 5 files changed, 331 insertions(+), 239 deletions(-) diff --git a/README.md b/README.md index ab77f86..d81f5f2 100644 --- a/README.md +++ b/README.md @@ -132,96 +132,105 @@ Ping: Woo! Pong: Woo! ``` -## Example connecting to message broker and using galactic channels +## Example connecting to a message broker and using galactic channels -If you would like to connect the bus to a broker and start streaming stuff, it's quite simple. Here is an example -that connects to `appfabric.vmware.com` and starts streaming over a local channel that is mapped to the live -sample service that it broadcasting every few hundred milliseconds on `/topic/simple-stream` +If you would like to connect the bus to a broker and start streaming stuff, you can run the local demo broker +by first building using `./build-transport.sh` and then starting the local broker (and a bunch of demo services) via ` +./transport-go service` + +Once running, this example will connect to the broker and starts streaming over a local channel that is mapped to the live +sample service that is broadcasting every few hundred milliseconds on `/topic/simple-stream` ```go +package main + import ( - "bifrost/bridge" - "bifrost/bus" - "bifrost/model" - "encoding/json" - "fmt" - "log" + "encoding/json" + "fmt" + "github.com/vmware/transport-go/bridge" + "github.com/vmware/transport-go/bus" + "github.com/vmware/transport-go/model" + "log" ) +func main() { + usingGalacticChannels() +} + func usingGalacticChannels() { - // get a pointer to the bus. - b := bus.GetBus() - - // get a pointer to the channel manager - cm := b.GetChannelManager() - - channel := "my-stream" - cm.CreateChannel(channel) - - // create done signal - var done = make(chan bool) - - // listen to stream of messages coming in on channel. - h, err := b.ListenStream(channel) - - if err != nil { - log.Panicf("unable to listen to channel stream, error: %e", err) - } - - count := 0 - - // listen for five messages and then exit, send a completed signal on channel. - h.Handle( - func(msg *model.Message) { - - // unmarshal the payload into a Response object (used by fabric services) - r := &model.Response{} - d := msg.Payload.([]byte) - json.Unmarshal(d, &r) - fmt.Printf("Stream Ticked: %s\n", r.Payload.(string)) - count++ - if count >=5 { - done <- true - } - }, - func(err error) { - log.Panicf("error received on channel %e", err) - }) - - // create a broker connector config, in this case, we will connect to the application fabric demo endpoint. - config := &bridge.BrokerConnectorConfig{ - Username: "guest", - Password: "guest", - ServerAddr: "appfabric.vmware.com", - WSPath: "/fabric", - UseWS: true} - - // connect to broker. - c, err := b.ConnectBroker(config) - if err != nil { - log.Panicf("unable to connect to fabric, error: %e", err) - } - - // mark our local channel as galactic and map it to our connection and the /topic/simple-stream service - // running on appfabric.vmware.com - err = cm.MarkChannelAsGalactic(channel, "/topic/simple-stream", c) - if err != nil { - log.Panicf("unable to map local channel to broker destination: %e", err) - } - - // wait for done signal - <-done - - // mark channel as local (unsubscribe from all mappings) - err = cm.MarkChannelAsLocal(channel) - if err != nil { - log.Panicf("unable to unsubscribe, error: %e", err) - } - err = c.Disconnect() - if err != nil { - log.Panicf("unable to disconnect, error: %e", err) - } + // get a pointer to the bus. + b := bus.GetBus() + + // get a pointer to the channel manager + cm := b.GetChannelManager() + + channel := "my-stream" + cm.CreateChannel(channel) + + // create done signal + var done = make(chan bool) + + // listen to stream of messages coming in on channel. + h, err := b.ListenStream(channel) + + if err != nil { + log.Panicf("unable to listen to channel stream, error: %e", err) + } + + count := 0 + + // listen for ten messages and then exit, send a completed signal on channel. + h.Handle( + func(msg *model.Message) { + + // unmarshal the payload into a Response object (used by fabric services) + r := &model.Response{} + d := msg.Payload.([]byte) + json.Unmarshal(d, &r) + fmt.Printf("Stream Ticked: %s\n", r.Payload.(string)) + count++ + if count >=10 { + done <- true + } + }, + func(err error) { + log.Panicf("error received on channel %e", err) + }) + + // create a broker connector config, in this case, we will connect to the demo endpoint. + config := &bridge.BrokerConnectorConfig{ + Username: "guest", + Password: "guest", + ServerAddr: "localhost:8090", + WSPath: "/fabric", + UseWS: true} + + // connect to broker. + c, err := b.ConnectBroker(config) + if err != nil { + log.Panicf("unable to connect to fabric, error: %e", err) + } + + // mark our local channel as galactic and map it to our connection and the /topic/simple-stream service + // running on localhost:8090 + err = cm.MarkChannelAsGalactic(channel, "/topic/simple-stream", c) + if err != nil { + log.Panicf("unable to map local channel to broker destination: %e", err) + } + + // wait for done signal + <-done + + // mark channel as local (unsubscribe from all mappings) + err = cm.MarkChannelAsLocal(channel) + if err != nil { + log.Panicf("unable to unsubscribe, error: %e", err) + } + err = c.Disconnect() + if err != nil { + log.Panicf("unable to disconnect, error: %e", err) + } } ``` diff --git a/service/fabric_core.go b/service/fabric_core.go index 884ad40..8329b8f 100644 --- a/service/fabric_core.go +++ b/service/fabric_core.go @@ -1,4 +1,4 @@ -// Copyright 2019-2020 VMware, Inc. +// Copyright 2019-2021 VMware, Inc. // SPDX-License-Identifier: BSD-2-Clause package service @@ -10,31 +10,44 @@ import ( "github.com/vmware/transport-go/model" ) -// Interface providing base functionality to fabric services. +// FabricServiceCore is the interface providing base functionality to fabric services. type FabricServiceCore interface { - // Returns the EventBus instance. + // Bus Returns the EventBus instance. Bus() bus.EventBus - // Uses the "responsePayload" and "request" params to build and send model.Response object + + // SendResponse Uses the "responsePayload" and "request" params to build and send model.Response object // on the service channel. SendResponse(request *model.Request, responsePayload interface{}) - // Same as SendResponse, but include headers. Useful for HTTP REST interfaces - these headers will be + + // SendResponseWithHeaders is the same as SendResponse, but include headers. Useful for HTTP REST interfaces - these headers will be // set as HTTP response headers. Great for custom mime-types, binary stuff and more. SendResponseWithHeaders(request *model.Request, responsePayload interface{}, headers map[string]string) - // Builds an error model.Response object and sends it on the service channel as - // response to the "request" param. + + // SendErrorResponse builds an error model.Response object and sends it on the service channel as response to the "request" param. SendErrorResponse(request *model.Request, responseErrorCode int, responseErrorMessage string) + + // SendErrorResponseWithPayload is the same as SendErrorResponse, but adds a payload SendErrorResponseWithPayload(request *model.Request, responseErrorCode int, responseErrorMessage string, payload interface{}) - // Handles unknown/unsupported request. + + // SendErrorResponseWithHeaders is the same as SendErrorResponse, but adds headers as well. + SendErrorResponseWithHeaders(request *model.Request, responseErrorCode int, responseErrorMessage string, headers map[string]string) + + // SendErrorResponseWithHeadersAndPayload is the same as SendErrorResponseWithPayload, but adds headers as well. + SendErrorResponseWithHeadersAndPayload(request *model.Request, responseErrorCode int, responseErrorMessage string, payload interface{}, headers map[string]string) + + // HandleUnknownRequest handles unknown/unsupported/un-implemented requests, HandleUnknownRequest(request *model.Request) - // Make a new RestService call. + + // RestServiceRequest will make a new RestService call. RestServiceRequest(restRequest *RestServiceRequest, successHandler model.ResponseHandlerFunction, errorHandler model.ResponseHandlerFunction) - // Set global headers for a given fabric service (each service has its own set of global headers). + + // SetHeaders Set global headers for a given fabric service (each service has its own set of global headers). // The headers will be applied to all requests made by this instance's RestServiceRequest method. // Global header values can be overridden per request via the RestServiceRequest.Headers property. SetHeaders(headers map[string]string) - // Automatically ready to go map with json headers. + // GenerateJSONHeaders Automatically ready to go map with json headers. GenerateJSONHeaders() map[string]string } @@ -90,6 +103,39 @@ func (core *fabricCore) SendErrorResponseWithPayload( core.bus.SendResponseMessage(core.channelName, response, request.Id) } +func (core *fabricCore) SendErrorResponseWithHeaders( + request *model.Request, + responseErrorCode int, responseErrorMessage string, headers map[string]string) { + + response := &model.Response{ + Id: request.Id, + Destination: core.channelName, + Headers: headers, + Error: true, + ErrorCode: responseErrorCode, + ErrorMessage: responseErrorMessage, + BrokerDestination: request.BrokerDestination, + } + core.bus.SendResponseMessage(core.channelName, response, request.Id) +} + +func (core *fabricCore) SendErrorResponseWithHeadersAndPayload( + request *model.Request, + responseErrorCode int, responseErrorMessage string, payload interface{}, headers map[string]string) { + + response := &model.Response{ + Id: request.Id, + Destination: core.channelName, + Payload: payload, + Headers: headers, + Error: true, + ErrorCode: responseErrorCode, + ErrorMessage: responseErrorMessage, + BrokerDestination: request.BrokerDestination, + } + core.bus.SendResponseMessage(core.channelName, response, request.Id) +} + func (core *fabricCore) HandleUnknownRequest(request *model.Request) { errorMsg := fmt.Sprintf("unsupported request for \"%s\": %s", core.channelName, request.Request) core.SendErrorResponse(request, 403, errorMsg) diff --git a/service/fabric_core_test.go b/service/fabric_core_test.go index 214ec38..505e6c2 100644 --- a/service/fabric_core_test.go +++ b/service/fabric_core_test.go @@ -97,12 +97,46 @@ func TestFabricCore_SendMethods(t *testing.T) { assert.Equal(t, response.ErrorMessage, "test-error") wg.Add(1) - core.HandleUnknownRequest(&req) + + h = make(map[string]string) + h["chicken"] = "nugget" + core.SendErrorResponseWithHeaders(&req, 422, "test-header-error", h) wg.Wait() assert.Equal(t, count, 4) response = lastMessage.Payload.(*model.Response) + assert.Equal(t, response.Id, req.Id) + assert.Equal(t, response.Headers["chicken"], "nugget") + assert.Nil(t, response.Payload) + assert.True(t, response.Error) + assert.Equal(t, response.ErrorCode, 422) + assert.Equal(t, response.ErrorMessage, "test-header-error") + + wg.Add(1) + + h = make(map[string]string) + h["potato"] = "dog" + core.SendErrorResponseWithHeadersAndPayload(&req, 500, "test-header-payload-error", "oh my!", h) + wg.Wait() + + assert.Equal(t, count, 5) + response = lastMessage.Payload.(*model.Response) + + assert.Equal(t, response.Id, req.Id) + assert.Equal(t, "dog", response.Headers["potato"]) + assert.Equal(t, "oh my!", response.Payload.(string)) + assert.True(t, response.Error) + assert.Equal(t, response.ErrorCode, 500) + assert.Equal(t, response.ErrorMessage, "test-header-payload-error") + + wg.Add(1) + core.HandleUnknownRequest(&req) + wg.Wait() + + assert.Equal(t, count, 6) + response = lastMessage.Payload.(*model.Response) + assert.Equal(t, response.Id, req.Id) assert.True(t, response.Error) assert.Equal(t, 403, response.ErrorCode) diff --git a/service/service_registry.go b/service/service_registry.go index 2f4b0ac..e6e5834 100644 --- a/service/service_registry.go +++ b/service/service_registry.go @@ -1,153 +1,156 @@ -// Copyright 2019-2020 VMware, Inc. +// Copyright 2019-2021 VMware, Inc. // SPDX-License-Identifier: BSD-2-Clause package service import ( - "github.com/vmware/transport-go/bus" - "sync" - "fmt" - "github.com/vmware/transport-go/model" - "log" + "fmt" + "github.com/vmware/transport-go/bus" + "github.com/vmware/transport-go/model" + "log" + "sync" ) -// Registry with all local fabric services. +// ServiceRegistry is the registry for all local fabric services. type ServiceRegistry interface { - // Registers a new fabric service and associates it with a given EventBus channel. - // Only one fabric service can be associated with a given channel. - // If the fabric service implements the FabricInitializableService interface - // its Init method will be called during the registration process. - RegisterService(service FabricService, serviceChannelName string) error - // Unregisters the fabric service associated with the given channel. - UnregisterService(serviceChannelName string) error - // Set global base host or host:port to be used by the restService - SetGlobalRestServiceBaseHost(host string) + + // RegisterService registers a new fabric service and associates it with a given EventBus channel. + // Only one fabric service can be associated with a given channel. + // If the fabric service implements the FabricInitializableService interface + // its Init method will be called during the registration process. + RegisterService(service FabricService, serviceChannelName string) error + + // UnregisterService unregisters the fabric service associated with the given channel. + UnregisterService(serviceChannelName string) error + + // SetGlobalRestServiceBaseHost sets the global base host or host:port to be used by the restService + SetGlobalRestServiceBaseHost(host string) } type serviceRegistry struct { - lock sync.Mutex - services map[string]*fabricServiceWrapper - bus bus.EventBus + lock sync.Mutex + services map[string]*fabricServiceWrapper + bus bus.EventBus } var once sync.Once var registry ServiceRegistry func GetServiceRegistry() ServiceRegistry { - once.Do(func() { - registry = NewServiceRegistry(bus.GetBus()) - }) - return registry + once.Do(func() { + registry = NewServiceRegistry(bus.GetBus()) + }) + return registry } func NewServiceRegistry(bus bus.EventBus) ServiceRegistry { - registry := &serviceRegistry{ - bus: bus, - services: make(map[string]*fabricServiceWrapper), - } - // auto-register the restService - registry.RegisterService(&restService{}, restServiceChannel) - return registry + registry := &serviceRegistry{ + bus: bus, + services: make(map[string]*fabricServiceWrapper), + } + // auto-register the restService + registry.RegisterService(&restService{}, restServiceChannel) + return registry } func (r *serviceRegistry) SetGlobalRestServiceBaseHost(host string) { - r.services[restServiceChannel].service.(*restService).setBaseHost(host) + r.services[restServiceChannel].service.(*restService).setBaseHost(host) } func (r *serviceRegistry) RegisterService(service FabricService, serviceChannelName string) error { - r.lock.Lock() - defer r.lock.Unlock() + r.lock.Lock() + defer r.lock.Unlock() - if service == nil { - return fmt.Errorf("unable to register service: nil service") - } + if service == nil { + return fmt.Errorf("unable to register service: nil service") + } - if _, ok := r.services[serviceChannelName]; ok { - return fmt.Errorf("unable to register service: service channel name is already used: %s", serviceChannelName) - } + if _, ok := r.services[serviceChannelName]; ok { + return fmt.Errorf("unable to register service: service channel name is already used: %s", serviceChannelName) + } - sw := newServiceWrapper(r.bus, service, serviceChannelName) - err := sw.init() - if err != nil { - return err - } + sw := newServiceWrapper(r.bus, service, serviceChannelName) + err := sw.init() + if err != nil { + return err + } - r.services[serviceChannelName] = sw - return nil + r.services[serviceChannelName] = sw + return nil } func (r *serviceRegistry) UnregisterService(serviceChannelName string) error { - r.lock.Lock() - defer r.lock.Unlock() - sw, ok := r.services[serviceChannelName] - if !ok { - return fmt.Errorf("unable to unregister service: no service is registered for channel \"%s\"", serviceChannelName) - } - sw.unregister() - delete(r.services, serviceChannelName) - return nil + r.lock.Lock() + defer r.lock.Unlock() + sw, ok := r.services[serviceChannelName] + if !ok { + return fmt.Errorf("unable to unregister service: no service is registered for channel \"%s\"", serviceChannelName) + } + sw.unregister() + delete(r.services, serviceChannelName) + return nil } type fabricServiceWrapper struct { - service FabricService - fabricCore *fabricCore - requestMsgHandler bus.MessageHandler + service FabricService + fabricCore *fabricCore + requestMsgHandler bus.MessageHandler } func newServiceWrapper( - bus bus.EventBus, service FabricService, serviceChannelName string) *fabricServiceWrapper { - - return &fabricServiceWrapper{ - service: service, - fabricCore: &fabricCore{ - bus: bus, - channelName: serviceChannelName, - }, - } + bus bus.EventBus, service FabricService, serviceChannelName string) *fabricServiceWrapper { + + return &fabricServiceWrapper{ + service: service, + fabricCore: &fabricCore{ + bus: bus, + channelName: serviceChannelName, + }, + } } func (sw *fabricServiceWrapper) init() error { - sw.fabricCore.bus.GetChannelManager().CreateChannel(sw.fabricCore.channelName) - - initializationService, ok := sw.service.(FabricInitializableService) - if ok { - initializationErr := initializationService.Init(sw.fabricCore) - if initializationErr != nil { - return initializationErr - } - } - - mh, err := sw.fabricCore.bus.ListenRequestStream(sw.fabricCore.channelName) - if err != nil { - return err - } - - sw.requestMsgHandler = mh - mh.Handle( - func(message *model.Message) { - requestPtr, ok := message.Payload.(*model.Request) - if !ok { - request, ok := message.Payload.(model.Request) - if !ok { - log.Println("cannot cast service request payload to model.Request") - return - } - requestPtr = &request - } - - if message.DestinationId != nil { - requestPtr.Id = message.DestinationId - } - - sw.service.HandleServiceRequest(requestPtr, sw.fabricCore) - }, - func(e error) {}) - - return nil + sw.fabricCore.bus.GetChannelManager().CreateChannel(sw.fabricCore.channelName) + + initializationService, ok := sw.service.(FabricInitializableService) + if ok { + initializationErr := initializationService.Init(sw.fabricCore) + if initializationErr != nil { + return initializationErr + } + } + + mh, err := sw.fabricCore.bus.ListenRequestStream(sw.fabricCore.channelName) + if err != nil { + return err + } + + sw.requestMsgHandler = mh + mh.Handle( + func(message *model.Message) { + requestPtr, ok := message.Payload.(*model.Request) + if !ok { + request, ok := message.Payload.(model.Request) + if !ok { + log.Println("cannot cast service request payload to model.Request") + return + } + requestPtr = &request + } + + if message.DestinationId != nil { + requestPtr.Id = message.DestinationId + } + + sw.service.HandleServiceRequest(requestPtr, sw.fabricCore) + }, + func(e error) {}) + + return nil } func (sw *fabricServiceWrapper) unregister() { - if sw.requestMsgHandler != nil { - sw.requestMsgHandler.Close() - } + if sw.requestMsgHandler != nil { + sw.requestMsgHandler.Close() + } } diff --git a/transport.go b/transport.go index 668c583..2d3d233 100644 --- a/transport.go +++ b/transport.go @@ -1,4 +1,4 @@ -// Copyright 2019-2020 VMware, Inc. +// Copyright 2019-2021 VMware, Inc. // SPDX-License-Identifier: BSD-2-Clause package main @@ -45,28 +45,28 @@ func main() { return nil }, }, - { - Name: "cal", - Usage: "Call Calendar service for the time on appfabric.vmware.com", - Action: func(c *cli.Context) error { - runDemoCal() - return nil - }, - }, - { - Name: "vm-service", - Usage: "Call VmService to create and Power on a new VM on appfabric.vmware.com", - Action: func(c *cli.Context) error { - runDemoVmService(c) - return nil - }, - Flags: []cli.Flag{ - &cli.BoolFlag{ - Name: "localhost", - Usage: "Connect to localhost:8090 instead of appfabric.vmware.com", - }, - }, - }, + //{ + // Name: "cal", + // Usage: "Call Calendar service for the time on appfabric.vmware.com", + // Action: func(c *cli.Context) error { + // runDemoCal() + // return nil + // }, + //}, + //{ + // Name: "vm-service", + // Usage: "Call VmService to create and Power on a new VM on appfabric.vmware.com", + // Action: func(c *cli.Context) error { + // runDemoVmService(c) + // return nil + // }, + // Flags: []cli.Flag{ + // &cli.BoolFlag{ + // Name: "localhost", + // Usage: "Connect to localhost:8090 instead of appfabric.vmware.com", + // }, + // }, + //}, { Name: "service", Usage: "Run Service - Run local service", @@ -81,27 +81,27 @@ func main() { return nil }, }, - { - Name: "store", - Usage: "Open galactic store from appfabric.vmware.com", - Action: func(c *cli.Context) error { - runDemoStore(c) - return nil - }, - Flags: []cli.Flag{ - &cli.BoolFlag{ - Name: "localhost", - Usage: "Connect to localhost:8090 instead of appfabric.vmware.com", - }, - }, - }, + //{ + // Name: "store", + // Usage: "Open galactic store from appfabric.vmware.com", + // Action: func(c *cli.Context) error { + // runDemoStore(c) + // return nil + // }, + // Flags: []cli.Flag{ + // &cli.BoolFlag{ + // Name: "localhost", + // Usage: "Connect to localhost:8090 instead of appfabric.vmware.com", + // }, + // }, + //}, { Name: "fabric-services", Usage: "Starts a couple of demo fabric services locally", Flags: []cli.Flag{ &cli.BoolFlag{ Name: "localhost", - Usage: "Use localhost Bifrost broker", + Usage: "Use localhost transport broker", }, }, Action: func(c *cli.Context) error {