Skip to content
This repository has been archived by the owner on Apr 12, 2024. It is now read-only.

Small improvements to Stock Ticker service #21

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions plank/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ BUILD_OS=darwin|linux|windows go run build.go

Once successfully built, `plank` binary will be ready under `build/`.

> NOTE: we acknowledge there's a lack of build script for Windows Powershell, We'll add it soon!

### Generate a self signed certificate
Plank can run in non-HTTPS mode but it's generally a good idea to always do development in a similar environment where you'll be serving your
audience in public internet (or even intranet). Plank repository comes with a handy utility script that can generate a pair of server certificate
Expand Down Expand Up @@ -80,12 +78,24 @@ SPA static assets /assets
Health endpoint /health
Prometheus endpoint /prometheus
...
time="2021-08-05T21:32:50-07:00" level=info msg="Starting HTTP server at localhost:30080 with TLS" fileName=server.go goroutine=28 package=server
time="2021-08-17T13:28:15-07:00" level=info msg="Service '*services.StockTickerService' initialized successfully" fileName=initialize.go goroutine=44 package=server
time="2021-08-17T13:28:15-07:00" level=info msg="Service channel 'stock-ticker-service' is now bridged to a REST endpoint /rest/stock-ticker/{symbol} (GET)\n" fileName=server.go goroutine=44 package=server
time="2021-08-17T13:28:15-07:00" level=info msg="Starting Fabric broker at localhost:30080/ws" fileName=server.go goroutine=1 package=server
time="2021-08-17T13:28:15-07:00" level=info msg="Starting HTTP server at localhost:30080 with TLS" fileName=server.go goroutine=3 package=server
```

Open your browser and navigate to https://localhost:30080, accept the self-signed certificate warning and you'll be greeted with a 404!
This is an expected behavior, as the demo app does not serve anything at root `/`, but we will consider changing the default 404 screen to
something that looks more informational or more appealing at least.
Now, open your browser and navigate to https://localhost:30080/rest/stock-ticker/VMW (or
type `curl -k https://localhost:30080/rest/stock-ticker/VMW` in Terminal if you prefer CLI),
and accept the self-signed certificate warning. You will be served a page that shows the latest stock price
for VMware, Inc. Try and swap out `VMW` with another symbol of your choice to further test it out!

> NOTE: The sample service is using a loosely gated third party API which imposes
> a substantial limit on how many calls you can make per minute and per day in return for making
> the service free to all.

> NOTE: If you navigate to the root at https://localhost:30080, you'll be greeted with a 404!
> This is an expected behavior, as the demo app does not serve anything at root `/`, but we will
> consider changing the default 404 screen to something that is informational or more appealing at least.

## All supported flags and usages

Expand Down
166 changes: 110 additions & 56 deletions plank/services/stock-ticker-service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ package services
import (
"context"
"encoding/json"
"fmt"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/vmware/transport-go/bus"
"github.com/vmware/transport-go/model"
"github.com/vmware/transport-go/plank/utils"
Expand All @@ -22,22 +25,23 @@ import (

const (
StockTickerServiceChannel = "stock-ticker-service"
StockTickerAPI = "https://www.alphavantage.co/query"
StockTickerAPI = "https://www.alphavantage.co/query"
)

// TickerSnapshotData and TickerMetadata ares the data structures for this demo service
type TickerSnapshotData struct {
MetaData *TickerMetadata `json:"Meta Data"`
MetaData *TickerMetadata `json:"Meta Data"`
TimeSeries map[string]map[string]interface{} `json:"Time Series (1min)"`
Note string `json:"Note"`
}

type TickerMetadata struct {
Information string `json:"1. Information"`
Symbol string `json:"2. Symbol"`
Information string `json:"1. Information"`
Symbol string `json:"2. Symbol"`
LastRefreshed string `json:"3. Last Refreshed"`
Interval string `json:"4. Interval"`
OutputSize string `json:"5. Output Size"`
TimeZone string `json:"6. Time Zone"`
Interval string `json:"4. Interval"`
OutputSize string `json:"5. Output Size"`
TimeZone string `json:"6. Time Zone"`
}

// StockTickerService is a more complex real life example where its job is to subscribe clients
Expand All @@ -47,9 +51,9 @@ type TickerMetadata struct {
// once the service receives the request, it will schedule a job to query the stock price API
// for the provided symbol, retrieve the data and pipe it back to the client every thirty seconds.
// upon the connected client leaving, the service will remove from its cache the timer.
type StockTickerService struct{
type StockTickerService struct {
tickerListenersMap map[string]*time.Ticker
lock sync.RWMutex
lock sync.RWMutex
}

// NewStockTickerService returns a new instance of StockTickerService
Expand All @@ -63,11 +67,31 @@ func NewStockTickerService() *StockTickerService {
// a third party API and return the results back to the user.
func (ps *StockTickerService) HandleServiceRequest(request *model.Request, core service.FabricServiceCore) {
switch request.Request {
case "receive_ticker_updates":
case "ticker_price_lookup":
input := request.Payload.(map[string]string)
response, err := queryStockTickerAPI(input["symbol"])
if err != nil {
core.SendErrorResponse(request, 400, err.Error())
return
}
// send the response back to the client
core.SendResponse(request, response)
break

case "ticker_price_update_stream":
// parse the request and extract user input from key "symbol"
input := request.Payload.(map[string]interface{})
symbol := input["symbol"].(string)

// get the price immediately for the first request
response, err := queryStockTickerAPI(symbol)
if err != nil {
core.SendErrorResponse(request, 400, err.Error())
return
}
// send the response back to the client
core.SendResponse(request, response)

// set a ticker that fires every 30 seconds and keep it in a map for later disposal
ps.lock.Lock()
ticker := time.NewTicker(30 * time.Second)
Expand All @@ -79,57 +103,18 @@ func (ps *StockTickerService) HandleServiceRequest(request *model.Request, core
for {
select {
case <-ticker.C:
// craft a new HTTP request for the stock price provider API
req, err := newTickerRequest(symbol)
if err != nil {
core.SendErrorResponse(request, 400, err.Error())
continue
}

// perform an HTTP call
rsp, err := ctxhttp.Do(context.Background(), http.DefaultClient, req)
if err != nil {
core.SendErrorResponse(request, rsp.StatusCode, err.Error())
continue
}

// parse the response from the HTTP call
defer rsp.Body.Close()
tickerData := &TickerSnapshotData{}
b, err := ioutil.ReadAll(rsp.Body)
if err != nil {
core.SendErrorResponse(request, 500, err.Error())
continue
}

if err = json.Unmarshal(b, tickerData); err != nil {
core.SendErrorResponse(request, 500, err.Error())
continue
}

if tickerData == nil || tickerData.TimeSeries == nil {
core.SendErrorResponse(request, 500, string(b))
continue
}

// extract the data we need.
latestClosePriceStr := tickerData.TimeSeries[tickerData.MetaData.LastRefreshed]["4. close"].(string)
latestClosePrice, err := strconv.ParseFloat(latestClosePriceStr, 32)
response, err = queryStockTickerAPI(symbol)
if err != nil {
core.SendErrorResponse(request, 500, err.Error())
continue
}

// log message to demonstrate that once the client disconnects
// the server disposes of the ticker to prevent memory leak.
utils.Log.Warnln("sending...")
utils.Log.Infoln("sending...")

// send the response back to the client
core.SendResponse(request, map[string]interface{}{
"symbol": symbol,
"lastRefreshed": tickerData.MetaData.LastRefreshed,
"closePrice": latestClosePrice,
})
core.SendResponse(request, response)
}
}
}()
Expand Down Expand Up @@ -173,10 +158,27 @@ func (ps *StockTickerService) OnServerShutdown() {
return
}

// GetRESTBridgeConfig returns nothing. this service is only available through
// STOMP over WebSocket.
// GetRESTBridgeConfig returns a config for a REST endpoint that performs the same action as the STOMP variant
// except that there will be only one response instead of every 30 seconds.
func (ps *StockTickerService) GetRESTBridgeConfig() []*service.RESTBridgeConfig {
return nil
return []*service.RESTBridgeConfig{
{
ServiceChannel: StockTickerServiceChannel,
Uri: "/rest/stock-ticker/{symbol}",
Method: http.MethodGet,
AllowHead: true,
AllowOptions: true,
FabricRequestBuilder: func(w http.ResponseWriter, r *http.Request) model.Request {
pathParams := mux.Vars(r)
return model.Request{
Id: &uuid.UUID{},
Payload: map[string]string{"symbol": pathParams["symbol"]},
jooskim marked this conversation as resolved.
Show resolved Hide resolved
Request: "ticker_price_lookup",
BrokerDestination: nil,
}
},
},
}
}

// newTickerRequest is a convenient function that takes symbol as an input and returns
Expand All @@ -194,4 +196,56 @@ func newTickerRequest(symbol string) (*http.Request, error) {
}
req.URL.RawQuery = uv.Encode()
return req, nil
}
}

// queryStockTickerAPI performs an HTTP request against the Stock Ticker API and returns the results
// as a generic map[string]interface{} structure. if there's any error during the request-response cycle
// a nil will be returned followed by an error object.
func queryStockTickerAPI(symbol string) (map[string]interface{}, error) {
// craft a new HTTP request for the stock price provider API
req, err := newTickerRequest(symbol)
if err != nil {
return nil, err
}

// perform an HTTP call
rsp, err := ctxhttp.Do(context.Background(), http.DefaultClient, req)
if err != nil {
return nil, err
}

// parse the response from the HTTP call
defer rsp.Body.Close()
tickerData := &TickerSnapshotData{}
b, err := ioutil.ReadAll(rsp.Body)
if err != nil {
return nil, err
}

if err = json.Unmarshal(b, tickerData); err != nil {
return nil, err
}

// Alpha Vantage which is the provider of this API limits API calls to 5 calls per minute and 500 a day, and when
// the quota has been reached it will return a message in the Note field.
if len(tickerData.Note) > 0 {
return nil, fmt.Errorf(tickerData.Note)
}

if tickerData == nil || tickerData.TimeSeries == nil {
return nil, err
}

// extract the data we need.
latestClosePriceStr := tickerData.TimeSeries[tickerData.MetaData.LastRefreshed]["4. close"].(string)
latestClosePrice, err := strconv.ParseFloat(latestClosePriceStr, 32)
if err != nil {
return nil, err
}

return map[string]interface{}{
"symbol": symbol,
"lastRefreshed": tickerData.MetaData.LastRefreshed,
"closePrice": latestClosePrice,
}, nil
}