diff --git a/httpgrpc/README.md b/httpgrpc/README.md index 3cb4a69f..cc5b2ac2 100644 --- a/httpgrpc/README.md +++ b/httpgrpc/README.md @@ -6,4 +6,4 @@ To rebuild generated protobuf code, run: protoc -I ./ --go_out=plugins=grpc:./ ./httpgrpc.proto -Follow the instructions here to get a working protoc: https://github.com/golang/protobuf \ No newline at end of file +Follow the instructions here to get a working protoc: https://github.com/golang/protobuf diff --git a/httpgrpc/httpgrpc.go b/httpgrpc/httpgrpc.go index ad14eebb..41600c2d 100644 --- a/httpgrpc/httpgrpc.go +++ b/httpgrpc/httpgrpc.go @@ -1,175 +1,27 @@ package httpgrpc import ( - "bytes" "fmt" - "io/ioutil" - "net" - "net/http" - "net/http/httptest" - "net/url" - "strings" - "sync" log "github.com/Sirupsen/logrus" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/any" - "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" - "github.com/mwitkow/go-grpc-middleware" - "github.com/opentracing/opentracing-go" - "github.com/sercand/kuberesolver" - "golang.org/x/net/context" spb "google.golang.org/genproto/googleapis/rpc/status" - "google.golang.org/grpc" "google.golang.org/grpc/status" - - "github.com/weaveworks/common/middleware" ) -// Server implements HTTPServer. HTTPServer is a generated interface that gRPC -// servers must implement. -type Server struct { - handler http.Handler -} - -// NewServer makes a new Server. -func NewServer(handler http.Handler) *Server { - return &Server{ - handler: handler, - } -} - -// Handle implements HTTPServer. -func (s Server) Handle(ctx context.Context, r *HTTPRequest) (*HTTPResponse, error) { - req, err := http.NewRequest(r.Method, r.Url, ioutil.NopCloser(bytes.NewReader(r.Body))) - if err != nil { - return nil, err - } - req = req.WithContext(ctx) - toHeader(r.Headers, req.Header) - req.RequestURI = r.Url - recorder := httptest.NewRecorder() - s.handler.ServeHTTP(recorder, req) - resp := &HTTPResponse{ - Code: int32(recorder.Code), - Headers: fromHeader(recorder.Header()), - Body: recorder.Body.Bytes(), - } - if recorder.Code/100 == 5 { - return nil, errorFromHTTPResponse(resp) - } - return resp, err -} - -// Client is a http.Handler that forwards the request over gRPC. -type Client struct { - mtx sync.RWMutex - service string - namespace string - port string - client HTTPClient - conn *grpc.ClientConn -} - -// ParseURL deals with direct:// style URLs, as well as kubernetes:// urls. -// For backwards compatibility it treats URLs without schems as kubernetes://. -func ParseURL(unparsed string) (string, []grpc.DialOption, error) { - parsed, err := url.Parse(unparsed) - if err != nil { - return "", nil, err - } - - scheme, host := parsed.Scheme, parsed.Host - if !strings.Contains(unparsed, "://") { - scheme, host = "kubernetes", unparsed - } - - switch scheme { - case "direct": - return host, nil, err - - case "kubernetes": - host, port, err := net.SplitHostPort(host) - if err != nil { - return "", nil, err - } - parts := strings.SplitN(host, ".", 2) - service, namespace := parts[0], "default" - if len(parts) == 2 { - namespace = parts[1] - } - balancer := kuberesolver.NewWithNamespace(namespace) - address := fmt.Sprintf("kubernetes://%s:%s", service, port) - dialOptions := []grpc.DialOption{balancer.DialOption()} - return address, dialOptions, nil - - default: - return "", nil, fmt.Errorf("unrecognised scheme: %s", parsed.Scheme) - } -} - -// NewClient makes a new Client, given a kubernetes service address. -func NewClient(address string) (*Client, error) { - address, dialOptions, err := ParseURL(address) - if err != nil { - return nil, err - } - - dialOptions = append( - dialOptions, - grpc.WithInsecure(), - grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), - middleware.ClientUserHeaderInterceptor, - )), - ) - - conn, err := grpc.Dial(address, dialOptions...) - if err != nil { - return nil, err - } - - return &Client{ - client: NewHTTPClient(conn), - conn: conn, - }, nil -} - -// ServeHTTP implements http.Handler -func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) { - body, err := ioutil.ReadAll(r.Body) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - req := &HTTPRequest{ - Method: r.Method, - Url: r.RequestURI, - Body: body, - Headers: fromHeader(r.Header), - } - - resp, err := c.client.Handle(r.Context(), req) - if err != nil { - // Some errors will actually contain a valid resp, just need to unpack it - var ok bool - resp, ok = httpResponseFromError(err) - - if !ok { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - } - - toHeader(resp.Headers, w.Header()) - w.WriteHeader(int(resp.Code)) - if _, err := w.Write(resp.Body); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } +// Errorf returns a HTTP gRPC error than is correctly forwarded over +// gRPC, and can eventually be converted back to a HTTP response with +// HTTPResponseFromError. +func Errorf(code int, tmpl string, args ...interface{}) error { + return ErrorFromHTTPResponse(&HTTPResponse{ + Code: int32(code), + Body: []byte(fmt.Sprintf(tmpl, args...)), + }) } -func errorFromHTTPResponse(resp *HTTPResponse) error { +// ErrorFromHTTPResponse converts an HTTP response into a grpc error +func ErrorFromHTTPResponse(resp *HTTPResponse) error { a, err := ptypes.MarshalAny(resp) if err != nil { return err @@ -182,10 +34,10 @@ func errorFromHTTPResponse(resp *HTTPResponse) error { }) } -func httpResponseFromError(err error) (*HTTPResponse, bool) { +// HTTPResponseFromError converts a grpc error into an HTTP response +func HTTPResponseFromError(err error) (*HTTPResponse, bool) { s, ok := status.FromError(err) if !ok { - fmt.Println("not status") return nil, false } @@ -202,20 +54,3 @@ func httpResponseFromError(err error) (*HTTPResponse, bool) { return &resp, true } - -func toHeader(hs []*Header, header http.Header) { - for _, h := range hs { - header[h.Key] = h.Values - } -} - -func fromHeader(hs http.Header) []*Header { - result := make([]*Header, 0, len(hs)) - for k, vs := range hs { - result = append(result, &Header{ - Key: k, - Values: vs, - }) - } - return result -} diff --git a/httpgrpc/httpgrpc.pb.go b/httpgrpc/httpgrpc.pb.go index 02b2a478..8b498ced 100644 --- a/httpgrpc/httpgrpc.pb.go +++ b/httpgrpc/httpgrpc.pb.go @@ -1,6 +1,5 @@ -// Code generated by protoc-gen-go. +// Code generated by protoc-gen-go. DO NOT EDIT. // source: httpgrpc.proto -// DO NOT EDIT! /* Package httpgrpc is a generated protocol buffer package. @@ -213,7 +212,7 @@ func init() { proto.RegisterFile("httpgrpc.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ // 231 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x90, 0x31, 0x4f, 0xc3, 0x30, + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x90, 0x31, 0x4f, 0xc3, 0x30, 0x10, 0x85, 0x49, 0x1d, 0x0c, 0xbd, 0x56, 0xa8, 0x3a, 0x89, 0xca, 0x62, 0x8a, 0x32, 0x45, 0x0c, 0x1d, 0xc2, 0xc4, 0x88, 0x58, 0x32, 0x22, 0xab, 0x7f, 0x20, 0xc1, 0x27, 0x22, 0x11, 0x6a, 0x63, 0x3b, 0xa0, 0xfe, 0x7b, 0x64, 0x3b, 0x85, 0x88, 0xa9, 0xdb, 0x7b, 0xe7, 0x27, 0x7f, 0xf7, 0x0e, diff --git a/httpgrpc/server/server.go b/httpgrpc/server/server.go new file mode 100644 index 00000000..0ccad0a6 --- /dev/null +++ b/httpgrpc/server/server.go @@ -0,0 +1,183 @@ +package server + +import ( + "bytes" + "fmt" + "io/ioutil" + "net" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "sync" + + "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" + "github.com/mwitkow/go-grpc-middleware" + "github.com/opentracing/opentracing-go" + "github.com/sercand/kuberesolver" + "golang.org/x/net/context" + "google.golang.org/grpc" + + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/middleware" +) + +// Server implements HTTPServer. HTTPServer is a generated interface that gRPC +// servers must implement. +type Server struct { + handler http.Handler +} + +// NewServer makes a new Server. +func NewServer(handler http.Handler) *Server { + return &Server{ + handler: handler, + } +} + +// Handle implements HTTPServer. +func (s Server) Handle(ctx context.Context, r *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { + req, err := http.NewRequest(r.Method, r.Url, ioutil.NopCloser(bytes.NewReader(r.Body))) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + toHeader(r.Headers, req.Header) + req.RequestURI = r.Url + recorder := httptest.NewRecorder() + s.handler.ServeHTTP(recorder, req) + resp := &httpgrpc.HTTPResponse{ + Code: int32(recorder.Code), + Headers: fromHeader(recorder.Header()), + Body: recorder.Body.Bytes(), + } + if recorder.Code/100 == 5 { + return nil, httpgrpc.ErrorFromHTTPResponse(resp) + } + return resp, err +} + +// Client is a http.Handler that forwards the request over gRPC. +type Client struct { + mtx sync.RWMutex + service string + namespace string + port string + client httpgrpc.HTTPClient + conn *grpc.ClientConn +} + +// ParseURL deals with direct:// style URLs, as well as kubernetes:// urls. +// For backwards compatibility it treats URLs without schems as kubernetes://. +func ParseURL(unparsed string) (string, []grpc.DialOption, error) { + parsed, err := url.Parse(unparsed) + if err != nil { + return "", nil, err + } + + scheme, host := parsed.Scheme, parsed.Host + if !strings.Contains(unparsed, "://") { + scheme, host = "kubernetes", unparsed + } + + switch scheme { + case "direct": + return host, nil, err + + case "kubernetes": + host, port, err := net.SplitHostPort(host) + if err != nil { + return "", nil, err + } + parts := strings.SplitN(host, ".", 2) + service, namespace := parts[0], "default" + if len(parts) == 2 { + namespace = parts[1] + } + balancer := kuberesolver.NewWithNamespace(namespace) + address := fmt.Sprintf("kubernetes://%s:%s", service, port) + dialOptions := []grpc.DialOption{balancer.DialOption()} + return address, dialOptions, nil + + default: + return "", nil, fmt.Errorf("unrecognised scheme: %s", parsed.Scheme) + } +} + +// NewClient makes a new Client, given a kubernetes service address. +func NewClient(address string) (*Client, error) { + address, dialOptions, err := ParseURL(address) + if err != nil { + return nil, err + } + + dialOptions = append( + dialOptions, + grpc.WithInsecure(), + grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( + otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), + middleware.ClientUserHeaderInterceptor, + )), + ) + + conn, err := grpc.Dial(address, dialOptions...) + if err != nil { + return nil, err + } + + return &Client{ + client: httpgrpc.NewHTTPClient(conn), + conn: conn, + }, nil +} + +// ServeHTTP implements http.Handler +func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + req := &httpgrpc.HTTPRequest{ + Method: r.Method, + Url: r.RequestURI, + Body: body, + Headers: fromHeader(r.Header), + } + + resp, err := c.client.Handle(r.Context(), req) + if err != nil { + // Some errors will actually contain a valid resp, just need to unpack it + var ok bool + resp, ok = httpgrpc.HTTPResponseFromError(err) + + if !ok { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + + toHeader(resp.Headers, w.Header()) + w.WriteHeader(int(resp.Code)) + if _, err := w.Write(resp.Body); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func toHeader(hs []*httpgrpc.Header, header http.Header) { + for _, h := range hs { + header[h.Key] = h.Values + } +} + +func fromHeader(hs http.Header) []*httpgrpc.Header { + result := make([]*httpgrpc.Header, 0, len(hs)) + for k, vs := range hs { + result = append(result, &httpgrpc.Header{ + Key: k, + Values: vs, + }) + } + return result +} diff --git a/httpgrpc/httpgrpc_test.go b/httpgrpc/server/server_test.go similarity index 95% rename from httpgrpc/httpgrpc_test.go rename to httpgrpc/server/server_test.go index 789b59fe..4cf1c867 100644 --- a/httpgrpc/httpgrpc_test.go +++ b/httpgrpc/server/server_test.go @@ -1,4 +1,4 @@ -package httpgrpc +package server import ( "bytes" @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" "google.golang.org/grpc" ) @@ -34,7 +35,7 @@ func newTestServer(handler http.Handler) (*testServer, error) { URL: "direct://" + lis.Addr().String(), } - RegisterHTTPServer(server.grpcServer, server.Server) + httpgrpc.RegisterHTTPServer(server.grpcServer, server.Server) go server.grpcServer.Serve(lis) return server, nil diff --git a/middleware/grpc_instrumentation.go b/middleware/grpc_instrumentation.go index 3d002be8..06144334 100644 --- a/middleware/grpc_instrumentation.go +++ b/middleware/grpc_instrumentation.go @@ -1,23 +1,30 @@ package middleware import ( + "strconv" "time" "github.com/prometheus/client_golang/prometheus" + "github.com/weaveworks/common/httpgrpc" "golang.org/x/net/context" "google.golang.org/grpc" ) // ServerInstrumentInterceptor instruments gRPC requests for errors and latency. -func ServerInstrumentInterceptor(duration *prometheus.HistogramVec) grpc.UnaryServerInterceptor { +func ServerInstrumentInterceptor(hist *prometheus.HistogramVec) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { begin := time.Now() resp, err := handler(ctx, req) - status := "success" + duration := time.Since(begin).Seconds() + respStatus := "success" if err != nil { - status = "error" + if errResp, ok := httpgrpc.HTTPResponseFromError(err); ok { + respStatus = strconv.Itoa(int(errResp.Code)) + } else { + respStatus = "error" + } } - duration.WithLabelValues(gRPC, info.FullMethod, status, "false").Observe(time.Since(begin).Seconds()) + hist.WithLabelValues(gRPC, info.FullMethod, respStatus, "false").Observe(duration) return resp, err } } diff --git a/server/fake_server.pb.go b/server/fake_server.pb.go new file mode 100644 index 00000000..ab3563a0 --- /dev/null +++ b/server/fake_server.pb.go @@ -0,0 +1,210 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: fake_server.proto + +/* +Package server is a generated protocol buffer package. + +It is generated from these files: + fake_server.proto + +It has these top-level messages: + FailWithHTTPErrorRequest +*/ +package server + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" +import google_protobuf "github.com/golang/protobuf/ptypes/empty" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type FailWithHTTPErrorRequest struct { + Code int32 `protobuf:"varint,1,opt,name=Code" json:"Code,omitempty"` +} + +func (m *FailWithHTTPErrorRequest) Reset() { *m = FailWithHTTPErrorRequest{} } +func (m *FailWithHTTPErrorRequest) String() string { return proto.CompactTextString(m) } +func (*FailWithHTTPErrorRequest) ProtoMessage() {} +func (*FailWithHTTPErrorRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *FailWithHTTPErrorRequest) GetCode() int32 { + if m != nil { + return m.Code + } + return 0 +} + +func init() { + proto.RegisterType((*FailWithHTTPErrorRequest)(nil), "server.FailWithHTTPErrorRequest") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// Client API for FakeServer service + +type FakeServerClient interface { + Succeed(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*google_protobuf.Empty, error) + FailWithError(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*google_protobuf.Empty, error) + FailWithHTTPError(ctx context.Context, in *FailWithHTTPErrorRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) +} + +type fakeServerClient struct { + cc *grpc.ClientConn +} + +func NewFakeServerClient(cc *grpc.ClientConn) FakeServerClient { + return &fakeServerClient{cc} +} + +func (c *fakeServerClient) Succeed(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { + out := new(google_protobuf.Empty) + err := grpc.Invoke(ctx, "/server.FakeServer/Succeed", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *fakeServerClient) FailWithError(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { + out := new(google_protobuf.Empty) + err := grpc.Invoke(ctx, "/server.FakeServer/FailWithError", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *fakeServerClient) FailWithHTTPError(ctx context.Context, in *FailWithHTTPErrorRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { + out := new(google_protobuf.Empty) + err := grpc.Invoke(ctx, "/server.FakeServer/FailWithHTTPError", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for FakeServer service + +type FakeServerServer interface { + Succeed(context.Context, *google_protobuf.Empty) (*google_protobuf.Empty, error) + FailWithError(context.Context, *google_protobuf.Empty) (*google_protobuf.Empty, error) + FailWithHTTPError(context.Context, *FailWithHTTPErrorRequest) (*google_protobuf.Empty, error) +} + +func RegisterFakeServerServer(s *grpc.Server, srv FakeServerServer) { + s.RegisterService(&_FakeServer_serviceDesc, srv) +} + +func _FakeServer_Succeed_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(google_protobuf.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(FakeServerServer).Succeed(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/server.FakeServer/Succeed", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(FakeServerServer).Succeed(ctx, req.(*google_protobuf.Empty)) + } + return interceptor(ctx, in, info, handler) +} + +func _FakeServer_FailWithError_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(google_protobuf.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(FakeServerServer).FailWithError(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/server.FakeServer/FailWithError", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(FakeServerServer).FailWithError(ctx, req.(*google_protobuf.Empty)) + } + return interceptor(ctx, in, info, handler) +} + +func _FakeServer_FailWithHTTPError_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(FailWithHTTPErrorRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(FakeServerServer).FailWithHTTPError(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/server.FakeServer/FailWithHTTPError", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(FakeServerServer).FailWithHTTPError(ctx, req.(*FailWithHTTPErrorRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _FakeServer_serviceDesc = grpc.ServiceDesc{ + ServiceName: "server.FakeServer", + HandlerType: (*FakeServerServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Succeed", + Handler: _FakeServer_Succeed_Handler, + }, + { + MethodName: "FailWithError", + Handler: _FakeServer_FailWithError_Handler, + }, + { + MethodName: "FailWithHTTPError", + Handler: _FakeServer_FailWithHTTPError_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "fake_server.proto", +} + +func init() { proto.RegisterFile("fake_server.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 185 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4c, 0x4b, 0xcc, 0x4e, + 0x8d, 0x2f, 0x4e, 0x2d, 0x2a, 0x4b, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x83, + 0xf0, 0xa4, 0xa4, 0xd3, 0xf3, 0xf3, 0xd3, 0x73, 0x52, 0xf5, 0xc1, 0xa2, 0x49, 0xa5, 0x69, 0xfa, + 0xa9, 0xb9, 0x05, 0x25, 0x95, 0x10, 0x45, 0x4a, 0x7a, 0x5c, 0x12, 0x6e, 0x89, 0x99, 0x39, 0xe1, + 0x99, 0x25, 0x19, 0x1e, 0x21, 0x21, 0x01, 0xae, 0x45, 0x45, 0xf9, 0x45, 0x41, 0xa9, 0x85, 0xa5, + 0xa9, 0xc5, 0x25, 0x42, 0x42, 0x5c, 0x2c, 0xce, 0xf9, 0x29, 0xa9, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, + 0xac, 0x41, 0x60, 0xb6, 0xd1, 0x5d, 0x46, 0x2e, 0x2e, 0xb7, 0xc4, 0xec, 0xd4, 0x60, 0xb0, 0xd9, + 0x42, 0xd6, 0x5c, 0xec, 0xc1, 0xa5, 0xc9, 0xc9, 0xa9, 0xa9, 0x29, 0x42, 0x62, 0x7a, 0x10, 0x7b, + 0xf4, 0x60, 0xf6, 0xe8, 0xb9, 0x82, 0xec, 0x91, 0xc2, 0x21, 0xae, 0xc4, 0x20, 0xe4, 0xc8, 0xc5, + 0x0b, 0xb3, 0x1b, 0x6c, 0x2f, 0x19, 0x46, 0xf8, 0x73, 0x09, 0x62, 0x38, 0x5f, 0x48, 0x41, 0x0f, + 0x1a, 0x0e, 0xb8, 0x7c, 0x86, 0xdb, 0xc0, 0x24, 0x36, 0xb0, 0x88, 0x31, 0x20, 0x00, 0x00, 0xff, + 0xff, 0xdc, 0x61, 0xa8, 0xf0, 0x50, 0x01, 0x00, 0x00, +} diff --git a/server/fake_server.proto b/server/fake_server.proto new file mode 100644 index 00000000..d8cfb13e --- /dev/null +++ b/server/fake_server.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package server; + +import "google/protobuf/empty.proto"; + +service FakeServer { + rpc Succeed(google.protobuf.Empty) returns (google.protobuf.Empty) {}; + rpc FailWithError(google.protobuf.Empty) returns (google.protobuf.Empty) {}; + rpc FailWithHTTPError(FailWithHTTPErrorRequest) returns (google.protobuf.Empty) {}; +} + +message FailWithHTTPErrorRequest { + int32 Code = 1; +} diff --git a/server/server.go b/server/server.go index 5deb7b95..6de17b45 100644 --- a/server/server.go +++ b/server/server.go @@ -20,6 +20,7 @@ import ( "github.com/weaveworks-experiments/loki/pkg/client" "github.com/weaveworks/common/httpgrpc" + httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/signals" ) @@ -149,7 +150,7 @@ func (s *Server) Run() { // Setup gRPC server // for HTTP over gRPC, ensure we don't double-count the middleware - httpgrpc.RegisterHTTPServer(s.GRPC, httpgrpc.NewServer(s.HTTP)) + httpgrpc.RegisterHTTPServer(s.GRPC, httpgrpc_server.NewServer(s.HTTP)) go s.GRPC.Serve(s.grpcListener) defer s.GRPC.GracefulStop() diff --git a/server/server_test.go b/server/server_test.go new file mode 100644 index 00000000..c5428f33 --- /dev/null +++ b/server/server_test.go @@ -0,0 +1,96 @@ +package server + +import ( + "errors" + "net/http" + "strconv" + "testing" + + "google.golang.org/grpc" + "google.golang.org/grpc/status" + + google_protobuf "github.com/golang/protobuf/ptypes/empty" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/httpgrpc" + "golang.org/x/net/context" +) + +type FakeServer struct{} + +func (f FakeServer) FailWithError(ctx context.Context, req *google_protobuf.Empty) (*google_protobuf.Empty, error) { + return nil, errors.New("test error") +} + +func (f FakeServer) FailWithHTTPError(ctx context.Context, req *FailWithHTTPErrorRequest) (*google_protobuf.Empty, error) { + return nil, httpgrpc.Errorf(int(req.Code), strconv.Itoa(int(req.Code))) +} + +func (f FakeServer) Succeed(ctx context.Context, req *google_protobuf.Empty) (*google_protobuf.Empty, error) { + return &google_protobuf.Empty{}, nil +} + +func TestErrorInstrumentationMiddleware(t *testing.T) { + cfg := Config{GRPCListenPort: 1234} + server, err := New(cfg) + require.NoError(t, err) + + fakeServer := FakeServer{} + RegisterFakeServerServer(server.GRPC, fakeServer) + + go server.Run() + defer server.Shutdown() + + conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure()) + defer conn.Close() + require.NoError(t, err) + + empty := google_protobuf.Empty{} + client := NewFakeServerClient(conn) + res, err := client.Succeed(context.Background(), &empty) + require.NoError(t, err) + require.EqualValues(t, &empty, res) + + res, err = client.FailWithError(context.Background(), &empty) + require.Nil(t, res) + require.Error(t, err) + + s, ok := status.FromError(err) + require.True(t, ok) + require.Equal(t, "test error", s.Message()) + + res, err = client.FailWithHTTPError(context.Background(), &FailWithHTTPErrorRequest{Code: http.StatusPaymentRequired}) + require.Nil(t, res) + errResp, ok := httpgrpc.HTTPResponseFromError(err) + require.True(t, ok) + require.Equal(t, int32(http.StatusPaymentRequired), errResp.Code) + require.Equal(t, "402", string(errResp.Body)) + + conn.Close() + + metrics, err := prometheus.DefaultGatherer.Gather() + require.NoError(t, err) + + statuses := map[string]string{} + for _, family := range metrics { + if *family.Name == "request_duration_seconds" { + for _, metric := range family.Metric { + var route, statusCode string + for _, label := range metric.GetLabel() { + switch label.GetName() { + case "status_code": + statusCode = label.GetValue() + case "route": + route = label.GetValue() + } + } + statuses[route] = statusCode + } + } + } + require.Equal(t, map[string]string{ + "/server.FakeServer/FailWithError": "error", + "/server.FakeServer/FailWithHTTPError": "402", + "/server.FakeServer/Succeed": "success", + }, statuses) +}