Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NATS transport #680

Merged
merged 1 commit into from
Apr 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions examples/addsvc/thrift/gen-go/addsvc/addsvc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

206 changes: 206 additions & 0 deletions examples/stringsvc4/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
package main

import (
"context"
"encoding/json"
"errors"
"log"
"strings"
"flag"
"net/http"

"github.com/go-kit/kit/endpoint"
natstransport "github.com/go-kit/kit/transport/nats"
httptransport "github.com/go-kit/kit/transport/http"

"github.com/nats-io/go-nats"
)

// StringService provides operations on strings.
type StringService interface {
Uppercase(context.Context, string) (string, error)
Count(context.Context, string) int
}

// stringService is a concrete implementation of StringService
type stringService struct{}

func (stringService) Uppercase(_ context.Context, s string) (string, error) {
if s == "" {
return "", ErrEmpty
}
return strings.ToUpper(s), nil
}

func (stringService) Count(_ context.Context, s string) int {
return len(s)
}

// ErrEmpty is returned when an input string is empty.
var ErrEmpty = errors.New("empty string")

// For each method, we define request and response structs
type uppercaseRequest struct {
S string `json:"s"`
}

type uppercaseResponse struct {
V string `json:"v"`
Err string `json:"err,omitempty"` // errors don't define JSON marshaling
}

type countRequest struct {
S string `json:"s"`
}

type countResponse struct {
V int `json:"v"`
}

// Endpoints are a primary abstraction in go-kit. An endpoint represents a single RPC (method in our service interface)
func makeUppercaseHTTPEndpoint(nc *nats.Conn) endpoint.Endpoint {
return natstransport.NewPublisher(
nc,
"stringsvc.uppercase",
natstransport.EncodeJSONRequest,
decodeUppercaseResponse,
).Endpoint()
}

func makeCountHTTPEndpoint(nc *nats.Conn) endpoint.Endpoint {
return natstransport.NewPublisher(
nc,
"stringsvc.count",
natstransport.EncodeJSONRequest,
decodeCountResponse,
).Endpoint()
}

func makeUppercaseEndpoint(svc StringService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(uppercaseRequest)
v, err := svc.Uppercase(ctx, req.S)
if err != nil {
return uppercaseResponse{v, err.Error()}, nil
}
return uppercaseResponse{v, ""}, nil
}
}

func makeCountEndpoint(svc StringService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(countRequest)
v := svc.Count(ctx, req.S)
return countResponse{v}, nil
}
}

// Transports expose the service to the network. In this fourth example we utilize JSON over NATS and HTTP.
func main() {
svc := stringService{}

natsURL := flag.String("nats-url", nats.DefaultURL, "URL for connection to NATS")
flag.Parse()

nc, err := nats.Connect(*natsURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()

uppercaseHTTPHandler := httptransport.NewServer(
makeUppercaseHTTPEndpoint(nc),
decodeUppercaseHTTPRequest,
httptransport.EncodeJSONResponse,
)

countHTTPHandler := httptransport.NewServer(
makeCountHTTPEndpoint(nc),
decodeCountHTTPRequest,
httptransport.EncodeJSONResponse,
)

uppercaseHandler := natstransport.NewSubscriber(
makeUppercaseEndpoint(svc),
decodeUppercaseRequest,
natstransport.EncodeJSONResponse,
)

countHandler := natstransport.NewSubscriber(
makeCountEndpoint(svc),
decodeCountRequest,
natstransport.EncodeJSONResponse,
)

uSub, err := nc.QueueSubscribe("stringsvc.uppercase", "stringsvc", uppercaseHandler.ServeMsg(nc))
if err != nil {
log.Fatal(err)
}
defer uSub.Unsubscribe()

cSub, err := nc.QueueSubscribe("stringsvc.count", "stringsvc", countHandler.ServeMsg(nc))
if err != nil {
log.Fatal(err)
}
defer cSub.Unsubscribe()

http.Handle("/uppercase", uppercaseHTTPHandler)
http.Handle("/count", countHTTPHandler)
log.Fatal(http.ListenAndServe(":8080", nil))

}

func decodeUppercaseHTTPRequest(_ context.Context, r *http.Request) (interface{}, error) {
var request uppercaseRequest
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
return nil, err
}
return request, nil
}

func decodeCountHTTPRequest(_ context.Context, r *http.Request) (interface{}, error) {
var request countRequest
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
return nil, err
}
return request, nil
}

func decodeUppercaseResponse(_ context.Context, msg *nats.Msg) (interface{}, error) {
var response uppercaseResponse

if err := json.Unmarshal(msg.Data, &response); err != nil {
return nil, err
}

return response, nil
}

func decodeCountResponse(_ context.Context, msg *nats.Msg) (interface{}, error) {
var response countResponse

if err := json.Unmarshal(msg.Data, &response); err != nil {
return nil, err
}

return response, nil
}

func decodeUppercaseRequest(_ context.Context, msg *nats.Msg) (interface{}, error) {
var request uppercaseRequest

if err := json.Unmarshal(msg.Data, &request); err != nil {
return nil, err
}
return request, nil
}

func decodeCountRequest(_ context.Context, msg *nats.Msg) (interface{}, error) {
var request countRequest

if err := json.Unmarshal(msg.Data, &request); err != nil {
return nil, err
}
return request, nil
}

2 changes: 2 additions & 0 deletions transport/nats/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package nats provides a NATS transport.
package nats
32 changes: 32 additions & 0 deletions transport/nats/encode_decode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package nats

import (
"context"

"github.com/nats-io/go-nats"
)

// DecodeRequestFunc extracts a user-domain request object from a publisher
// request object. It's designed to be used in NATS subscribers, for subscriber-side
// endpoints. One straightforward DecodeRequestFunc could be something that
// JSON decodes from the request body to the concrete response type.
type DecodeRequestFunc func(context.Context, *nats.Msg) (request interface{}, err error)

// EncodeRequestFunc encodes the passed request object into the NATS request
// object. It's designed to be used in NATS publishers, for publisher-side
// endpoints. One straightforward EncodeRequestFunc could something that JSON
// encodes the object directly to the request payload.
type EncodeRequestFunc func(context.Context, *nats.Msg, interface{}) error

// EncodeResponseFunc encodes the passed response object to the subscriber reply.
// It's designed to be used in NATS subscribers, for subscriber-side
// endpoints. One straightforward EncodeResponseFunc could be something that
// JSON encodes the object directly to the response body.
type EncodeResponseFunc func(context.Context, string, *nats.Conn, interface{}) error

// DecodeResponseFunc extracts a user-domain response object from an NATS
// response object. It's designed to be used in NATS publisher, for publisher-side
// endpoints. One straightforward DecodeResponseFunc could be something that
// JSON decodes from the response payload to the concrete response type.
type DecodeResponseFunc func(context.Context, *nats.Msg) (response interface{}, err error)

Loading