From d5f38b483286d22f8f247fdc7021de3664da072a Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 18 Feb 2020 15:27:21 +0000 Subject: [PATCH 1/7] Refactor: extract common error decoding --- middleware/grpc_instrumentation.go | 34 ++++++++++++------------------ 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/middleware/grpc_instrumentation.go b/middleware/grpc_instrumentation.go index c5c7d846..db295eec 100644 --- a/middleware/grpc_instrumentation.go +++ b/middleware/grpc_instrumentation.go @@ -10,21 +10,24 @@ import ( "google.golang.org/grpc" ) +func observe(hist *prometheus.HistogramVec, method string, err error, duration time.Duration) { + respStatus := "success" + if err != nil { + if errResp, ok := httpgrpc.HTTPResponseFromError(err); ok { + respStatus = strconv.Itoa(int(errResp.Code)) + } else { + respStatus = "error" + } + } + hist.WithLabelValues(gRPC, method, respStatus, "false").Observe(duration.Seconds()) +} + // UnaryServerInstrumentInterceptor instruments gRPC requests for errors and latency. func UnaryServerInstrumentInterceptor(hist *prometheus.HistogramVec) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { begin := time.Now() resp, err := handler(ctx, req) - duration := time.Since(begin).Seconds() - respStatus := "success" - if err != nil { - if errResp, ok := httpgrpc.HTTPResponseFromError(err); ok { - respStatus = strconv.Itoa(int(errResp.Code)) - } else { - respStatus = "error" - } - } - hist.WithLabelValues(gRPC, info.FullMethod, respStatus, "false").Observe(duration) + observe(hist, info.FullMethod, err, time.Since(begin)) return resp, err } } @@ -34,16 +37,7 @@ func StreamServerInstrumentInterceptor(hist *prometheus.HistogramVec) grpc.Strea return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { begin := time.Now() err := handler(srv, ss) - duration := time.Since(begin).Seconds() - respStatus := "success" - if err != nil { - if errResp, ok := httpgrpc.HTTPResponseFromError(err); ok { - respStatus = strconv.Itoa(int(errResp.Code)) - } else { - respStatus = "error" - } - } - hist.WithLabelValues(gRPC, info.FullMethod, respStatus, "false").Observe(duration) + observe(hist, info.FullMethod, err, time.Since(begin)) return err } } From 9174f0f5e87c61fcb7044de3ed7f5029276b4d75 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 18 Feb 2020 17:14:48 +0000 Subject: [PATCH 2/7] Add make rules for protos --- Makefile | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 317392ae..20b135d6 100644 --- a/Makefile +++ b/Makefile @@ -24,6 +24,8 @@ DOCKER_IMAGE_DIRS=$(patsubst %/Dockerfile,%,$(DOCKERFILES)) all: $(UPTODATE_FILES) +GENERATED_PROTOS=server/fake_server.pb.go httpgrpc/httpgrpc.pb.go + # All the boiler plate for building golang follows: SUDO := $(shell docker info >/dev/null 2>&1 || echo "sudo -E") BUILD_IN_CONTAINER := true @@ -40,7 +42,7 @@ NETGO_CHECK = @strings $@ | grep cgo_stub\\\.go >/dev/null || { \ ifeq ($(BUILD_IN_CONTAINER),true) -lint test shell: +lint test shell protos: @mkdir -p $(shell pwd)/.pkg $(SUDO) docker run $(RM) -ti \ -v $(shell pwd)/.pkg:/go/pkg \ @@ -50,6 +52,11 @@ lint test shell: else +protos: $(GENERATED_PROTOS) + +%.pb.go: %.proto + protoc --go_out=plugins=grpc:../../.. $< + lint: ./tools/lint -notestpackage -ignorespelling queriers -ignorespelling Queriers . From 98dea15b8b757f30c2c09fa62a2e69dc79efe47a Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 18 Feb 2020 17:22:18 +0000 Subject: [PATCH 3/7] Re-do protogen for fake_server with current gRPC --- server/fake_server.pb.go | 161 +++++++++++++++++++++++---------------- 1 file changed, 95 insertions(+), 66 deletions(-) diff --git a/server/fake_server.pb.go b/server/fake_server.pb.go index 66bb7f31..b4c231da 100644 --- a/server/fake_server.pb.go +++ b/server/fake_server.pb.go @@ -1,25 +1,17 @@ // Code generated by protoc-gen-go. DO NOT EDIT. -// source: fake_server.proto +// source: server/fake_server.proto -/* -Package server is a generated protocol buffer package. - -It is generated from these files: - fake_server.proto - -It has these top-level messages: - FailWithHTTPErrorRequest -*/ package server -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" -import google_protobuf "github.com/golang/protobuf/ptypes/empty" - import ( - context "golang.org/x/net/context" + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + empty "github.com/golang/protobuf/ptypes/empty" grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + math "math" ) // Reference imports to suppress errors if they are not otherwise used. @@ -31,16 +23,39 @@ var _ = math.Inf // is compatible with the proto package it is being compiled against. // A compilation error at this line likely means your copy of the // proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package type FailWithHTTPErrorRequest struct { - Code int32 `protobuf:"varint,1,opt,name=Code" json:"Code,omitempty"` + Code int32 `protobuf:"varint,1,opt,name=Code,proto3" json:"Code,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *FailWithHTTPErrorRequest) Reset() { *m = FailWithHTTPErrorRequest{} } +func (m *FailWithHTTPErrorRequest) String() string { return proto.CompactTextString(m) } +func (*FailWithHTTPErrorRequest) ProtoMessage() {} +func (*FailWithHTTPErrorRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_2267b95bf2c64cdc, []int{0} +} + +func (m *FailWithHTTPErrorRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_FailWithHTTPErrorRequest.Unmarshal(m, b) +} +func (m *FailWithHTTPErrorRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_FailWithHTTPErrorRequest.Marshal(b, m, deterministic) +} +func (m *FailWithHTTPErrorRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_FailWithHTTPErrorRequest.Merge(m, src) +} +func (m *FailWithHTTPErrorRequest) XXX_Size() int { + return xxx_messageInfo_FailWithHTTPErrorRequest.Size(m) +} +func (m *FailWithHTTPErrorRequest) XXX_DiscardUnknown() { + xxx_messageInfo_FailWithHTTPErrorRequest.DiscardUnknown(m) } -func (m *FailWithHTTPErrorRequest) Reset() { *m = FailWithHTTPErrorRequest{} } -func (m *FailWithHTTPErrorRequest) String() string { return proto.CompactTextString(m) } -func (*FailWithHTTPErrorRequest) ProtoMessage() {} -func (*FailWithHTTPErrorRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } +var xxx_messageInfo_FailWithHTTPErrorRequest proto.InternalMessageInfo func (m *FailWithHTTPErrorRequest) GetCode() int32 { if m != nil { @@ -53,6 +68,26 @@ func init() { proto.RegisterType((*FailWithHTTPErrorRequest)(nil), "server.FailWithHTTPErrorRequest") } +func init() { proto.RegisterFile("server/fake_server.proto", fileDescriptor_2267b95bf2c64cdc) } + +var fileDescriptor_2267b95bf2c64cdc = []byte{ + // 223 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x28, 0x4e, 0x2d, 0x2a, + 0x4b, 0x2d, 0xd2, 0x4f, 0x4b, 0xcc, 0x4e, 0x8d, 0x87, 0xb0, 0xf5, 0x0a, 0x8a, 0xf2, 0x4b, 0xf2, + 0x85, 0xd8, 0x20, 0x3c, 0x29, 0xe9, 0xf4, 0xfc, 0xfc, 0xf4, 0x9c, 0x54, 0x7d, 0xb0, 0x68, 0x52, + 0x69, 0x9a, 0x7e, 0x6a, 0x6e, 0x41, 0x49, 0x25, 0x44, 0x91, 0x92, 0x1e, 0x97, 0x84, 0x5b, 0x62, + 0x66, 0x4e, 0x78, 0x66, 0x49, 0x86, 0x47, 0x48, 0x48, 0x80, 0x6b, 0x51, 0x51, 0x7e, 0x51, 0x50, + 0x6a, 0x61, 0x69, 0x6a, 0x71, 0x89, 0x90, 0x10, 0x17, 0x8b, 0x73, 0x7e, 0x4a, 0xaa, 0x04, 0xa3, + 0x02, 0xa3, 0x06, 0x6b, 0x10, 0x98, 0x6d, 0x74, 0x97, 0x91, 0x8b, 0xcb, 0x2d, 0x31, 0x3b, 0x35, + 0x18, 0x6c, 0xb6, 0x90, 0x35, 0x17, 0x7b, 0x70, 0x69, 0x72, 0x72, 0x6a, 0x6a, 0x8a, 0x90, 0x98, + 0x1e, 0xc4, 0x1e, 0x3d, 0x98, 0x3d, 0x7a, 0xae, 0x20, 0x7b, 0xa4, 0x70, 0x88, 0x2b, 0x31, 0x08, + 0x39, 0x72, 0xf1, 0xc2, 0xec, 0x06, 0xdb, 0x4b, 0x86, 0x11, 0xfe, 0x5c, 0x82, 0x18, 0xce, 0x17, + 0x52, 0xd0, 0x83, 0x86, 0x03, 0x2e, 0x9f, 0xe1, 0x36, 0xd0, 0x49, 0x35, 0x4a, 0x39, 0x3d, 0xb3, + 0x24, 0xa3, 0x34, 0x49, 0x2f, 0x39, 0x3f, 0x57, 0xbf, 0x3c, 0x35, 0xb1, 0x2c, 0xb5, 0x3c, 0xbf, + 0x28, 0xbb, 0x58, 0x3f, 0x39, 0x3f, 0x37, 0x37, 0x3f, 0x4f, 0x1f, 0x62, 0x72, 0x12, 0x1b, 0x58, + 0xa3, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0x1c, 0xf3, 0xf1, 0x28, 0x7e, 0x01, 0x00, 0x00, +} + // Reference imports to suppress errors if they are not otherwise used. var _ context.Context var _ grpc.ClientConn @@ -61,12 +96,13 @@ var _ grpc.ClientConn // is compatible with the grpc package it is being compiled against. const _ = grpc.SupportPackageIsVersion4 -// Client API for FakeServer service - +// FakeServerClient is the client API for FakeServer service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type FakeServerClient interface { - Succeed(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*google_protobuf.Empty, error) - FailWithError(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*google_protobuf.Empty, error) - FailWithHTTPError(ctx context.Context, in *FailWithHTTPErrorRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) + Succeed(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) + FailWithError(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) + FailWithHTTPError(ctx context.Context, in *FailWithHTTPErrorRequest, opts ...grpc.CallOption) (*empty.Empty, error) } type fakeServerClient struct { @@ -77,39 +113,52 @@ func NewFakeServerClient(cc *grpc.ClientConn) FakeServerClient { return &fakeServerClient{cc} } -func (c *fakeServerClient) Succeed(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { - out := new(google_protobuf.Empty) - err := grpc.Invoke(ctx, "/server.FakeServer/Succeed", in, out, c.cc, opts...) +func (c *fakeServerClient) Succeed(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) { + out := new(empty.Empty) + err := c.cc.Invoke(ctx, "/server.FakeServer/Succeed", in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *fakeServerClient) FailWithError(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { - out := new(google_protobuf.Empty) - err := grpc.Invoke(ctx, "/server.FakeServer/FailWithError", in, out, c.cc, opts...) +func (c *fakeServerClient) FailWithError(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) { + out := new(empty.Empty) + err := c.cc.Invoke(ctx, "/server.FakeServer/FailWithError", in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *fakeServerClient) FailWithHTTPError(ctx context.Context, in *FailWithHTTPErrorRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { - out := new(google_protobuf.Empty) - err := grpc.Invoke(ctx, "/server.FakeServer/FailWithHTTPError", in, out, c.cc, opts...) +func (c *fakeServerClient) FailWithHTTPError(ctx context.Context, in *FailWithHTTPErrorRequest, opts ...grpc.CallOption) (*empty.Empty, error) { + out := new(empty.Empty) + err := c.cc.Invoke(ctx, "/server.FakeServer/FailWithHTTPError", in, out, opts...) if err != nil { return nil, err } return out, nil } -// Server API for FakeServer service - +// FakeServerServer is the server API for FakeServer service. type FakeServerServer interface { - Succeed(context.Context, *google_protobuf.Empty) (*google_protobuf.Empty, error) - FailWithError(context.Context, *google_protobuf.Empty) (*google_protobuf.Empty, error) - FailWithHTTPError(context.Context, *FailWithHTTPErrorRequest) (*google_protobuf.Empty, error) + Succeed(context.Context, *empty.Empty) (*empty.Empty, error) + FailWithError(context.Context, *empty.Empty) (*empty.Empty, error) + FailWithHTTPError(context.Context, *FailWithHTTPErrorRequest) (*empty.Empty, error) +} + +// UnimplementedFakeServerServer can be embedded to have forward compatible implementations. +type UnimplementedFakeServerServer struct { +} + +func (*UnimplementedFakeServerServer) Succeed(ctx context.Context, req *empty.Empty) (*empty.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method Succeed not implemented") +} +func (*UnimplementedFakeServerServer) FailWithError(ctx context.Context, req *empty.Empty) (*empty.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method FailWithError not implemented") +} +func (*UnimplementedFakeServerServer) FailWithHTTPError(ctx context.Context, req *FailWithHTTPErrorRequest) (*empty.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method FailWithHTTPError not implemented") } func RegisterFakeServerServer(s *grpc.Server, srv FakeServerServer) { @@ -117,7 +166,7 @@ func RegisterFakeServerServer(s *grpc.Server, srv FakeServerServer) { } func _FakeServer_Succeed_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(google_protobuf.Empty) + in := new(empty.Empty) if err := dec(in); err != nil { return nil, err } @@ -129,13 +178,13 @@ func _FakeServer_Succeed_Handler(srv interface{}, ctx context.Context, dec func( FullMethod: "/server.FakeServer/Succeed", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(FakeServerServer).Succeed(ctx, req.(*google_protobuf.Empty)) + return srv.(FakeServerServer).Succeed(ctx, req.(*empty.Empty)) } return interceptor(ctx, in, info, handler) } func _FakeServer_FailWithError_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(google_protobuf.Empty) + in := new(empty.Empty) if err := dec(in); err != nil { return nil, err } @@ -147,7 +196,7 @@ func _FakeServer_FailWithError_Handler(srv interface{}, ctx context.Context, dec FullMethod: "/server.FakeServer/FailWithError", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(FakeServerServer).FailWithError(ctx, req.(*google_protobuf.Empty)) + return srv.(FakeServerServer).FailWithError(ctx, req.(*empty.Empty)) } return interceptor(ctx, in, info, handler) } @@ -188,25 +237,5 @@ var _FakeServer_serviceDesc = grpc.ServiceDesc{ }, }, Streams: []grpc.StreamDesc{}, - Metadata: "fake_server.proto", -} - -func init() { proto.RegisterFile("fake_server.proto", fileDescriptor0) } - -var fileDescriptor0 = []byte{ - // 221 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4c, 0x4b, 0xcc, 0x4e, - 0x8d, 0x2f, 0x4e, 0x2d, 0x2a, 0x4b, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x83, - 0xf0, 0xa4, 0xa4, 0xd3, 0xf3, 0xf3, 0xd3, 0x73, 0x52, 0xf5, 0xc1, 0xa2, 0x49, 0xa5, 0x69, 0xfa, - 0xa9, 0xb9, 0x05, 0x25, 0x95, 0x10, 0x45, 0x4a, 0x7a, 0x5c, 0x12, 0x6e, 0x89, 0x99, 0x39, 0xe1, - 0x99, 0x25, 0x19, 0x1e, 0x21, 0x21, 0x01, 0xae, 0x45, 0x45, 0xf9, 0x45, 0x41, 0xa9, 0x85, 0xa5, - 0xa9, 0xc5, 0x25, 0x42, 0x42, 0x5c, 0x2c, 0xce, 0xf9, 0x29, 0xa9, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, - 0xac, 0x41, 0x60, 0xb6, 0xd1, 0x5d, 0x46, 0x2e, 0x2e, 0xb7, 0xc4, 0xec, 0xd4, 0x60, 0xb0, 0xd9, - 0x42, 0xd6, 0x5c, 0xec, 0xc1, 0xa5, 0xc9, 0xc9, 0xa9, 0xa9, 0x29, 0x42, 0x62, 0x7a, 0x10, 0x7b, - 0xf4, 0x60, 0xf6, 0xe8, 0xb9, 0x82, 0xec, 0x91, 0xc2, 0x21, 0xae, 0xc4, 0x20, 0xe4, 0xc8, 0xc5, - 0x0b, 0xb3, 0x1b, 0x6c, 0x2f, 0x19, 0x46, 0xf8, 0x73, 0x09, 0x62, 0x38, 0x5f, 0x48, 0x41, 0x0f, - 0x1a, 0x0e, 0xb8, 0x7c, 0x86, 0xdb, 0x40, 0x27, 0xd5, 0x28, 0xe5, 0xf4, 0xcc, 0x92, 0x8c, 0xd2, - 0x24, 0xbd, 0xe4, 0xfc, 0x5c, 0xfd, 0xf2, 0xd4, 0xc4, 0xb2, 0xd4, 0xf2, 0xfc, 0xa2, 0xec, 0x62, - 0xfd, 0xe4, 0xfc, 0xdc, 0xdc, 0xfc, 0x3c, 0x7d, 0x88, 0xc9, 0x49, 0x6c, 0x60, 0x8d, 0xc6, 0x80, - 0x00, 0x00, 0x00, 0xff, 0xff, 0x49, 0x74, 0xb3, 0xd0, 0x77, 0x01, 0x00, 0x00, + Metadata: "server/fake_server.proto", } From cb923f9c9fdca33107b1923bb5339cea4a19c9da Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 18 Feb 2020 17:23:40 +0000 Subject: [PATCH 4/7] Extend server tests with canceled gRPC functions --- server/fake_server.pb.go | 116 ++++++++++++++++++++++++++++++++++++--- server/fake_server.proto | 2 + server/server_test.go | 51 ++++++++++++++++- 3 files changed, 161 insertions(+), 8 deletions(-) diff --git a/server/fake_server.pb.go b/server/fake_server.pb.go index b4c231da..4fafbb2f 100644 --- a/server/fake_server.pb.go +++ b/server/fake_server.pb.go @@ -71,21 +71,23 @@ func init() { func init() { proto.RegisterFile("server/fake_server.proto", fileDescriptor_2267b95bf2c64cdc) } var fileDescriptor_2267b95bf2c64cdc = []byte{ - // 223 bytes of a gzipped FileDescriptorProto + // 246 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x28, 0x4e, 0x2d, 0x2a, 0x4b, 0x2d, 0xd2, 0x4f, 0x4b, 0xcc, 0x4e, 0x8d, 0x87, 0xb0, 0xf5, 0x0a, 0x8a, 0xf2, 0x4b, 0xf2, 0x85, 0xd8, 0x20, 0x3c, 0x29, 0xe9, 0xf4, 0xfc, 0xfc, 0xf4, 0x9c, 0x54, 0x7d, 0xb0, 0x68, 0x52, 0x69, 0x9a, 0x7e, 0x6a, 0x6e, 0x41, 0x49, 0x25, 0x44, 0x91, 0x92, 0x1e, 0x97, 0x84, 0x5b, 0x62, 0x66, 0x4e, 0x78, 0x66, 0x49, 0x86, 0x47, 0x48, 0x48, 0x80, 0x6b, 0x51, 0x51, 0x7e, 0x51, 0x50, 0x6a, 0x61, 0x69, 0x6a, 0x71, 0x89, 0x90, 0x10, 0x17, 0x8b, 0x73, 0x7e, 0x4a, 0xaa, 0x04, 0xa3, - 0x02, 0xa3, 0x06, 0x6b, 0x10, 0x98, 0x6d, 0x74, 0x97, 0x91, 0x8b, 0xcb, 0x2d, 0x31, 0x3b, 0x35, + 0x02, 0xa3, 0x06, 0x6b, 0x10, 0x98, 0x6d, 0x74, 0x9b, 0x89, 0x8b, 0xcb, 0x2d, 0x31, 0x3b, 0x35, 0x18, 0x6c, 0xb6, 0x90, 0x35, 0x17, 0x7b, 0x70, 0x69, 0x72, 0x72, 0x6a, 0x6a, 0x8a, 0x90, 0x98, 0x1e, 0xc4, 0x1e, 0x3d, 0x98, 0x3d, 0x7a, 0xae, 0x20, 0x7b, 0xa4, 0x70, 0x88, 0x2b, 0x31, 0x08, 0x39, 0x72, 0xf1, 0xc2, 0xec, 0x06, 0xdb, 0x4b, 0x86, 0x11, 0xfe, 0x5c, 0x82, 0x18, 0xce, 0x17, - 0x52, 0xd0, 0x83, 0x86, 0x03, 0x2e, 0x9f, 0xe1, 0x36, 0xd0, 0x49, 0x35, 0x4a, 0x39, 0x3d, 0xb3, - 0x24, 0xa3, 0x34, 0x49, 0x2f, 0x39, 0x3f, 0x57, 0xbf, 0x3c, 0x35, 0xb1, 0x2c, 0xb5, 0x3c, 0xbf, - 0x28, 0xbb, 0x58, 0x3f, 0x39, 0x3f, 0x37, 0x37, 0x3f, 0x4f, 0x1f, 0x62, 0x72, 0x12, 0x1b, 0x58, - 0xa3, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0x1c, 0xf3, 0xf1, 0x28, 0x7e, 0x01, 0x00, 0x00, + 0x52, 0xd0, 0x83, 0x86, 0x03, 0x2e, 0x9f, 0xe1, 0x31, 0xd0, 0x92, 0x8b, 0x35, 0x38, 0x27, 0x35, + 0xb5, 0x80, 0x2c, 0xef, 0x70, 0x07, 0x97, 0x14, 0xa5, 0x26, 0xe6, 0x92, 0x69, 0x80, 0x01, 0xa3, + 0x93, 0x6a, 0x94, 0x72, 0x7a, 0x66, 0x49, 0x46, 0x69, 0x92, 0x5e, 0x72, 0x7e, 0xae, 0x7e, 0x79, + 0x6a, 0x62, 0x59, 0x6a, 0x79, 0x7e, 0x51, 0x76, 0xb1, 0x7e, 0x72, 0x7e, 0x6e, 0x6e, 0x7e, 0x9e, + 0x3e, 0xc4, 0x5f, 0x49, 0x6c, 0x60, 0xad, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0xf9, 0xcd, + 0xa4, 0xf9, 0xfc, 0x01, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -103,6 +105,8 @@ type FakeServerClient interface { Succeed(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) FailWithError(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) FailWithHTTPError(ctx context.Context, in *FailWithHTTPErrorRequest, opts ...grpc.CallOption) (*empty.Empty, error) + Sleep(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) + StreamSleep(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (FakeServer_StreamSleepClient, error) } type fakeServerClient struct { @@ -140,11 +144,54 @@ func (c *fakeServerClient) FailWithHTTPError(ctx context.Context, in *FailWithHT return out, nil } +func (c *fakeServerClient) Sleep(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) { + out := new(empty.Empty) + err := c.cc.Invoke(ctx, "/server.FakeServer/Sleep", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *fakeServerClient) StreamSleep(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (FakeServer_StreamSleepClient, error) { + stream, err := c.cc.NewStream(ctx, &_FakeServer_serviceDesc.Streams[0], "/server.FakeServer/StreamSleep", opts...) + if err != nil { + return nil, err + } + x := &fakeServerStreamSleepClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type FakeServer_StreamSleepClient interface { + Recv() (*empty.Empty, error) + grpc.ClientStream +} + +type fakeServerStreamSleepClient struct { + grpc.ClientStream +} + +func (x *fakeServerStreamSleepClient) Recv() (*empty.Empty, error) { + m := new(empty.Empty) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // FakeServerServer is the server API for FakeServer service. type FakeServerServer interface { Succeed(context.Context, *empty.Empty) (*empty.Empty, error) FailWithError(context.Context, *empty.Empty) (*empty.Empty, error) FailWithHTTPError(context.Context, *FailWithHTTPErrorRequest) (*empty.Empty, error) + Sleep(context.Context, *empty.Empty) (*empty.Empty, error) + StreamSleep(*empty.Empty, FakeServer_StreamSleepServer) error } // UnimplementedFakeServerServer can be embedded to have forward compatible implementations. @@ -160,6 +207,12 @@ func (*UnimplementedFakeServerServer) FailWithError(ctx context.Context, req *em func (*UnimplementedFakeServerServer) FailWithHTTPError(ctx context.Context, req *FailWithHTTPErrorRequest) (*empty.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method FailWithHTTPError not implemented") } +func (*UnimplementedFakeServerServer) Sleep(ctx context.Context, req *empty.Empty) (*empty.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method Sleep not implemented") +} +func (*UnimplementedFakeServerServer) StreamSleep(req *empty.Empty, srv FakeServer_StreamSleepServer) error { + return status.Errorf(codes.Unimplemented, "method StreamSleep not implemented") +} func RegisterFakeServerServer(s *grpc.Server, srv FakeServerServer) { s.RegisterService(&_FakeServer_serviceDesc, srv) @@ -219,6 +272,45 @@ func _FakeServer_FailWithHTTPError_Handler(srv interface{}, ctx context.Context, return interceptor(ctx, in, info, handler) } +func _FakeServer_Sleep_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(empty.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(FakeServerServer).Sleep(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/server.FakeServer/Sleep", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(FakeServerServer).Sleep(ctx, req.(*empty.Empty)) + } + return interceptor(ctx, in, info, handler) +} + +func _FakeServer_StreamSleep_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(empty.Empty) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(FakeServerServer).StreamSleep(m, &fakeServerStreamSleepServer{stream}) +} + +type FakeServer_StreamSleepServer interface { + Send(*empty.Empty) error + grpc.ServerStream +} + +type fakeServerStreamSleepServer struct { + grpc.ServerStream +} + +func (x *fakeServerStreamSleepServer) Send(m *empty.Empty) error { + return x.ServerStream.SendMsg(m) +} + var _FakeServer_serviceDesc = grpc.ServiceDesc{ ServiceName: "server.FakeServer", HandlerType: (*FakeServerServer)(nil), @@ -235,7 +327,17 @@ var _FakeServer_serviceDesc = grpc.ServiceDesc{ MethodName: "FailWithHTTPError", Handler: _FakeServer_FailWithHTTPError_Handler, }, + { + MethodName: "Sleep", + Handler: _FakeServer_Sleep_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "StreamSleep", + Handler: _FakeServer_StreamSleep_Handler, + ServerStreams: true, + }, }, - Streams: []grpc.StreamDesc{}, Metadata: "server/fake_server.proto", } diff --git a/server/fake_server.proto b/server/fake_server.proto index 2520d944..bbf39381 100644 --- a/server/fake_server.proto +++ b/server/fake_server.proto @@ -10,6 +10,8 @@ service FakeServer { rpc Succeed(google.protobuf.Empty) returns (google.protobuf.Empty) {}; rpc FailWithError(google.protobuf.Empty) returns (google.protobuf.Empty) {}; rpc FailWithHTTPError(FailWithHTTPErrorRequest) returns (google.protobuf.Empty) {}; + rpc Sleep(google.protobuf.Empty) returns (google.protobuf.Empty) {}; + rpc StreamSleep(google.protobuf.Empty) returns (stream google.protobuf.Empty) {}; } message FailWithHTTPErrorRequest { diff --git a/server/server_test.go b/server/server_test.go index 06a85856..1a5cfaed 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -6,6 +6,7 @@ import ( "net/http" "strconv" "testing" + "time" "google.golang.org/grpc" "google.golang.org/grpc/status" @@ -33,6 +34,29 @@ func (f FakeServer) Succeed(ctx context.Context, req *google_protobuf.Empty) (*g return &google_protobuf.Empty{}, nil } +func (f FakeServer) Sleep(ctx context.Context, req *google_protobuf.Empty) (*google_protobuf.Empty, error) { + err := cancelableSleep(ctx, 10*time.Second) + return &google_protobuf.Empty{}, err +} + +func (f FakeServer) StreamSleep(req *google_protobuf.Empty, stream FakeServer_StreamSleepServer) error { + for x := 0; x < 100; x++ { + time.Sleep(time.Second / 100.0) + if err := stream.Send(&google_protobuf.Empty{}); err != nil { + return err + } + } + return nil +} + +func cancelableSleep(ctx context.Context, sleep time.Duration) error { + select { + case <-time.After(sleep): + case <-ctx.Done(): + } + return ctx.Err() +} + // Ensure that http and grpc servers work with no overrides to config // (except http port because an ordinary user can't bind to default port 80) func TestDefaultAddresses(t *testing.T) { @@ -82,7 +106,6 @@ func TestErrorInstrumentationMiddleware(t *testing.T) { RegisterFakeServerServer(server.GRPC, fakeServer) go server.Run() - defer server.Shutdown() conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure()) defer conn.Close() @@ -109,7 +132,31 @@ func TestErrorInstrumentationMiddleware(t *testing.T) { require.Equal(t, int32(http.StatusPaymentRequired), errResp.Code) require.Equal(t, "402", string(errResp.Body)) + callThenCancel := func(f func(ctx context.Context) error) error { + ctx, cancel := context.WithCancel(context.Background()) + errChan := make(chan error, 1) + go func() { + errChan <- f(ctx) + }() + time.Sleep(50 * time.Millisecond) // allow the call to reach the handler + cancel() + return <-errChan + } + + err = callThenCancel(func(ctx context.Context) error { + _, err = client.Sleep(ctx, &empty) + return err + }) + require.Error(t, err, context.Canceled) + + err = callThenCancel(func(ctx context.Context) error { + _, err = client.StreamSleep(ctx, &empty) + return err + }) + require.NoError(t, err) // canceling a streaming fn doesn't generate an error + conn.Close() + server.Shutdown() metrics, err := prometheus.DefaultGatherer.Gather() require.NoError(t, err) @@ -134,6 +181,8 @@ func TestErrorInstrumentationMiddleware(t *testing.T) { require.Equal(t, map[string]string{ "/server.FakeServer/FailWithError": "error", "/server.FakeServer/FailWithHTTPError": "402", + "/server.FakeServer/Sleep": "error", + "/server.FakeServer/StreamSleep": "error", "/server.FakeServer/Succeed": "success", }, statuses) } From 7df951ef1352b7b71fde0de3f3cca721fe24e0eb Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 18 Feb 2020 16:59:10 +0000 Subject: [PATCH 5/7] Extend cancelation checking to include gRPC errors --- grpc/cancel.go | 20 ++++++++++++++++++++ instrument/instrument.go | 3 ++- middleware/grpc_logging.go | 5 +++-- 3 files changed, 25 insertions(+), 3 deletions(-) create mode 100644 grpc/cancel.go diff --git a/grpc/cancel.go b/grpc/cancel.go new file mode 100644 index 00000000..debef089 --- /dev/null +++ b/grpc/cancel.go @@ -0,0 +1,20 @@ +package grpc + +import ( + "context" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// IsCanceled checks whether an error comes from an operation being canceled +func IsCanceled(err error) bool { + if err == context.Canceled { + return true + } + s, ok := status.FromError(err) + if ok && s.Code() == codes.Canceled { + return true + } + return false +} diff --git a/instrument/instrument.go b/instrument/instrument.go index 2b6168d4..12569100 100644 --- a/instrument/instrument.go +++ b/instrument/instrument.go @@ -10,6 +10,7 @@ import ( "github.com/prometheus/client_golang/prometheus" oldcontext "golang.org/x/net/context" + "github.com/weaveworks/common/grpc" "github.com/weaveworks/common/user" ) @@ -152,7 +153,7 @@ func CollectedRequest(ctx context.Context, method string, col Collector, toStatu col.After(method, toStatusCode(err), start) if err != nil { - if err != context.Canceled { + if !grpc.IsCanceled(err) { ext.Error.Set(sp, true) } sp.LogFields(otlog.Error(err)) diff --git a/middleware/grpc_logging.go b/middleware/grpc_logging.go index 75328eaf..5a3a393c 100644 --- a/middleware/grpc_logging.go +++ b/middleware/grpc_logging.go @@ -6,6 +6,7 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" + grpcUtils "github.com/weaveworks/common/grpc" "github.com/weaveworks/common/logging" "github.com/weaveworks/common/user" ) @@ -31,7 +32,7 @@ func (s GRPCServerLog) UnaryServerInterceptor(ctx context.Context, req interface if s.WithRequest { entry = entry.WithField("request", req) } - if err == context.Canceled { + if grpcUtils.IsCanceled(err) { entry.WithField(errorKey, err).Debugln(gRPC) } else { entry.WithField(errorKey, err).Warnln(gRPC) @@ -48,7 +49,7 @@ func (s GRPCServerLog) StreamServerInterceptor(srv interface{}, ss grpc.ServerSt err := handler(srv, ss) entry := user.LogWith(ss.Context(), s.Log).WithFields(logging.Fields{"method": info.FullMethod, "duration": time.Since(begin)}) if err != nil { - if err == context.Canceled { + if grpcUtils.IsCanceled(err) { entry.WithField(errorKey, err).Debugln(gRPC) } else { entry.WithField(errorKey, err).Warnln(gRPC) From f3ac1471c64f30070e03cb8569c606e8ad486563 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 18 Feb 2020 17:05:22 +0000 Subject: [PATCH 6/7] Extend error instrumentation tests to http --- server/server_test.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/server/server_test.go b/server/server_test.go index 1a5cfaed..070ec91d 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -105,6 +105,15 @@ func TestErrorInstrumentationMiddleware(t *testing.T) { fakeServer := FakeServer{} RegisterFakeServerServer(server.GRPC, fakeServer) + server.HTTP.HandleFunc("/succeed", func(w http.ResponseWriter, r *http.Request) { + }) + server.HTTP.HandleFunc("/error500", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(500) + }) + server.HTTP.HandleFunc("/sleep10", func(w http.ResponseWriter, r *http.Request) { + _ = cancelableSleep(r.Context(), time.Second*10) + }) + go server.Run() conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure()) @@ -155,6 +164,29 @@ func TestErrorInstrumentationMiddleware(t *testing.T) { }) require.NoError(t, err) // canceling a streaming fn doesn't generate an error + // Now test the HTTP versions of the functions + { + req, err := http.NewRequest("GET", "http://127.0.0.1:9090/succeed", nil) + require.NoError(t, err) + _, err = http.DefaultClient.Do(req) + require.NoError(t, err) + } + { + req, err := http.NewRequest("GET", "http://127.0.0.1:9090/error500", nil) + require.NoError(t, err) + _, err = http.DefaultClient.Do(req) + require.NoError(t, err) + } + { + req, err := http.NewRequest("GET", "http://127.0.0.1:9090/sleep10", nil) + require.NoError(t, err) + err = callThenCancel(func(ctx context.Context) error { + _, err = http.DefaultClient.Do(req.WithContext(ctx)) + return err + }) + require.Error(t, err, context.Canceled) + } + conn.Close() server.Shutdown() @@ -184,6 +216,9 @@ func TestErrorInstrumentationMiddleware(t *testing.T) { "/server.FakeServer/Sleep": "error", "/server.FakeServer/StreamSleep": "error", "/server.FakeServer/Succeed": "success", + "error500": "500", + "sleep10": "200", + "succeed": "200", }, statuses) } From 8222cc178545ffbe55e00bd0f1cc591f0abb3b67 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 18 Feb 2020 17:10:32 +0000 Subject: [PATCH 7/7] Instrument canceled gRPC calls with a unique label --- middleware/grpc_instrumentation.go | 3 +++ server/server_test.go | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/middleware/grpc_instrumentation.go b/middleware/grpc_instrumentation.go index db295eec..58748d5f 100644 --- a/middleware/grpc_instrumentation.go +++ b/middleware/grpc_instrumentation.go @@ -5,6 +5,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + grpcUtils "github.com/weaveworks/common/grpc" "github.com/weaveworks/common/httpgrpc" "golang.org/x/net/context" "google.golang.org/grpc" @@ -15,6 +16,8 @@ func observe(hist *prometheus.HistogramVec, method string, err error, duration t if err != nil { if errResp, ok := httpgrpc.HTTPResponseFromError(err); ok { respStatus = strconv.Itoa(int(errResp.Code)) + } else if grpcUtils.IsCanceled(err) { + respStatus = "cancel" } else { respStatus = "error" } diff --git a/server/server_test.go b/server/server_test.go index 070ec91d..eeb686df 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -213,8 +213,8 @@ func TestErrorInstrumentationMiddleware(t *testing.T) { require.Equal(t, map[string]string{ "/server.FakeServer/FailWithError": "error", "/server.FakeServer/FailWithHTTPError": "402", - "/server.FakeServer/Sleep": "error", - "/server.FakeServer/StreamSleep": "error", + "/server.FakeServer/Sleep": "cancel", + "/server.FakeServer/StreamSleep": "cancel", "/server.FakeServer/Succeed": "success", "error500": "500", "sleep10": "200",