From 25425d98bf3ffe33732cbfbe33873cf6edea23e6 Mon Sep 17 00:00:00 2001 From: Marcus Cobden Date: Tue, 30 May 2017 13:19:50 +0100 Subject: [PATCH 1/4] Split expected & unexpected errors in GRPC metrics --- httpgrpc/README.md | 4 +- httpgrpc/httpgrpc.go | 62 ++------ httpgrpc/httpgrpc_test.go | 3 +- httpgrpc/{ => types}/httpgrpc.pb.go | 59 ++++---- httpgrpc/{ => types}/httpgrpc.proto | 2 +- httpgrpc/types/util.go | 47 +++++++ middleware/grpc_instrumentation.go | 37 ++++- server/fake_server.pb.go | 210 ++++++++++++++++++++++++++++ server/fake_server.proto | 15 ++ server/server.go | 3 +- server/server_test.go | 110 +++++++++++++++ 11 files changed, 463 insertions(+), 89 deletions(-) rename httpgrpc/{ => types}/httpgrpc.pb.go (70%) rename httpgrpc/{ => types}/httpgrpc.proto (95%) create mode 100644 httpgrpc/types/util.go create mode 100644 server/fake_server.pb.go create mode 100644 server/fake_server.proto create mode 100644 server/server_test.go diff --git a/httpgrpc/README.md b/httpgrpc/README.md index 3cb4a69f..1f3881d9 100644 --- a/httpgrpc/README.md +++ b/httpgrpc/README.md @@ -4,6 +4,6 @@ To rebuild generated protobuf code, run: - protoc -I ./ --go_out=plugins=grpc:./ ./httpgrpc.proto + protoc -I ./ --go_out=plugins=grpc:./ ./types/httpgrpc.proto -Follow the instructions here to get a working protoc: https://github.com/golang/protobuf \ No newline at end of file +Follow the instructions here to get a working protoc: https://github.com/golang/protobuf diff --git a/httpgrpc/httpgrpc.go b/httpgrpc/httpgrpc.go index 9c9abe7a..98742cd8 100644 --- a/httpgrpc/httpgrpc.go +++ b/httpgrpc/httpgrpc.go @@ -11,18 +11,14 @@ import ( "strings" "sync" - "github.com/golang/protobuf/ptypes" - "github.com/golang/protobuf/ptypes/any" "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" "github.com/mwitkow/go-grpc-middleware" "github.com/opentracing/opentracing-go" - "github.com/prometheus/common/log" "github.com/sercand/kuberesolver" "golang.org/x/net/context" - spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" - "google.golang.org/grpc/status" + "github.com/weaveworks/common/httpgrpc/types" "github.com/weaveworks/common/middleware" ) @@ -40,7 +36,7 @@ func NewServer(handler http.Handler) *Server { } // Handle implements HTTPServer. -func (s Server) Handle(ctx context.Context, r *HTTPRequest) (*HTTPResponse, error) { +func (s Server) Handle(ctx context.Context, r *types.HTTPRequest) (*types.HTTPResponse, error) { req, err := http.NewRequest(r.Method, r.Url, ioutil.NopCloser(bytes.NewReader(r.Body))) if err != nil { return nil, err @@ -50,13 +46,13 @@ func (s Server) Handle(ctx context.Context, r *HTTPRequest) (*HTTPResponse, erro req.RequestURI = r.Url recorder := httptest.NewRecorder() s.handler.ServeHTTP(recorder, req) - resp := &HTTPResponse{ + resp := &types.HTTPResponse{ Code: int32(recorder.Code), Headers: fromHeader(recorder.Header()), Body: recorder.Body.Bytes(), } if recorder.Code/100 == 5 { - return nil, errorFromHTTPResponse(resp) + return nil, types.ErrorFromHTTPResponse(resp) } return resp, err } @@ -67,7 +63,7 @@ type Client struct { service string namespace string port string - client HTTPClient + client types.HTTPClient conn *grpc.ClientConn } @@ -125,7 +121,7 @@ func NewClient(address string) (*Client, error) { } return &Client{ - client: NewHTTPClient(conn), + client: types.NewHTTPClient(conn), conn: conn, }, nil } @@ -137,7 +133,7 @@ func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusInternalServerError) return } - req := &HTTPRequest{ + req := &types.HTTPRequest{ Method: r.Method, Url: r.RequestURI, Body: body, @@ -148,7 +144,7 @@ func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) { if err != nil { // Some errors will actually contain a valid resp, just need to unpack it var ok bool - resp, ok = httpResponseFromError(err) + resp, ok = types.HTTPResponseFromError(err) if !ok { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -164,50 +160,16 @@ func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } -func errorFromHTTPResponse(resp *HTTPResponse) error { - a, err := ptypes.MarshalAny(resp) - if err != nil { - return err - } - - return status.ErrorProto(&spb.Status{ - Code: resp.Code, - Message: string(resp.Body), - Details: []*any.Any{a}, - }) -} - -func httpResponseFromError(err error) (*HTTPResponse, bool) { - s, ok := status.FromError(err) - if !ok { - fmt.Println("not status") - return nil, false - } - - status := s.Proto() - if len(status.Details) != 1 { - return nil, false - } - - var resp HTTPResponse - if err := ptypes.UnmarshalAny(status.Details[0], &resp); err != nil { - log.Errorf("Got error containing non-response: %v", err) - return nil, false - } - - return &resp, true -} - -func toHeader(hs []*Header, header http.Header) { +func toHeader(hs []*types.Header, header http.Header) { for _, h := range hs { header[h.Key] = h.Values } } -func fromHeader(hs http.Header) []*Header { - result := make([]*Header, 0, len(hs)) +func fromHeader(hs http.Header) []*types.Header { + result := make([]*types.Header, 0, len(hs)) for k, vs := range hs { - result = append(result, &Header{ + result = append(result, &types.Header{ Key: k, Values: vs, }) diff --git a/httpgrpc/httpgrpc_test.go b/httpgrpc/httpgrpc_test.go index 415180c6..9e3b298b 100644 --- a/httpgrpc/httpgrpc_test.go +++ b/httpgrpc/httpgrpc_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/weaveworks/common/httpgrpc/types" "github.com/weaveworks/common/user" "google.golang.org/grpc" ) @@ -33,7 +34,7 @@ func newTestServer(handler http.Handler) (*testServer, error) { URL: "direct://" + lis.Addr().String(), } - RegisterHTTPServer(server.grpcServer, server.Server) + types.RegisterHTTPServer(server.grpcServer, server.Server) go server.grpcServer.Serve(lis) return server, nil diff --git a/httpgrpc/httpgrpc.pb.go b/httpgrpc/types/httpgrpc.pb.go similarity index 70% rename from httpgrpc/httpgrpc.pb.go rename to httpgrpc/types/httpgrpc.pb.go index 02b2a478..5f583170 100644 --- a/httpgrpc/httpgrpc.pb.go +++ b/httpgrpc/types/httpgrpc.pb.go @@ -1,19 +1,18 @@ -// Code generated by protoc-gen-go. -// source: httpgrpc.proto -// DO NOT EDIT! +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: types/httpgrpc.proto /* -Package httpgrpc is a generated protocol buffer package. +Package types is a generated protocol buffer package. It is generated from these files: - httpgrpc.proto + types/httpgrpc.proto It has these top-level messages: HTTPRequest HTTPResponse Header */ -package httpgrpc +package types import proto "github.com/golang/protobuf/proto" import fmt "fmt" @@ -132,9 +131,9 @@ func (m *Header) GetValues() []string { } func init() { - proto.RegisterType((*HTTPRequest)(nil), "httpgrpc.HTTPRequest") - proto.RegisterType((*HTTPResponse)(nil), "httpgrpc.HTTPResponse") - proto.RegisterType((*Header)(nil), "httpgrpc.Header") + proto.RegisterType((*HTTPRequest)(nil), "types.HTTPRequest") + proto.RegisterType((*HTTPResponse)(nil), "types.HTTPResponse") + proto.RegisterType((*Header)(nil), "types.Header") } // Reference imports to suppress errors if they are not otherwise used. @@ -161,7 +160,7 @@ func NewHTTPClient(cc *grpc.ClientConn) HTTPClient { func (c *hTTPClient) Handle(ctx context.Context, in *HTTPRequest, opts ...grpc.CallOption) (*HTTPResponse, error) { out := new(HTTPResponse) - err := grpc.Invoke(ctx, "/httpgrpc.HTTP/Handle", in, out, c.cc, opts...) + err := grpc.Invoke(ctx, "/types.HTTP/Handle", in, out, c.cc, opts...) if err != nil { return nil, err } @@ -188,7 +187,7 @@ func _HTTP_Handle_Handler(srv interface{}, ctx context.Context, dec func(interfa } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/httpgrpc.HTTP/Handle", + FullMethod: "/types.HTTP/Handle", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(HTTPServer).Handle(ctx, req.(*HTTPRequest)) @@ -197,7 +196,7 @@ func _HTTP_Handle_Handler(srv interface{}, ctx context.Context, dec func(interfa } var _HTTP_serviceDesc = grpc.ServiceDesc{ - ServiceName: "httpgrpc.HTTP", + ServiceName: "types.HTTP", HandlerType: (*HTTPServer)(nil), Methods: []grpc.MethodDesc{ { @@ -206,26 +205,26 @@ var _HTTP_serviceDesc = grpc.ServiceDesc{ }, }, Streams: []grpc.StreamDesc{}, - Metadata: "httpgrpc.proto", + Metadata: "types/httpgrpc.proto", } -func init() { proto.RegisterFile("httpgrpc.proto", fileDescriptor0) } +func init() { proto.RegisterFile("types/httpgrpc.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 231 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x90, 0x31, 0x4f, 0xc3, 0x30, - 0x10, 0x85, 0x49, 0x1d, 0x0c, 0xbd, 0x56, 0xa8, 0x3a, 0x89, 0xca, 0x62, 0x8a, 0x32, 0x45, 0x0c, - 0x1d, 0xc2, 0xc4, 0x88, 0x58, 0x32, 0x22, 0xab, 0x7f, 0x20, 0xc1, 0x27, 0x22, 0x11, 0x6a, 0x63, - 0x3b, 0xa0, 0xfe, 0x7b, 0x64, 0x3b, 0x85, 0x88, 0xa9, 0xdb, 0x7b, 0xe7, 0x27, 0x7f, 0xf7, 0x0e, - 0x6e, 0x7a, 0xef, 0xcd, 0x9b, 0x35, 0xaf, 0x3b, 0x63, 0xb5, 0xd7, 0x78, 0x7d, 0xf2, 0xe5, 0x37, - 0xac, 0x9a, 0xfd, 0xfe, 0x45, 0xd2, 0xe7, 0x48, 0xce, 0xe3, 0x16, 0xf8, 0x07, 0xf9, 0x5e, 0x2b, - 0x91, 0x15, 0x59, 0xb5, 0x94, 0x93, 0xc3, 0x0d, 0xb0, 0xd1, 0x0e, 0x62, 0x11, 0x87, 0x41, 0xe2, - 0x3d, 0x5c, 0xf5, 0xd4, 0x2a, 0xb2, 0x4e, 0xb0, 0x82, 0x55, 0xab, 0x7a, 0xb3, 0xfb, 0x85, 0x34, - 0xf1, 0x41, 0x9e, 0x02, 0x88, 0x90, 0x77, 0x5a, 0x1d, 0x45, 0x5e, 0x64, 0xd5, 0x5a, 0x46, 0x5d, - 0x76, 0xb0, 0x4e, 0x60, 0x67, 0xf4, 0xc1, 0x51, 0xc8, 0x3c, 0x6b, 0x45, 0x91, 0x7b, 0x29, 0xa3, - 0x9e, 0x33, 0x16, 0xe7, 0x32, 0xd8, 0x8c, 0x51, 0x03, 0x4f, 0xb1, 0xb0, 0xff, 0x3b, 0x1d, 0xa7, - 0x52, 0x41, 0x86, 0xa6, 0x5f, 0xed, 0x30, 0x52, 0xfa, 0x7a, 0x29, 0x27, 0x57, 0x3f, 0x41, 0x1e, - 0xf6, 0xc2, 0x47, 0xe0, 0x4d, 0x7b, 0x50, 0x03, 0xe1, 0xed, 0x0c, 0xfa, 0x77, 0xaa, 0xbb, 0xed, - 0xff, 0x71, 0x2a, 0x52, 0x5e, 0x74, 0x3c, 0x1e, 0xf9, 0xe1, 0x27, 0x00, 0x00, 0xff, 0xff, 0x47, - 0x4e, 0x55, 0x95, 0x76, 0x01, 0x00, 0x00, + // 234 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x90, 0x31, 0x4b, 0xc4, 0x40, + 0x10, 0x85, 0xcd, 0x25, 0x17, 0xb9, 0xb9, 0x13, 0x64, 0x14, 0x59, 0xac, 0x42, 0x1a, 0x53, 0x45, + 0xc8, 0x95, 0x96, 0x36, 0x29, 0x65, 0xb9, 0x5e, 0x72, 0xee, 0x60, 0xc0, 0x78, 0xbb, 0xee, 0x6e, + 0x84, 0xfc, 0x7b, 0xd9, 0xd9, 0x1c, 0xc6, 0xce, 0xee, 0xcd, 0x9b, 0x61, 0xbe, 0x37, 0x03, 0xb7, + 0x7e, 0x32, 0xe4, 0x1e, 0x7b, 0xef, 0xcd, 0xbb, 0x35, 0x6f, 0xb5, 0xb1, 0xda, 0x6b, 0x5c, 0xb3, + 0x5b, 0x7a, 0xd8, 0xb6, 0x87, 0xc3, 0x8b, 0xa4, 0xaf, 0x91, 0x9c, 0xc7, 0x3b, 0xc8, 0x3f, 0xc9, + 0xf7, 0x5a, 0x89, 0xa4, 0x48, 0xaa, 0x8d, 0x9c, 0x2b, 0xbc, 0x86, 0x74, 0xb4, 0x83, 0x58, 0xb1, + 0x19, 0x24, 0x3e, 0xc0, 0x65, 0x4f, 0x9d, 0x22, 0xeb, 0x44, 0x5a, 0xa4, 0xd5, 0xb6, 0xb9, 0xaa, + 0x79, 0x63, 0xdd, 0xb2, 0x2b, 0xcf, 0x5d, 0x44, 0xc8, 0x8e, 0x5a, 0x4d, 0x22, 0x2b, 0x92, 0x6a, + 0x27, 0x59, 0x97, 0xaf, 0xb0, 0x8b, 0x54, 0x67, 0xf4, 0xc9, 0x51, 0x98, 0x79, 0xd6, 0x8a, 0x18, + 0xba, 0x96, 0xac, 0x97, 0x80, 0xd5, 0xbf, 0x00, 0xe9, 0x02, 0xd0, 0x40, 0x1e, 0xc7, 0x42, 0xf2, + 0x0f, 0x9a, 0xe6, 0x73, 0x82, 0x0c, 0x37, 0x7e, 0x77, 0xc3, 0x48, 0x71, 0xef, 0x46, 0xce, 0x55, + 0xf3, 0x04, 0x59, 0x08, 0x85, 0x7b, 0xc8, 0xdb, 0xee, 0xa4, 0x06, 0x42, 0x3c, 0x13, 0x7f, 0x3f, + 0x74, 0x7f, 0xf3, 0xc7, 0x8b, 0xf9, 0xcb, 0x8b, 0x63, 0xce, 0x5f, 0xdd, 0xff, 0x04, 0x00, 0x00, + 0xff, 0xff, 0xf5, 0x19, 0x53, 0x98, 0x6d, 0x01, 0x00, 0x00, } diff --git a/httpgrpc/httpgrpc.proto b/httpgrpc/types/httpgrpc.proto similarity index 95% rename from httpgrpc/httpgrpc.proto rename to httpgrpc/types/httpgrpc.proto index 45e91aab..e84a7ea2 100644 --- a/httpgrpc/httpgrpc.proto +++ b/httpgrpc/types/httpgrpc.proto @@ -1,6 +1,6 @@ syntax = "proto3"; -package httpgrpc; +package types; service HTTP { rpc Handle(HTTPRequest) returns (HTTPResponse) {}; diff --git a/httpgrpc/types/util.go b/httpgrpc/types/util.go new file mode 100644 index 00000000..e02143d6 --- /dev/null +++ b/httpgrpc/types/util.go @@ -0,0 +1,47 @@ +package types + +import ( + "fmt" + + "github.com/golang/protobuf/ptypes" + "github.com/golang/protobuf/ptypes/any" + "github.com/prometheus/common/log" + spb "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc/status" +) + +// ErrorFromHTTPResponse converts an HTTP response into a grpc error +func ErrorFromHTTPResponse(resp *HTTPResponse) error { + a, err := ptypes.MarshalAny(resp) + if err != nil { + return err + } + + return status.ErrorProto(&spb.Status{ + Code: resp.Code, + Message: string(resp.Body), + Details: []*any.Any{a}, + }) +} + +// HTTPResponseFromError converts a grpc error into an HTTP response +func HTTPResponseFromError(err error) (*HTTPResponse, bool) { + s, ok := status.FromError(err) + if !ok { + fmt.Println("not status") + return nil, false + } + + status := s.Proto() + if len(status.Details) != 1 { + return nil, false + } + + var resp HTTPResponse + if err := ptypes.UnmarshalAny(status.Details[0], &resp); err != nil { + log.Errorf("Got error containing non-response: %v", err) + return nil, false + } + + return &resp, true +} diff --git a/middleware/grpc_instrumentation.go b/middleware/grpc_instrumentation.go index 3d002be8..32d27571 100644 --- a/middleware/grpc_instrumentation.go +++ b/middleware/grpc_instrumentation.go @@ -1,23 +1,52 @@ package middleware import ( + "strconv" "time" "github.com/prometheus/client_golang/prometheus" "golang.org/x/net/context" + spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" + "google.golang.org/grpc/status" ) // ServerInstrumentInterceptor instruments gRPC requests for errors and latency. -func ServerInstrumentInterceptor(duration *prometheus.HistogramVec) grpc.UnaryServerInterceptor { +func ServerInstrumentInterceptor(hist *prometheus.HistogramVec) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { begin := time.Now() resp, err := handler(ctx, req) - status := "success" + duration := time.Since(begin).Seconds() + respStatus := "success" if err != nil { - status = "error" + errInfo, ok := status.FromError(err) + if ok { + respStatus = strconv.Itoa(int(errInfo.Code())) + } else { + respStatus = "error" + } + } + hist.WithLabelValues(gRPC, info.FullMethod, respStatus, "false").Observe(duration) + return resp, err + } +} + +// ErrorToStatus handler to convert error objects to http-response errors +type ErrorToStatus func(error) (code int32, message string, err error) + +// ServerErrorToStatusInterceptor converts error objects to http-response-like error objects +func ServerErrorToStatusInterceptor(converter ErrorToStatus) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + resp, err := handler(ctx, req) + if err != nil { + code, message, convertError := converter(err) + if convertError == nil { + err = status.ErrorProto(&spb.Status{ + Code: code, + Message: message, + }) + } } - duration.WithLabelValues(gRPC, info.FullMethod, status, "false").Observe(time.Since(begin).Seconds()) return resp, err } } diff --git a/server/fake_server.pb.go b/server/fake_server.pb.go new file mode 100644 index 00000000..ab3563a0 --- /dev/null +++ b/server/fake_server.pb.go @@ -0,0 +1,210 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: fake_server.proto + +/* +Package server is a generated protocol buffer package. + +It is generated from these files: + fake_server.proto + +It has these top-level messages: + FailWithHTTPErrorRequest +*/ +package server + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" +import google_protobuf "github.com/golang/protobuf/ptypes/empty" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type FailWithHTTPErrorRequest struct { + Code int32 `protobuf:"varint,1,opt,name=Code" json:"Code,omitempty"` +} + +func (m *FailWithHTTPErrorRequest) Reset() { *m = FailWithHTTPErrorRequest{} } +func (m *FailWithHTTPErrorRequest) String() string { return proto.CompactTextString(m) } +func (*FailWithHTTPErrorRequest) ProtoMessage() {} +func (*FailWithHTTPErrorRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *FailWithHTTPErrorRequest) GetCode() int32 { + if m != nil { + return m.Code + } + return 0 +} + +func init() { + proto.RegisterType((*FailWithHTTPErrorRequest)(nil), "server.FailWithHTTPErrorRequest") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// Client API for FakeServer service + +type FakeServerClient interface { + Succeed(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*google_protobuf.Empty, error) + FailWithError(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*google_protobuf.Empty, error) + FailWithHTTPError(ctx context.Context, in *FailWithHTTPErrorRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) +} + +type fakeServerClient struct { + cc *grpc.ClientConn +} + +func NewFakeServerClient(cc *grpc.ClientConn) FakeServerClient { + return &fakeServerClient{cc} +} + +func (c *fakeServerClient) Succeed(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { + out := new(google_protobuf.Empty) + err := grpc.Invoke(ctx, "/server.FakeServer/Succeed", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *fakeServerClient) FailWithError(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { + out := new(google_protobuf.Empty) + err := grpc.Invoke(ctx, "/server.FakeServer/FailWithError", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *fakeServerClient) FailWithHTTPError(ctx context.Context, in *FailWithHTTPErrorRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { + out := new(google_protobuf.Empty) + err := grpc.Invoke(ctx, "/server.FakeServer/FailWithHTTPError", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for FakeServer service + +type FakeServerServer interface { + Succeed(context.Context, *google_protobuf.Empty) (*google_protobuf.Empty, error) + FailWithError(context.Context, *google_protobuf.Empty) (*google_protobuf.Empty, error) + FailWithHTTPError(context.Context, *FailWithHTTPErrorRequest) (*google_protobuf.Empty, error) +} + +func RegisterFakeServerServer(s *grpc.Server, srv FakeServerServer) { + s.RegisterService(&_FakeServer_serviceDesc, srv) +} + +func _FakeServer_Succeed_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(google_protobuf.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(FakeServerServer).Succeed(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/server.FakeServer/Succeed", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(FakeServerServer).Succeed(ctx, req.(*google_protobuf.Empty)) + } + return interceptor(ctx, in, info, handler) +} + +func _FakeServer_FailWithError_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(google_protobuf.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(FakeServerServer).FailWithError(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/server.FakeServer/FailWithError", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(FakeServerServer).FailWithError(ctx, req.(*google_protobuf.Empty)) + } + return interceptor(ctx, in, info, handler) +} + +func _FakeServer_FailWithHTTPError_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(FailWithHTTPErrorRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(FakeServerServer).FailWithHTTPError(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/server.FakeServer/FailWithHTTPError", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(FakeServerServer).FailWithHTTPError(ctx, req.(*FailWithHTTPErrorRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _FakeServer_serviceDesc = grpc.ServiceDesc{ + ServiceName: "server.FakeServer", + HandlerType: (*FakeServerServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Succeed", + Handler: _FakeServer_Succeed_Handler, + }, + { + MethodName: "FailWithError", + Handler: _FakeServer_FailWithError_Handler, + }, + { + MethodName: "FailWithHTTPError", + Handler: _FakeServer_FailWithHTTPError_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "fake_server.proto", +} + +func init() { proto.RegisterFile("fake_server.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 185 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4c, 0x4b, 0xcc, 0x4e, + 0x8d, 0x2f, 0x4e, 0x2d, 0x2a, 0x4b, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x83, + 0xf0, 0xa4, 0xa4, 0xd3, 0xf3, 0xf3, 0xd3, 0x73, 0x52, 0xf5, 0xc1, 0xa2, 0x49, 0xa5, 0x69, 0xfa, + 0xa9, 0xb9, 0x05, 0x25, 0x95, 0x10, 0x45, 0x4a, 0x7a, 0x5c, 0x12, 0x6e, 0x89, 0x99, 0x39, 0xe1, + 0x99, 0x25, 0x19, 0x1e, 0x21, 0x21, 0x01, 0xae, 0x45, 0x45, 0xf9, 0x45, 0x41, 0xa9, 0x85, 0xa5, + 0xa9, 0xc5, 0x25, 0x42, 0x42, 0x5c, 0x2c, 0xce, 0xf9, 0x29, 0xa9, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, + 0xac, 0x41, 0x60, 0xb6, 0xd1, 0x5d, 0x46, 0x2e, 0x2e, 0xb7, 0xc4, 0xec, 0xd4, 0x60, 0xb0, 0xd9, + 0x42, 0xd6, 0x5c, 0xec, 0xc1, 0xa5, 0xc9, 0xc9, 0xa9, 0xa9, 0x29, 0x42, 0x62, 0x7a, 0x10, 0x7b, + 0xf4, 0x60, 0xf6, 0xe8, 0xb9, 0x82, 0xec, 0x91, 0xc2, 0x21, 0xae, 0xc4, 0x20, 0xe4, 0xc8, 0xc5, + 0x0b, 0xb3, 0x1b, 0x6c, 0x2f, 0x19, 0x46, 0xf8, 0x73, 0x09, 0x62, 0x38, 0x5f, 0x48, 0x41, 0x0f, + 0x1a, 0x0e, 0xb8, 0x7c, 0x86, 0xdb, 0xc0, 0x24, 0x36, 0xb0, 0x88, 0x31, 0x20, 0x00, 0x00, 0xff, + 0xff, 0xdc, 0x61, 0xa8, 0xf0, 0x50, 0x01, 0x00, 0x00, +} diff --git a/server/fake_server.proto b/server/fake_server.proto new file mode 100644 index 00000000..d8cfb13e --- /dev/null +++ b/server/fake_server.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package server; + +import "google/protobuf/empty.proto"; + +service FakeServer { + rpc Succeed(google.protobuf.Empty) returns (google.protobuf.Empty) {}; + rpc FailWithError(google.protobuf.Empty) returns (google.protobuf.Empty) {}; + rpc FailWithHTTPError(FailWithHTTPErrorRequest) returns (google.protobuf.Empty) {}; +} + +message FailWithHTTPErrorRequest { + int32 Code = 1; +} diff --git a/server/server.go b/server/server.go index 5deb7b95..eab36e9a 100644 --- a/server/server.go +++ b/server/server.go @@ -20,6 +20,7 @@ import ( "github.com/weaveworks-experiments/loki/pkg/client" "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/httpgrpc/types" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/signals" ) @@ -149,7 +150,7 @@ func (s *Server) Run() { // Setup gRPC server // for HTTP over gRPC, ensure we don't double-count the middleware - httpgrpc.RegisterHTTPServer(s.GRPC, httpgrpc.NewServer(s.HTTP)) + types.RegisterHTTPServer(s.GRPC, httpgrpc.NewServer(s.HTTP)) go s.GRPC.Serve(s.grpcListener) defer s.GRPC.GracefulStop() diff --git a/server/server_test.go b/server/server_test.go new file mode 100644 index 00000000..06b25b22 --- /dev/null +++ b/server/server_test.go @@ -0,0 +1,110 @@ +package server + +import ( + "errors" + "fmt" + "net/http" + "strconv" + "testing" + + "google.golang.org/grpc" + "google.golang.org/grpc/status" + + google_protobuf "github.com/golang/protobuf/ptypes/empty" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/middleware" + "golang.org/x/net/context" + spb "google.golang.org/genproto/googleapis/rpc/status" +) + +type FakeServer struct{} + +func (f FakeServer) FailWithError(ctx context.Context, req *google_protobuf.Empty) (*google_protobuf.Empty, error) { + return nil, errors.New("test error") +} + +func (f FakeServer) FailWithHTTPError(ctx context.Context, req *FailWithHTTPErrorRequest) (*google_protobuf.Empty, error) { + return nil, errors.New(strconv.Itoa(int(req.Code))) +} + +func (f FakeServer) Succeed(ctx context.Context, req *google_protobuf.Empty) (*google_protobuf.Empty, error) { + return &google_protobuf.Empty{}, nil +} + +// errorToStatus test converter to allow any type of http status to be raised +func errorToStatus(err error) (int32, string, error) { + msg := err.Error() + fmt.Println(msg) + code, codeErr := strconv.Atoi(msg) + if codeErr != nil { + return 0, "", codeErr + } + return int32(code), msg, nil +} + +func TestErrorMiddleware(t *testing.T) { + errorInterceptor := middleware.ServerErrorToStatusInterceptor(errorToStatus) + cfg := Config{ + GRPCListenPort: 1234, + GRPCMiddleware: []grpc.UnaryServerInterceptor{errorInterceptor}, + } + server, err := New(cfg) + require.NoError(t, err) + + fakeServer := FakeServer{} + RegisterFakeServerServer(server.GRPC, fakeServer) + + go server.Run() + defer server.Shutdown() + + conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure()) + defer conn.Close() + require.NoError(t, err) + + empty := google_protobuf.Empty{} + client := NewFakeServerClient(conn) + res, err := client.Succeed(context.Background(), &empty) + require.NoError(t, err) + require.EqualValues(t, &empty, res) + + res, err = client.FailWithError(context.Background(), &empty) + require.Nil(t, res) + require.Error(t, err) + s, ok := status.FromError(err) + require.True(t, ok) + require.Equal(t, "test error", s.Message()) + + res, err = client.FailWithHTTPError(context.Background(), &FailWithHTTPErrorRequest{Code: http.StatusPaymentRequired}) + require.Nil(t, res) + require.Error(t, err) + require.Equal(t, status.ErrorProto(&spb.Status{Code: 402, Message: "402"}), err) + + conn.Close() + + metrics, err := prometheus.DefaultGatherer.Gather() + require.NoError(t, err) + + statuses := map[string]string{} + for _, family := range metrics { + if *family.Name == "request_duration_seconds" { + for _, metric := range family.Metric { + var route, statusCode string + for _, label := range metric.GetLabel() { + switch label.GetName() { + case "status_code": + statusCode = label.GetValue() + case "route": + route = label.GetValue() + } + } + statuses[route] = statusCode + } + } + } + require.Equal(t, map[string]string{ + "/server.FakeServer/FailWithError": "error", + "/server.FakeServer/FailWithHTTPError": "402", + "/server.FakeServer/Succeed": "success", + }, statuses) +} From 78dea72b0ba49f07272e29661bbf26f57189c629 Mon Sep 17 00:00:00 2001 From: Marcus Cobden Date: Tue, 30 May 2017 13:37:31 +0100 Subject: [PATCH 2/4] Break out server instead of types One thing must be broken out to prevent circular imports, this seemed nicer. --- httpgrpc/README.md | 2 +- httpgrpc/httpgrpc.go | 187 +++--------------- httpgrpc/{types => }/httpgrpc.pb.go | 56 +++--- httpgrpc/{types => }/httpgrpc.proto | 2 +- httpgrpc/server/server.go | 178 +++++++++++++++++ .../server_test.go} | 6 +- httpgrpc/types/util.go | 47 ----- server/server.go | 4 +- 8 files changed, 241 insertions(+), 241 deletions(-) rename httpgrpc/{types => }/httpgrpc.pb.go (72%) rename httpgrpc/{types => }/httpgrpc.proto (95%) create mode 100644 httpgrpc/server/server.go rename httpgrpc/{httpgrpc_test.go => server/server_test.go} (94%) delete mode 100644 httpgrpc/types/util.go diff --git a/httpgrpc/README.md b/httpgrpc/README.md index 1f3881d9..cc5b2ac2 100644 --- a/httpgrpc/README.md +++ b/httpgrpc/README.md @@ -4,6 +4,6 @@ To rebuild generated protobuf code, run: - protoc -I ./ --go_out=plugins=grpc:./ ./types/httpgrpc.proto + protoc -I ./ --go_out=plugins=grpc:./ ./httpgrpc.proto Follow the instructions here to get a working protoc: https://github.com/golang/protobuf diff --git a/httpgrpc/httpgrpc.go b/httpgrpc/httpgrpc.go index 98742cd8..a506de95 100644 --- a/httpgrpc/httpgrpc.go +++ b/httpgrpc/httpgrpc.go @@ -1,178 +1,47 @@ package httpgrpc import ( - "bytes" "fmt" - "io/ioutil" - "net" - "net/http" - "net/http/httptest" - "net/url" - "strings" - "sync" - "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" - "github.com/mwitkow/go-grpc-middleware" - "github.com/opentracing/opentracing-go" - "github.com/sercand/kuberesolver" - "golang.org/x/net/context" - "google.golang.org/grpc" - - "github.com/weaveworks/common/httpgrpc/types" - "github.com/weaveworks/common/middleware" + "github.com/golang/protobuf/ptypes" + "github.com/golang/protobuf/ptypes/any" + "github.com/prometheus/common/log" + spb "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc/status" ) -// Server implements HTTPServer. HTTPServer is a generated interface that gRPC -// servers must implement. -type Server struct { - handler http.Handler -} - -// NewServer makes a new Server. -func NewServer(handler http.Handler) *Server { - return &Server{ - handler: handler, - } -} - -// Handle implements HTTPServer. -func (s Server) Handle(ctx context.Context, r *types.HTTPRequest) (*types.HTTPResponse, error) { - req, err := http.NewRequest(r.Method, r.Url, ioutil.NopCloser(bytes.NewReader(r.Body))) +// ErrorFromHTTPResponse converts an HTTP response into a grpc error +func ErrorFromHTTPResponse(resp *HTTPResponse) error { + a, err := ptypes.MarshalAny(resp) if err != nil { - return nil, err - } - req = req.WithContext(ctx) - toHeader(r.Headers, req.Header) - req.RequestURI = r.Url - recorder := httptest.NewRecorder() - s.handler.ServeHTTP(recorder, req) - resp := &types.HTTPResponse{ - Code: int32(recorder.Code), - Headers: fromHeader(recorder.Header()), - Body: recorder.Body.Bytes(), + return err } - if recorder.Code/100 == 5 { - return nil, types.ErrorFromHTTPResponse(resp) - } - return resp, err -} -// Client is a http.Handler that forwards the request over gRPC. -type Client struct { - mtx sync.RWMutex - service string - namespace string - port string - client types.HTTPClient - conn *grpc.ClientConn + return status.ErrorProto(&spb.Status{ + Code: resp.Code, + Message: string(resp.Body), + Details: []*any.Any{a}, + }) } -// ParseURL deals with direct:// style URLs, as well as kubernetes:// urls. -// For backwards compatibility it treats URLs without schems as kubernetes://. -func ParseURL(unparsed string) (string, []grpc.DialOption, error) { - parsed, err := url.Parse(unparsed) - if err != nil { - return "", nil, err +// HTTPResponseFromError converts a grpc error into an HTTP response +func HTTPResponseFromError(err error) (*HTTPResponse, bool) { + s, ok := status.FromError(err) + if !ok { + fmt.Printf("not status, %v\n", err) + return nil, false } - switch parsed.Scheme { - case "direct": - return parsed.Host, nil, err - - case "kubernetes", "": - host, port, err := net.SplitHostPort(parsed.Host) - if err != nil { - return "", nil, err - } - parts := strings.SplitN(host, ".", 2) - service, namespace := parts[0], "default" - if len(parts) == 2 { - namespace = parts[1] - } - balancer := kuberesolver.NewWithNamespace(namespace) - address := fmt.Sprintf("kubernetes://%s:%s", service, port) - dialOptions := []grpc.DialOption{balancer.DialOption()} - return address, dialOptions, nil - - default: - return "", nil, fmt.Errorf("unrecognised scheme: %s", parsed.Scheme) + status := s.Proto() + if len(status.Details) != 1 { + return nil, false } -} -// NewClient makes a new Client, given a kubernetes service address. -func NewClient(address string) (*Client, error) { - address, dialOptions, err := ParseURL(address) - if err != nil { - return nil, err - } - - dialOptions = append( - dialOptions, - grpc.WithInsecure(), - grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), - middleware.ClientUserHeaderInterceptor, - )), - ) - - conn, err := grpc.Dial(address, dialOptions...) - if err != nil { - return nil, err + var resp HTTPResponse + if err := ptypes.UnmarshalAny(status.Details[0], &resp); err != nil { + log.Errorf("Got error containing non-response: %v", err) + return nil, false } - return &Client{ - client: types.NewHTTPClient(conn), - conn: conn, - }, nil -} - -// ServeHTTP implements http.Handler -func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) { - body, err := ioutil.ReadAll(r.Body) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - req := &types.HTTPRequest{ - Method: r.Method, - Url: r.RequestURI, - Body: body, - Headers: fromHeader(r.Header), - } - - resp, err := c.client.Handle(r.Context(), req) - if err != nil { - // Some errors will actually contain a valid resp, just need to unpack it - var ok bool - resp, ok = types.HTTPResponseFromError(err) - - if !ok { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - } - - toHeader(resp.Headers, w.Header()) - w.WriteHeader(int(resp.Code)) - if _, err := w.Write(resp.Body); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } -} - -func toHeader(hs []*types.Header, header http.Header) { - for _, h := range hs { - header[h.Key] = h.Values - } -} - -func fromHeader(hs http.Header) []*types.Header { - result := make([]*types.Header, 0, len(hs)) - for k, vs := range hs { - result = append(result, &types.Header{ - Key: k, - Values: vs, - }) - } - return result + return &resp, true } diff --git a/httpgrpc/types/httpgrpc.pb.go b/httpgrpc/httpgrpc.pb.go similarity index 72% rename from httpgrpc/types/httpgrpc.pb.go rename to httpgrpc/httpgrpc.pb.go index 5f583170..8b498ced 100644 --- a/httpgrpc/types/httpgrpc.pb.go +++ b/httpgrpc/httpgrpc.pb.go @@ -1,18 +1,18 @@ // Code generated by protoc-gen-go. DO NOT EDIT. -// source: types/httpgrpc.proto +// source: httpgrpc.proto /* -Package types is a generated protocol buffer package. +Package httpgrpc is a generated protocol buffer package. It is generated from these files: - types/httpgrpc.proto + httpgrpc.proto It has these top-level messages: HTTPRequest HTTPResponse Header */ -package types +package httpgrpc import proto "github.com/golang/protobuf/proto" import fmt "fmt" @@ -131,9 +131,9 @@ func (m *Header) GetValues() []string { } func init() { - proto.RegisterType((*HTTPRequest)(nil), "types.HTTPRequest") - proto.RegisterType((*HTTPResponse)(nil), "types.HTTPResponse") - proto.RegisterType((*Header)(nil), "types.Header") + proto.RegisterType((*HTTPRequest)(nil), "httpgrpc.HTTPRequest") + proto.RegisterType((*HTTPResponse)(nil), "httpgrpc.HTTPResponse") + proto.RegisterType((*Header)(nil), "httpgrpc.Header") } // Reference imports to suppress errors if they are not otherwise used. @@ -160,7 +160,7 @@ func NewHTTPClient(cc *grpc.ClientConn) HTTPClient { func (c *hTTPClient) Handle(ctx context.Context, in *HTTPRequest, opts ...grpc.CallOption) (*HTTPResponse, error) { out := new(HTTPResponse) - err := grpc.Invoke(ctx, "/types.HTTP/Handle", in, out, c.cc, opts...) + err := grpc.Invoke(ctx, "/httpgrpc.HTTP/Handle", in, out, c.cc, opts...) if err != nil { return nil, err } @@ -187,7 +187,7 @@ func _HTTP_Handle_Handler(srv interface{}, ctx context.Context, dec func(interfa } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/types.HTTP/Handle", + FullMethod: "/httpgrpc.HTTP/Handle", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(HTTPServer).Handle(ctx, req.(*HTTPRequest)) @@ -196,7 +196,7 @@ func _HTTP_Handle_Handler(srv interface{}, ctx context.Context, dec func(interfa } var _HTTP_serviceDesc = grpc.ServiceDesc{ - ServiceName: "types.HTTP", + ServiceName: "httpgrpc.HTTP", HandlerType: (*HTTPServer)(nil), Methods: []grpc.MethodDesc{ { @@ -205,26 +205,26 @@ var _HTTP_serviceDesc = grpc.ServiceDesc{ }, }, Streams: []grpc.StreamDesc{}, - Metadata: "types/httpgrpc.proto", + Metadata: "httpgrpc.proto", } -func init() { proto.RegisterFile("types/httpgrpc.proto", fileDescriptor0) } +func init() { proto.RegisterFile("httpgrpc.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 234 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x90, 0x31, 0x4b, 0xc4, 0x40, - 0x10, 0x85, 0xcd, 0x25, 0x17, 0xb9, 0xb9, 0x13, 0x64, 0x14, 0x59, 0xac, 0x42, 0x1a, 0x53, 0x45, - 0xc8, 0x95, 0x96, 0x36, 0x29, 0x65, 0xb9, 0x5e, 0x72, 0xee, 0x60, 0xc0, 0x78, 0xbb, 0xee, 0x6e, - 0x84, 0xfc, 0x7b, 0xd9, 0xd9, 0x1c, 0xc6, 0xce, 0xee, 0xcd, 0x9b, 0x61, 0xbe, 0x37, 0x03, 0xb7, - 0x7e, 0x32, 0xe4, 0x1e, 0x7b, 0xef, 0xcd, 0xbb, 0x35, 0x6f, 0xb5, 0xb1, 0xda, 0x6b, 0x5c, 0xb3, - 0x5b, 0x7a, 0xd8, 0xb6, 0x87, 0xc3, 0x8b, 0xa4, 0xaf, 0x91, 0x9c, 0xc7, 0x3b, 0xc8, 0x3f, 0xc9, - 0xf7, 0x5a, 0x89, 0xa4, 0x48, 0xaa, 0x8d, 0x9c, 0x2b, 0xbc, 0x86, 0x74, 0xb4, 0x83, 0x58, 0xb1, - 0x19, 0x24, 0x3e, 0xc0, 0x65, 0x4f, 0x9d, 0x22, 0xeb, 0x44, 0x5a, 0xa4, 0xd5, 0xb6, 0xb9, 0xaa, - 0x79, 0x63, 0xdd, 0xb2, 0x2b, 0xcf, 0x5d, 0x44, 0xc8, 0x8e, 0x5a, 0x4d, 0x22, 0x2b, 0x92, 0x6a, - 0x27, 0x59, 0x97, 0xaf, 0xb0, 0x8b, 0x54, 0x67, 0xf4, 0xc9, 0x51, 0x98, 0x79, 0xd6, 0x8a, 0x18, - 0xba, 0x96, 0xac, 0x97, 0x80, 0xd5, 0xbf, 0x00, 0xe9, 0x02, 0xd0, 0x40, 0x1e, 0xc7, 0x42, 0xf2, - 0x0f, 0x9a, 0xe6, 0x73, 0x82, 0x0c, 0x37, 0x7e, 0x77, 0xc3, 0x48, 0x71, 0xef, 0x46, 0xce, 0x55, - 0xf3, 0x04, 0x59, 0x08, 0x85, 0x7b, 0xc8, 0xdb, 0xee, 0xa4, 0x06, 0x42, 0x3c, 0x13, 0x7f, 0x3f, - 0x74, 0x7f, 0xf3, 0xc7, 0x8b, 0xf9, 0xcb, 0x8b, 0x63, 0xce, 0x5f, 0xdd, 0xff, 0x04, 0x00, 0x00, - 0xff, 0xff, 0xf5, 0x19, 0x53, 0x98, 0x6d, 0x01, 0x00, 0x00, + // 231 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x90, 0x31, 0x4f, 0xc3, 0x30, + 0x10, 0x85, 0x49, 0x1d, 0x0c, 0xbd, 0x56, 0xa8, 0x3a, 0x89, 0xca, 0x62, 0x8a, 0x32, 0x45, 0x0c, + 0x1d, 0xc2, 0xc4, 0x88, 0x58, 0x32, 0x22, 0xab, 0x7f, 0x20, 0xc1, 0x27, 0x22, 0x11, 0x6a, 0x63, + 0x3b, 0xa0, 0xfe, 0x7b, 0x64, 0x3b, 0x85, 0x88, 0xa9, 0xdb, 0x7b, 0xe7, 0x27, 0x7f, 0xf7, 0x0e, + 0x6e, 0x7a, 0xef, 0xcd, 0x9b, 0x35, 0xaf, 0x3b, 0x63, 0xb5, 0xd7, 0x78, 0x7d, 0xf2, 0xe5, 0x37, + 0xac, 0x9a, 0xfd, 0xfe, 0x45, 0xd2, 0xe7, 0x48, 0xce, 0xe3, 0x16, 0xf8, 0x07, 0xf9, 0x5e, 0x2b, + 0x91, 0x15, 0x59, 0xb5, 0x94, 0x93, 0xc3, 0x0d, 0xb0, 0xd1, 0x0e, 0x62, 0x11, 0x87, 0x41, 0xe2, + 0x3d, 0x5c, 0xf5, 0xd4, 0x2a, 0xb2, 0x4e, 0xb0, 0x82, 0x55, 0xab, 0x7a, 0xb3, 0xfb, 0x85, 0x34, + 0xf1, 0x41, 0x9e, 0x02, 0x88, 0x90, 0x77, 0x5a, 0x1d, 0x45, 0x5e, 0x64, 0xd5, 0x5a, 0x46, 0x5d, + 0x76, 0xb0, 0x4e, 0x60, 0x67, 0xf4, 0xc1, 0x51, 0xc8, 0x3c, 0x6b, 0x45, 0x91, 0x7b, 0x29, 0xa3, + 0x9e, 0x33, 0x16, 0xe7, 0x32, 0xd8, 0x8c, 0x51, 0x03, 0x4f, 0xb1, 0xb0, 0xff, 0x3b, 0x1d, 0xa7, + 0x52, 0x41, 0x86, 0xa6, 0x5f, 0xed, 0x30, 0x52, 0xfa, 0x7a, 0x29, 0x27, 0x57, 0x3f, 0x41, 0x1e, + 0xf6, 0xc2, 0x47, 0xe0, 0x4d, 0x7b, 0x50, 0x03, 0xe1, 0xed, 0x0c, 0xfa, 0x77, 0xaa, 0xbb, 0xed, + 0xff, 0x71, 0x2a, 0x52, 0x5e, 0x74, 0x3c, 0x1e, 0xf9, 0xe1, 0x27, 0x00, 0x00, 0xff, 0xff, 0x47, + 0x4e, 0x55, 0x95, 0x76, 0x01, 0x00, 0x00, } diff --git a/httpgrpc/types/httpgrpc.proto b/httpgrpc/httpgrpc.proto similarity index 95% rename from httpgrpc/types/httpgrpc.proto rename to httpgrpc/httpgrpc.proto index e84a7ea2..45e91aab 100644 --- a/httpgrpc/types/httpgrpc.proto +++ b/httpgrpc/httpgrpc.proto @@ -1,6 +1,6 @@ syntax = "proto3"; -package types; +package httpgrpc; service HTTP { rpc Handle(HTTPRequest) returns (HTTPResponse) {}; diff --git a/httpgrpc/server/server.go b/httpgrpc/server/server.go new file mode 100644 index 00000000..51ce183e --- /dev/null +++ b/httpgrpc/server/server.go @@ -0,0 +1,178 @@ +package server + +import ( + "bytes" + "fmt" + "io/ioutil" + "net" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "sync" + + "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" + "github.com/mwitkow/go-grpc-middleware" + "github.com/opentracing/opentracing-go" + "github.com/sercand/kuberesolver" + "golang.org/x/net/context" + "google.golang.org/grpc" + + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/middleware" +) + +// Server implements HTTPServer. HTTPServer is a generated interface that gRPC +// servers must implement. +type Server struct { + handler http.Handler +} + +// NewServer makes a new Server. +func NewServer(handler http.Handler) *Server { + return &Server{ + handler: handler, + } +} + +// Handle implements HTTPServer. +func (s Server) Handle(ctx context.Context, r *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { + req, err := http.NewRequest(r.Method, r.Url, ioutil.NopCloser(bytes.NewReader(r.Body))) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + toHeader(r.Headers, req.Header) + req.RequestURI = r.Url + recorder := httptest.NewRecorder() + s.handler.ServeHTTP(recorder, req) + resp := &httpgrpc.HTTPResponse{ + Code: int32(recorder.Code), + Headers: fromHeader(recorder.Header()), + Body: recorder.Body.Bytes(), + } + if recorder.Code/100 == 5 { + return nil, httpgrpc.ErrorFromHTTPResponse(resp) + } + return resp, err +} + +// Client is a http.Handler that forwards the request over gRPC. +type Client struct { + mtx sync.RWMutex + service string + namespace string + port string + client httpgrpc.HTTPClient + conn *grpc.ClientConn +} + +// ParseURL deals with direct:// style URLs, as well as kubernetes:// urls. +// For backwards compatibility it treats URLs without schems as kubernetes://. +func ParseURL(unparsed string) (string, []grpc.DialOption, error) { + parsed, err := url.Parse(unparsed) + if err != nil { + return "", nil, err + } + + switch parsed.Scheme { + case "direct": + return parsed.Host, nil, err + + case "kubernetes", "": + host, port, err := net.SplitHostPort(parsed.Host) + if err != nil { + return "", nil, err + } + parts := strings.SplitN(host, ".", 2) + service, namespace := parts[0], "default" + if len(parts) == 2 { + namespace = parts[1] + } + balancer := kuberesolver.NewWithNamespace(namespace) + address := fmt.Sprintf("kubernetes://%s:%s", service, port) + dialOptions := []grpc.DialOption{balancer.DialOption()} + return address, dialOptions, nil + + default: + return "", nil, fmt.Errorf("unrecognised scheme: %s", parsed.Scheme) + } +} + +// NewClient makes a new Client, given a kubernetes service address. +func NewClient(address string) (*Client, error) { + address, dialOptions, err := ParseURL(address) + if err != nil { + return nil, err + } + + dialOptions = append( + dialOptions, + grpc.WithInsecure(), + grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( + otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), + middleware.ClientUserHeaderInterceptor, + )), + ) + + conn, err := grpc.Dial(address, dialOptions...) + if err != nil { + return nil, err + } + + return &Client{ + client: httpgrpc.NewHTTPClient(conn), + conn: conn, + }, nil +} + +// ServeHTTP implements http.Handler +func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + req := &httpgrpc.HTTPRequest{ + Method: r.Method, + Url: r.RequestURI, + Body: body, + Headers: fromHeader(r.Header), + } + + resp, err := c.client.Handle(r.Context(), req) + if err != nil { + // Some errors will actually contain a valid resp, just need to unpack it + var ok bool + resp, ok = httpgrpc.HTTPResponseFromError(err) + + if !ok { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + + toHeader(resp.Headers, w.Header()) + w.WriteHeader(int(resp.Code)) + if _, err := w.Write(resp.Body); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func toHeader(hs []*httpgrpc.Header, header http.Header) { + for _, h := range hs { + header[h.Key] = h.Values + } +} + +func fromHeader(hs http.Header) []*httpgrpc.Header { + result := make([]*httpgrpc.Header, 0, len(hs)) + for k, vs := range hs { + result = append(result, &httpgrpc.Header{ + Key: k, + Values: vs, + }) + } + return result +} diff --git a/httpgrpc/httpgrpc_test.go b/httpgrpc/server/server_test.go similarity index 94% rename from httpgrpc/httpgrpc_test.go rename to httpgrpc/server/server_test.go index 9e3b298b..b497ac8a 100644 --- a/httpgrpc/httpgrpc_test.go +++ b/httpgrpc/server/server_test.go @@ -1,4 +1,4 @@ -package httpgrpc +package server import ( "bytes" @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/weaveworks/common/httpgrpc/types" + "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" "google.golang.org/grpc" ) @@ -34,7 +34,7 @@ func newTestServer(handler http.Handler) (*testServer, error) { URL: "direct://" + lis.Addr().String(), } - types.RegisterHTTPServer(server.grpcServer, server.Server) + httpgrpc.RegisterHTTPServer(server.grpcServer, server.Server) go server.grpcServer.Serve(lis) return server, nil diff --git a/httpgrpc/types/util.go b/httpgrpc/types/util.go deleted file mode 100644 index e02143d6..00000000 --- a/httpgrpc/types/util.go +++ /dev/null @@ -1,47 +0,0 @@ -package types - -import ( - "fmt" - - "github.com/golang/protobuf/ptypes" - "github.com/golang/protobuf/ptypes/any" - "github.com/prometheus/common/log" - spb "google.golang.org/genproto/googleapis/rpc/status" - "google.golang.org/grpc/status" -) - -// ErrorFromHTTPResponse converts an HTTP response into a grpc error -func ErrorFromHTTPResponse(resp *HTTPResponse) error { - a, err := ptypes.MarshalAny(resp) - if err != nil { - return err - } - - return status.ErrorProto(&spb.Status{ - Code: resp.Code, - Message: string(resp.Body), - Details: []*any.Any{a}, - }) -} - -// HTTPResponseFromError converts a grpc error into an HTTP response -func HTTPResponseFromError(err error) (*HTTPResponse, bool) { - s, ok := status.FromError(err) - if !ok { - fmt.Println("not status") - return nil, false - } - - status := s.Proto() - if len(status.Details) != 1 { - return nil, false - } - - var resp HTTPResponse - if err := ptypes.UnmarshalAny(status.Details[0], &resp); err != nil { - log.Errorf("Got error containing non-response: %v", err) - return nil, false - } - - return &resp, true -} diff --git a/server/server.go b/server/server.go index eab36e9a..6de17b45 100644 --- a/server/server.go +++ b/server/server.go @@ -20,7 +20,7 @@ import ( "github.com/weaveworks-experiments/loki/pkg/client" "github.com/weaveworks/common/httpgrpc" - "github.com/weaveworks/common/httpgrpc/types" + httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/signals" ) @@ -150,7 +150,7 @@ func (s *Server) Run() { // Setup gRPC server // for HTTP over gRPC, ensure we don't double-count the middleware - types.RegisterHTTPServer(s.GRPC, httpgrpc.NewServer(s.HTTP)) + httpgrpc.RegisterHTTPServer(s.GRPC, httpgrpc_server.NewServer(s.HTTP)) go s.GRPC.Serve(s.grpcListener) defer s.GRPC.GracefulStop() From f60d0ad630bff0e65e6d1095e8958e278f0cf9f8 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 31 May 2017 11:02:37 +0100 Subject: [PATCH 3/4] Add Errorf helper --- httpgrpc/httpgrpc.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/httpgrpc/httpgrpc.go b/httpgrpc/httpgrpc.go index 6aa7eb64..ced3560c 100644 --- a/httpgrpc/httpgrpc.go +++ b/httpgrpc/httpgrpc.go @@ -10,6 +10,16 @@ import ( "google.golang.org/grpc/status" ) +// Errorf returns a HTTP gRPC error than is correctly forwarded over +// gRPC, and can eventually be converted back to a HTTP response with +// HTTPResponseFromError. +func Errorf(code int, tmpl string, args ...interface{}) error { + return ErrorFromHTTPResponse(&HTTPResponse{ + Code: int32(code), + Body: []byte(fmt.Sprintf(tmpl, args...)), + }) +} + // ErrorFromHTTPResponse converts an HTTP response into a grpc error func ErrorFromHTTPResponse(resp *HTTPResponse) error { a, err := ptypes.MarshalAny(resp) From 049831ebb99db6ece497a6af08b3d006cfddef80 Mon Sep 17 00:00:00 2001 From: Marcus Cobden Date: Wed, 31 May 2017 11:41:33 +0100 Subject: [PATCH 4/4] Use httpgrpc.Errorf instead of middleware --- httpgrpc/httpgrpc.go | 1 - middleware/grpc_instrumentation.go | 28 +++----------------------- server/server_test.go | 32 +++++++++--------------------- 3 files changed, 12 insertions(+), 49 deletions(-) diff --git a/httpgrpc/httpgrpc.go b/httpgrpc/httpgrpc.go index ced3560c..41600c2d 100644 --- a/httpgrpc/httpgrpc.go +++ b/httpgrpc/httpgrpc.go @@ -38,7 +38,6 @@ func ErrorFromHTTPResponse(resp *HTTPResponse) error { func HTTPResponseFromError(err error) (*HTTPResponse, bool) { s, ok := status.FromError(err) if !ok { - fmt.Printf("not status, %v\n", err) return nil, false } diff --git a/middleware/grpc_instrumentation.go b/middleware/grpc_instrumentation.go index 32d27571..06144334 100644 --- a/middleware/grpc_instrumentation.go +++ b/middleware/grpc_instrumentation.go @@ -5,10 +5,9 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "github.com/weaveworks/common/httpgrpc" "golang.org/x/net/context" - spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" - "google.golang.org/grpc/status" ) // ServerInstrumentInterceptor instruments gRPC requests for errors and latency. @@ -19,9 +18,8 @@ func ServerInstrumentInterceptor(hist *prometheus.HistogramVec) grpc.UnaryServer duration := time.Since(begin).Seconds() respStatus := "success" if err != nil { - errInfo, ok := status.FromError(err) - if ok { - respStatus = strconv.Itoa(int(errInfo.Code())) + if errResp, ok := httpgrpc.HTTPResponseFromError(err); ok { + respStatus = strconv.Itoa(int(errResp.Code)) } else { respStatus = "error" } @@ -30,23 +28,3 @@ func ServerInstrumentInterceptor(hist *prometheus.HistogramVec) grpc.UnaryServer return resp, err } } - -// ErrorToStatus handler to convert error objects to http-response errors -type ErrorToStatus func(error) (code int32, message string, err error) - -// ServerErrorToStatusInterceptor converts error objects to http-response-like error objects -func ServerErrorToStatusInterceptor(converter ErrorToStatus) grpc.UnaryServerInterceptor { - return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - resp, err := handler(ctx, req) - if err != nil { - code, message, convertError := converter(err) - if convertError == nil { - err = status.ErrorProto(&spb.Status{ - Code: code, - Message: message, - }) - } - } - return resp, err - } -} diff --git a/server/server_test.go b/server/server_test.go index 06b25b22..c5428f33 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -2,7 +2,6 @@ package server import ( "errors" - "fmt" "net/http" "strconv" "testing" @@ -13,9 +12,8 @@ import ( google_protobuf "github.com/golang/protobuf/ptypes/empty" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" - "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/httpgrpc" "golang.org/x/net/context" - spb "google.golang.org/genproto/googleapis/rpc/status" ) type FakeServer struct{} @@ -25,30 +23,15 @@ func (f FakeServer) FailWithError(ctx context.Context, req *google_protobuf.Empt } func (f FakeServer) FailWithHTTPError(ctx context.Context, req *FailWithHTTPErrorRequest) (*google_protobuf.Empty, error) { - return nil, errors.New(strconv.Itoa(int(req.Code))) + return nil, httpgrpc.Errorf(int(req.Code), strconv.Itoa(int(req.Code))) } func (f FakeServer) Succeed(ctx context.Context, req *google_protobuf.Empty) (*google_protobuf.Empty, error) { return &google_protobuf.Empty{}, nil } -// errorToStatus test converter to allow any type of http status to be raised -func errorToStatus(err error) (int32, string, error) { - msg := err.Error() - fmt.Println(msg) - code, codeErr := strconv.Atoi(msg) - if codeErr != nil { - return 0, "", codeErr - } - return int32(code), msg, nil -} - -func TestErrorMiddleware(t *testing.T) { - errorInterceptor := middleware.ServerErrorToStatusInterceptor(errorToStatus) - cfg := Config{ - GRPCListenPort: 1234, - GRPCMiddleware: []grpc.UnaryServerInterceptor{errorInterceptor}, - } +func TestErrorInstrumentationMiddleware(t *testing.T) { + cfg := Config{GRPCListenPort: 1234} server, err := New(cfg) require.NoError(t, err) @@ -71,14 +54,17 @@ func TestErrorMiddleware(t *testing.T) { res, err = client.FailWithError(context.Background(), &empty) require.Nil(t, res) require.Error(t, err) + s, ok := status.FromError(err) require.True(t, ok) require.Equal(t, "test error", s.Message()) res, err = client.FailWithHTTPError(context.Background(), &FailWithHTTPErrorRequest{Code: http.StatusPaymentRequired}) require.Nil(t, res) - require.Error(t, err) - require.Equal(t, status.ErrorProto(&spb.Status{Code: 402, Message: "402"}), err) + errResp, ok := httpgrpc.HTTPResponseFromError(err) + require.True(t, ok) + require.Equal(t, int32(http.StatusPaymentRequired), errResp.Code) + require.Equal(t, "402", string(errResp.Body)) conn.Close()