Skip to content

Commit

Permalink
JSON RPC over HTTP (go-kit#576)
Browse files Browse the repository at this point in the history
* first pass at JSON RPC HTTP transport

* example implementation of JSON RPC over HTTP

* Add ID type for JSON RPC Request, with tests.

* Add basic server testing for JSON RPC.

Add basic server tests, following example from http transport. Switch Response.Error to pointer, to make absence clearer.

* Handle unregistered JSON RPC methods.

* Package tidy-up.

* Test ServerBefore / ServerAfter for JSON RPC.

* More JSON RPC tests.

* Remove JSON RPC from addsvc example, pending full JSON RPC example.

* Remove JSON RPC from addsvc example, pending full JSON RPC example.

* Remove context field from jsonrpc.Server.

* Add JSON content type to all JSON RPC responses.

* Add JSON content type to all JSON RPC responses.

* Remove client-side JSON RPC funcs for now.

* Document interceptingWriter

* Add JSON RPC doc.go.

* Add README for JSON RPC.

* Wire in JSON RPC addsvc.

* Add JSON RPC to Addsvc CLI.

* Set JSONRPC version in responses.

* Add JSON RPC client to addcli example.

* Wire in client middlewares for JSON RPC addsvc example.

* Fix rate limiter dependency.

* Add concat JSON RPC method.

* Improve JSON RPC server test coverage.

* Add error tests.

* Clarify ErrorCoder in comment.

* Make endpoint consistent in README.

* Gofmt handler example in README.

* Auto-increment client IDs. Allow for customisation.

* Add happy-path test for JSON RPC client.

* Provide default encoder/decoder in JSON RPC client.

* Fix comment line.

* RequestIDGenerator tidy-up.

Make auto-incrementing IDs goroutine safe.
Make RequestIDGenerator interface public.

* Fix client ID creation.

The client had been using the RequestID type in requests. Making this
serialize in a deterministic and predictable way was going to be fiddly, so
I decided to allow interface{} for IDs, client-side.

* Test client request ID more effectively.

* Cover client options in test.

* Improve error test coverage.

* Fix format spec in test output.

* Tweaks to satisfy the linter.
  • Loading branch information
rossmcf authored and peterbourgon committed Feb 19, 2018
1 parent cd62aee commit 6241e73
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 5 deletions.
3 changes: 3 additions & 0 deletions addsvc/cmd/addcli/addcli.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func main() {
httpAddr = fs.String("http-addr", "", "HTTP address of addsvc")
grpcAddr = fs.String("grpc-addr", "", "gRPC address of addsvc")
thriftAddr = fs.String("thrift-addr", "", "Thrift address of addsvc")
jsonRPCAddr = fs.String("jsonrpc-addr", "", "JSON RPC address of addsvc")
thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson")
thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered")
thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing")
Expand Down Expand Up @@ -102,6 +103,8 @@ func main() {
}
defer conn.Close()
svc = addtransport.NewGRPCClient(conn, tracer, log.NewNopLogger())
} else if *jsonRPCAddr != "" {
svc, err = addtransport.NewJSONRPCClient(*jsonRPCAddr, tracer, log.NewNopLogger())
} else if *thriftAddr != "" {
// It's necessary to do all of this construction in the func main,
// because (among other reasons) we need to control the lifecycle of the
Expand Down
25 changes: 20 additions & 5 deletions addsvc/cmd/addsvc/addsvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func main() {
httpAddr = fs.String("http-addr", ":8081", "HTTP listen address")
grpcAddr = fs.String("grpc-addr", ":8082", "gRPC listen address")
thriftAddr = fs.String("thrift-addr", ":8083", "Thrift listen address")
jsonRPCAddr = fs.String("jsonrpc-addr", ":8084", "JSON RPC listen address")
thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson")
thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered")
thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing")
Expand Down Expand Up @@ -135,11 +136,12 @@ func main() {
// the interfaces that the transports expect. Note that we're not binding
// them to ports or anything yet; we'll do that next.
var (
service = addservice.New(logger, ints, chars)
endpoints = addendpoint.New(service, logger, duration, tracer)
httpHandler = addtransport.NewHTTPHandler(endpoints, tracer, logger)
grpcServer = addtransport.NewGRPCServer(endpoints, tracer, logger)
thriftServer = addtransport.NewThriftServer(endpoints)
service = addservice.New(logger, ints, chars)
endpoints = addendpoint.New(service, logger, duration, tracer)
httpHandler = addtransport.NewHTTPHandler(endpoints, tracer, logger)
grpcServer = addtransport.NewGRPCServer(endpoints, tracer, logger)
thriftServer = addtransport.NewThriftServer(endpoints)
jsonrpcHandler = addtransport.NewJSONRPCHandler(endpoints, logger)
)

// Now we're to the part of the func main where we want to start actually
Expand Down Expand Up @@ -244,6 +246,19 @@ func main() {
thriftSocket.Close()
})
}
{
httpListener, err := net.Listen("tcp", *jsonRPCAddr)
if err != nil {
logger.Log("transport", "JSONRPC over HTTP", "during", "Listen", "err", err)
os.Exit(1)
}
g.Add(func() error {
logger.Log("transport", "JSONRPC over HTTP", "addr", *jsonRPCAddr)
return http.Serve(httpListener, jsonrpcHandler)
}, func(error) {
httpListener.Close()
})
}
{
// This function just sits and waits for ctrl-C.
cancelInterrupt := make(chan struct{})
Expand Down
207 changes: 207 additions & 0 deletions addsvc/pkg/addtransport/jsonrpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package addtransport

import (
"context"
"encoding/json"
"fmt"
"net/url"
"strings"
"time"

"golang.org/x/time/rate"

"github.com/go-kit/kit/circuitbreaker"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/examples/addsvc/pkg/addendpoint"
"github.com/go-kit/kit/examples/addsvc/pkg/addservice"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/ratelimit"
"github.com/go-kit/kit/tracing/opentracing"
"github.com/go-kit/kit/transport/http/jsonrpc"
stdopentracing "github.com/opentracing/opentracing-go"
"github.com/sony/gobreaker"
)

// NewJSONRPCHandler returns a JSON RPC Server/Handler that can be passed to http.Handle()
func NewJSONRPCHandler(endpoints addendpoint.Set, logger log.Logger) *jsonrpc.Server {
handler := jsonrpc.NewServer(
makeEndpointCodecMap(endpoints),
jsonrpc.ServerErrorLogger(logger),
)
return handler
}

// NewJSONRPCClient returns an addservice backed by a JSON RPC over HTTP server
// living at the remote instance. We expect instance to come from a service
// discovery system, so likely of the form "host:port". We bake-in certain
// middlewares, implementing the client library pattern.
func NewJSONRPCClient(instance string, tracer stdopentracing.Tracer, logger log.Logger) (addservice.Service, error) {
// Quickly sanitize the instance string.
if !strings.HasPrefix(instance, "http") {
instance = "http://" + instance
}
u, err := url.Parse(instance)
if err != nil {
return nil, err
}

// We construct a single ratelimiter middleware, to limit the total outgoing
// QPS from this client to all methods on the remote instance. We also
// construct per-endpoint circuitbreaker middlewares to demonstrate how
// that's done, although they could easily be combined into a single breaker
// for the entire remote instance, too.
limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))

var sumEndpoint endpoint.Endpoint
{
sumEndpoint = jsonrpc.NewClient(
u,
"sum",
jsonrpc.ClientRequestEncoder(encodeSumRequest),
jsonrpc.ClientResponseDecoder(decodeSumResponse),
).Endpoint()
sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint)
sumEndpoint = limiter(sumEndpoint)
sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "Sum",
Timeout: 30 * time.Second,
}))(sumEndpoint)
}

var concatEndpoint endpoint.Endpoint
{
concatEndpoint = jsonrpc.NewClient(
u,
"concat",
jsonrpc.ClientRequestEncoder(encodeConcatRequest),
jsonrpc.ClientResponseDecoder(decodeConcatResponse),
).Endpoint()
concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint)
concatEndpoint = limiter(concatEndpoint)
concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "Concat",
Timeout: 30 * time.Second,
}))(concatEndpoint)
}

// Returning the endpoint.Set as a service.Service relies on the
// endpoint.Set implementing the Service methods. That's just a simple bit
// of glue code.
return addendpoint.Set{
SumEndpoint: sumEndpoint,
ConcatEndpoint: concatEndpoint,
}, nil

}

// makeEndpointCodecMap returns a codec map configured for the addsvc.
func makeEndpointCodecMap(endpoints addendpoint.Set) jsonrpc.EndpointCodecMap {
return jsonrpc.EndpointCodecMap{
"sum": jsonrpc.EndpointCodec{
Endpoint: endpoints.SumEndpoint,
Decode: decodeSumRequest,
Encode: encodeSumResponse,
},
"concat": jsonrpc.EndpointCodec{
Endpoint: endpoints.ConcatEndpoint,
Decode: decodeConcatRequest,
Encode: encodeConcatResponse,
},
}
}

func decodeSumRequest(_ context.Context, msg json.RawMessage) (interface{}, error) {
var req addendpoint.SumRequest
err := json.Unmarshal(msg, &req)
if err != nil {
return nil, &jsonrpc.Error{
Code: -32000,
Message: fmt.Sprintf("couldn't unmarshal body to sum request: %s", err),
}
}
return req, nil
}

func encodeSumResponse(_ context.Context, obj interface{}) (json.RawMessage, error) {
res, ok := obj.(addendpoint.SumResponse)
if !ok {
return nil, &jsonrpc.Error{
Code: -32000,
Message: fmt.Sprintf("Asserting result to *SumResponse failed. Got %T, %+v", obj, obj),
}
}
b, err := json.Marshal(res)
if err != nil {
return nil, fmt.Errorf("couldn't marshal response: %s", err)
}
return b, nil
}

func decodeSumResponse(_ context.Context, msg json.RawMessage) (interface{}, error) {
var res addendpoint.SumResponse
err := json.Unmarshal(msg, &res)
if err != nil {
return nil, fmt.Errorf("couldn't unmarshal body to SumResponse: %s", err)
}
return res, nil
}

func encodeSumRequest(_ context.Context, obj interface{}) (json.RawMessage, error) {
req, ok := obj.(addendpoint.SumRequest)
if !ok {
return nil, fmt.Errorf("couldn't assert request as SumRequest, got %T", obj)
}
b, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("couldn't marshal request: %s", err)
}
return b, nil
}

func decodeConcatRequest(_ context.Context, msg json.RawMessage) (interface{}, error) {
var req addendpoint.ConcatRequest
err := json.Unmarshal(msg, &req)
if err != nil {
return nil, &jsonrpc.Error{
Code: -32000,
Message: fmt.Sprintf("couldn't unmarshal body to concat request: %s", err),
}
}
return req, nil
}

func encodeConcatResponse(_ context.Context, obj interface{}) (json.RawMessage, error) {
res, ok := obj.(addendpoint.ConcatResponse)
if !ok {
return nil, &jsonrpc.Error{
Code: -32000,
Message: fmt.Sprintf("Asserting result to *ConcatResponse failed. Got %T, %+v", obj, obj),
}
}
b, err := json.Marshal(res)
if err != nil {
return nil, fmt.Errorf("couldn't marshal response: %s", err)
}
return b, nil
}

func decodeConcatResponse(_ context.Context, msg json.RawMessage) (interface{}, error) {
var res addendpoint.ConcatResponse
err := json.Unmarshal(msg, &res)
if err != nil {
return nil, fmt.Errorf("couldn't unmarshal body to ConcatResponse: %s", err)
}
return res, nil
}

func encodeConcatRequest(_ context.Context, obj interface{}) (json.RawMessage, error) {
req, ok := obj.(addendpoint.ConcatRequest)
if !ok {
return nil, fmt.Errorf("couldn't assert request as ConcatRequest, got %T", obj)
}
b, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("couldn't marshal request: %s", err)
}
return b, nil
}

0 comments on commit 6241e73

Please sign in to comment.