From dc01734c6bec047181640789ec59a34950fb5ada Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Wed, 8 Aug 2018 16:17:36 -0700 Subject: [PATCH 1/3] etcdserver: add grpc interceptor to log info on incoming requests to etcd server To improve debuggability of etcd v3. Added a grpc interceptor to log info on incoming requests to etcd server. The log output includes remote client info, request content (with value field redacted), request handling latency, response size, etc. Uses zap logger if available, otherwise uses capnslog. Also did some clean up on the chaining of grpc interceptors on server side. --- etcdserver/api/v3rpc/grpc.go | 12 +- etcdserver/api/v3rpc/interceptor.go | 130 +++++++++++++++++- .../etcdserverpb/raft_internal_stringer.go | 6 +- etcdserver/server.go | 4 + 4 files changed, 143 insertions(+), 9 deletions(-) diff --git a/etcdserver/api/v3rpc/grpc.go b/etcdserver/api/v3rpc/grpc.go index 908f0c64a85..c97e7466215 100644 --- a/etcdserver/api/v3rpc/grpc.go +++ b/etcdserver/api/v3rpc/grpc.go @@ -21,6 +21,7 @@ import ( "github.com/coreos/etcd/etcdserver" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -40,8 +41,15 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOptio if tls != nil { opts = append(opts, grpc.Creds(credentials.NewTLS(tls))) } - opts = append(opts, grpc.UnaryInterceptor(newUnaryInterceptor(s))) - opts = append(opts, grpc.StreamInterceptor(newStreamInterceptor(s))) + opts = append(opts, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( + newLogUnaryInterceptor(s), + newUnaryInterceptor(s), + grpc_prometheus.UnaryServerInterceptor, + ))) + opts = append(opts, grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( + newStreamInterceptor(s), + grpc_prometheus.StreamServerInterceptor, + ))) opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes))) opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes)) opts = append(opts, grpc.MaxConcurrentStreams(maxStreams)) diff --git a/etcdserver/api/v3rpc/interceptor.go b/etcdserver/api/v3rpc/interceptor.go index f38dc4a99cf..1e3a826178a 100644 --- a/etcdserver/api/v3rpc/interceptor.go +++ b/etcdserver/api/v3rpc/interceptor.go @@ -25,9 +25,11 @@ import ( "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" - prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/peer" ) const ( @@ -40,7 +42,7 @@ type streamsMap struct { } func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { - return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { if !api.IsCapabilityEnabled(api.V3rpcCapability) { return nil, rpctypes.ErrGRPCNotCapable } @@ -54,7 +56,126 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { } } - return prometheus.UnaryServerInterceptor(ctx, req, info, handler) + resp, err := handler(ctx, req) + return resp, err + } +} + +func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + startTime := time.Now() + resp, err := handler(ctx, req) + defer logUnaryRequestStats(ctx, s.Logger(), info, startTime, req, resp) + return resp, err + } +} + +func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) { + duration := time.Since(startTime) + remote := "No remote client info." + peerInfo, ok := peer.FromContext(ctx) + if ok { + remote = peerInfo.Addr.String() + } + var responseType string = info.FullMethod + var reqCount, respCount int64 = 0, 0 + var reqSize, respSize int = 0, 0 + var reqContent string + switch _resp := resp.(type) { + case *pb.RangeResponse: + _req, ok := req.(*pb.RangeRequest) + if ok { + reqCount = 0 + reqSize = _req.Size() + reqContent = _req.String() + } + if _resp != nil { + respCount = _resp.GetCount() + respSize = _resp.Size() + } + case *pb.PutResponse: + _req, ok := req.(*pb.PutRequest) + if ok { + reqCount = 1 + reqSize = _req.Size() + reqContent = pb.NewLoggablePutRequest(_req).String() + // redact value field from request content, see PR #9821 + } + if _resp != nil { + respCount = 0 + respSize = _resp.Size() + } + case *pb.DeleteRangeResponse: + _req, ok := req.(*pb.DeleteRangeRequest) + if ok { + reqCount = 0 + reqSize = _req.Size() + reqContent = _req.String() + } + if _resp != nil { + respCount = _resp.GetDeleted() + respSize = _resp.Size() + } + case *pb.TxnResponse: + _req, ok := req.(*pb.TxnRequest) + if ok && _resp != nil { + if _resp.GetSucceeded() { // determine the 'actual' count and size of request based on success or failure + reqCount = int64(len(_req.GetSuccess())) + reqSize = 0 + for _, r := range _req.GetSuccess() { + reqSize += r.Size() + } + } else { + reqCount = int64(len(_req.GetFailure())) + reqSize = 0 + for _, r := range _req.GetFailure() { + reqSize += r.Size() + } + } + reqContent = pb.NewLoggableTxnRequest(_req).String() + // redact value field from request content, see PR #9821 + } + if _resp != nil { + respCount = 0 + respSize = _resp.Size() + } + default: + reqCount = -1 + reqSize = -1 + respCount = -1 + respSize = -1 + } + + logGenericRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent) +} + +func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string, + reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) { + if lg == nil { + plog.Debugf( + "start time = %v, " + + "time spent = %v, " + + "remote = %s, " + + "response type = %s, " + + "request count = %d, " + + "request size = %d, " + + "response count = %d, " + + "response size = %d, " + + "request content = %s", + startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent, + ) + } else { + lg.Debug("request stats", + zap.Time("start time", startTime), + zap.Duration("time spent", duration), + zap.String("remote", remote), + zap.String("response type", responseType), + zap.Int64("request count", reqCount), + zap.Int("request size", reqSize), + zap.Int64("response count", respCount), + zap.Int("response size", respSize), + zap.String("request content", reqContent), + ) } } @@ -90,7 +211,8 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor } } - return prometheus.StreamServerInterceptor(srv, ss, info, handler) + err := handler(srv, ss) + return err } } diff --git a/etcdserver/etcdserverpb/raft_internal_stringer.go b/etcdserver/etcdserverpb/raft_internal_stringer.go index ec6b6397b39..3d3536a326d 100644 --- a/etcdserver/etcdserverpb/raft_internal_stringer.go +++ b/etcdserver/etcdserverpb/raft_internal_stringer.go @@ -59,7 +59,7 @@ func (as *InternalRaftStringer) String() string { case as.Request.Put != nil: return fmt.Sprintf("header:<%s> put:<%s>", as.Request.Header.String(), - newLoggablePutRequest(as.Request.Put).String(), + NewLoggablePutRequest(as.Request.Put).String(), ) case as.Request.Txn != nil: return fmt.Sprintf("header:<%s> txn:<%s>", @@ -121,7 +121,7 @@ func newLoggableRequestOp(op *RequestOp) *requestOpStringer { func (as *requestOpStringer) String() string { switch op := as.Op.Request.(type) { case *RequestOp_RequestPut: - return fmt.Sprintf("request_put:<%s>", newLoggablePutRequest(op.RequestPut).String()) + return fmt.Sprintf("request_put:<%s>", NewLoggablePutRequest(op.RequestPut).String()) case *RequestOp_RequestTxn: return fmt.Sprintf("request_txn:<%s>", NewLoggableTxnRequest(op.RequestTxn).String()) default: @@ -167,7 +167,7 @@ type loggablePutRequest struct { IgnoreLease bool `protobuf:"varint,6,opt,name=ignore_lease,proto3"` } -func newLoggablePutRequest(request *PutRequest) *loggablePutRequest { +func NewLoggablePutRequest(request *PutRequest) *loggablePutRequest { return &loggablePutRequest{ request.Key, len(request.Value), diff --git a/etcdserver/server.go b/etcdserver/server.go index fce66fb2f6a..0df0354f989 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -2411,3 +2411,7 @@ func (s *EtcdServer) goAttach(f func()) { func (s *EtcdServer) Alarms() []*pb.AlarmMember { return s.alarmStore.Get(pb.AlarmType_NONE) } + +func (s *EtcdServer) Logger() *zap.Logger { + return s.lg +} From 30662940f4cbf2b5c63d877fa5724f809a530b71 Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Fri, 10 Aug 2018 11:10:08 -0700 Subject: [PATCH 2/3] vendor: add go-grpc-middleware Rebased to master PR #9994. Fixed a Go format issue in v3rpc/interceptor.go. Updated vendor to include go-grpc-middleware. --- Gopkg.lock | 8 +- bill-of-materials.json | 2 +- etcdserver/api/v3rpc/interceptor.go | 21 +- .../go-grpc-middleware/chain.go | 183 ++++++++++++++++++ .../grpc-ecosystem/go-grpc-middleware/doc.go | 69 +++++++ .../go-grpc-middleware/wrappers.go | 29 +++ 6 files changed, 298 insertions(+), 14 deletions(-) create mode 100644 vendor/github.com/grpc-ecosystem/go-grpc-middleware/chain.go create mode 100644 vendor/github.com/grpc-ecosystem/go-grpc-middleware/doc.go create mode 100644 vendor/github.com/grpc-ecosystem/go-grpc-middleware/wrappers.go diff --git a/Gopkg.lock b/Gopkg.lock index 876cd9db77f..00f7ff0121d 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -129,9 +129,12 @@ revision = "4201258b820c74ac8e6922fc9e6b52f71fe46f8d" [[projects]] - digest = "1:f11ab206621794e7021bbb6d1bb26e82fd12a8893740805db14bdce4b4abe566" + digest = "1:d4fb49314d7f792a14c72f094f8fcb9ecfdf7a4c3e2186efb1f3d3a88806a844" name = "github.com/grpc-ecosystem/go-grpc-middleware" - packages = ["util/backoffutils"] + packages = [ + ".", + "util/backoffutils", + ] pruneopts = "UT" revision = "c250d6563d4d4c20252cd865923440e829844f4e" version = "v1.0.0" @@ -498,6 +501,7 @@ "github.com/golang/groupcache/lru", "github.com/golang/protobuf/proto", "github.com/google/btree", + "github.com/grpc-ecosystem/go-grpc-middleware", "github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils", "github.com/grpc-ecosystem/go-grpc-prometheus", "github.com/grpc-ecosystem/grpc-gateway/runtime", diff --git a/bill-of-materials.json b/bill-of-materials.json index c11992df879..acf91d4ea66 100644 --- a/bill-of-materials.json +++ b/bill-of-materials.json @@ -135,7 +135,7 @@ ] }, { - "project": "github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils", + "project": "github.com/grpc-ecosystem/go-grpc-middleware", "licenses": [ { "type": "Apache License 2.0", diff --git a/etcdserver/api/v3rpc/interceptor.go b/etcdserver/api/v3rpc/interceptor.go index 1e3a826178a..672425389aa 100644 --- a/etcdserver/api/v3rpc/interceptor.go +++ b/etcdserver/api/v3rpc/interceptor.go @@ -78,8 +78,8 @@ func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, info *grpc.UnaryS remote = peerInfo.Addr.String() } var responseType string = info.FullMethod - var reqCount, respCount int64 = 0, 0 - var reqSize, respSize int = 0, 0 + var reqCount, respCount int64 + var reqSize, respSize int var reqContent string switch _resp := resp.(type) { case *pb.RangeResponse: @@ -152,15 +152,14 @@ func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, info *grpc.UnaryS func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string, reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) { if lg == nil { - plog.Debugf( - "start time = %v, " + - "time spent = %v, " + - "remote = %s, " + - "response type = %s, " + - "request count = %d, " + - "request size = %d, " + - "response count = %d, " + - "response size = %d, " + + plog.Debugf("start time = %v, "+ + "time spent = %v, "+ + "remote = %s, "+ + "response type = %s, "+ + "request count = %d, "+ + "request size = %d, "+ + "response count = %d, "+ + "response size = %d, "+ "request content = %s", startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent, ) diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/chain.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/chain.go new file mode 100644 index 00000000000..45a2f5f49a7 --- /dev/null +++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/chain.go @@ -0,0 +1,183 @@ +// Copyright 2016 Michal Witkowski. All Rights Reserved. +// See LICENSE for licensing terms. + +// gRPC Server Interceptor chaining middleware. + +package grpc_middleware + +import ( + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +// ChainUnaryServer creates a single interceptor out of a chain of many interceptors. +// +// Execution is done in left-to-right order, including passing of context. +// For example ChainUnaryServer(one, two, three) will execute one before two before three, and three +// will see context changes of one and two. +func ChainUnaryServer(interceptors ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor { + n := len(interceptors) + + if n > 1 { + lastI := n - 1 + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + var ( + chainHandler grpc.UnaryHandler + curI int + ) + + chainHandler = func(currentCtx context.Context, currentReq interface{}) (interface{}, error) { + if curI == lastI { + return handler(currentCtx, currentReq) + } + curI++ + resp, err := interceptors[curI](currentCtx, currentReq, info, chainHandler) + curI-- + return resp, err + } + + return interceptors[0](ctx, req, info, chainHandler) + } + } + + if n == 1 { + return interceptors[0] + } + + // n == 0; Dummy interceptor maintained for backward compatibility to avoid returning nil. + return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + return handler(ctx, req) + } +} + +// ChainStreamServer creates a single interceptor out of a chain of many interceptors. +// +// Execution is done in left-to-right order, including passing of context. +// For example ChainUnaryServer(one, two, three) will execute one before two before three. +// If you want to pass context between interceptors, use WrapServerStream. +func ChainStreamServer(interceptors ...grpc.StreamServerInterceptor) grpc.StreamServerInterceptor { + n := len(interceptors) + + if n > 1 { + lastI := n - 1 + return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + var ( + chainHandler grpc.StreamHandler + curI int + ) + + chainHandler = func(currentSrv interface{}, currentStream grpc.ServerStream) error { + if curI == lastI { + return handler(currentSrv, currentStream) + } + curI++ + err := interceptors[curI](currentSrv, currentStream, info, chainHandler) + curI-- + return err + } + + return interceptors[0](srv, stream, info, chainHandler) + } + } + + if n == 1 { + return interceptors[0] + } + + // n == 0; Dummy interceptor maintained for backward compatibility to avoid returning nil. + return func(srv interface{}, stream grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return handler(srv, stream) + } +} + +// ChainUnaryClient creates a single interceptor out of a chain of many interceptors. +// +// Execution is done in left-to-right order, including passing of context. +// For example ChainUnaryClient(one, two, three) will execute one before two before three. +func ChainUnaryClient(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor { + n := len(interceptors) + + if n > 1 { + lastI := n - 1 + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + var ( + chainHandler grpc.UnaryInvoker + curI int + ) + + chainHandler = func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error { + if curI == lastI { + return invoker(currentCtx, currentMethod, currentReq, currentRepl, currentConn, currentOpts...) + } + curI++ + err := interceptors[curI](currentCtx, currentMethod, currentReq, currentRepl, currentConn, chainHandler, currentOpts...) + curI-- + return err + } + + return interceptors[0](ctx, method, req, reply, cc, chainHandler, opts...) + } + } + + if n == 1 { + return interceptors[0] + } + + // n == 0; Dummy interceptor maintained for backward compatibility to avoid returning nil. + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + return invoker(ctx, method, req, reply, cc, opts...) + } +} + +// ChainStreamClient creates a single interceptor out of a chain of many interceptors. +// +// Execution is done in left-to-right order, including passing of context. +// For example ChainStreamClient(one, two, three) will execute one before two before three. +func ChainStreamClient(interceptors ...grpc.StreamClientInterceptor) grpc.StreamClientInterceptor { + n := len(interceptors) + + if n > 1 { + lastI := n - 1 + return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + var ( + chainHandler grpc.Streamer + curI int + ) + + chainHandler = func(currentCtx context.Context, currentDesc *grpc.StreamDesc, currentConn *grpc.ClientConn, currentMethod string, currentOpts ...grpc.CallOption) (grpc.ClientStream, error) { + if curI == lastI { + return streamer(currentCtx, currentDesc, currentConn, currentMethod, currentOpts...) + } + curI++ + stream, err := interceptors[curI](currentCtx, currentDesc, currentConn, currentMethod, chainHandler, currentOpts...) + curI-- + return stream, err + } + + return interceptors[0](ctx, desc, cc, method, chainHandler, opts...) + } + } + + if n == 1 { + return interceptors[0] + } + + // n == 0; Dummy interceptor maintained for backward compatibility to avoid returning nil. + return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + return streamer(ctx, desc, cc, method, opts...) + } +} + +// Chain creates a single interceptor out of a chain of many interceptors. +// +// WithUnaryServerChain is a grpc.Server config option that accepts multiple unary interceptors. +// Basically syntactic sugar. +func WithUnaryServerChain(interceptors ...grpc.UnaryServerInterceptor) grpc.ServerOption { + return grpc.UnaryInterceptor(ChainUnaryServer(interceptors...)) +} + +// WithStreamServerChain is a grpc.Server config option that accepts multiple stream interceptors. +// Basically syntactic sugar. +func WithStreamServerChain(interceptors ...grpc.StreamServerInterceptor) grpc.ServerOption { + return grpc.StreamInterceptor(ChainStreamServer(interceptors...)) +} diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/doc.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/doc.go new file mode 100644 index 00000000000..71689503642 --- /dev/null +++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/doc.go @@ -0,0 +1,69 @@ +// Copyright 2016 Michal Witkowski. All Rights Reserved. +// See LICENSE for licensing terms. + +/* +`grpc_middleware` is a collection of gRPC middleware packages: interceptors, helpers and tools. + +Middleware + +gRPC is a fantastic RPC middleware, which sees a lot of adoption in the Golang world. However, the +upstream gRPC codebase is relatively bare bones. + +This package, and most of its child packages provides commonly needed middleware for gRPC: +client-side interceptors for retires, server-side interceptors for input validation and auth, +functions for chaining said interceptors, metadata convenience methods and more. + +Chaining + +By default, gRPC doesn't allow one to have more than one interceptor either on the client nor on +the server side. `grpc_middleware` provides convenient chaining methods + +Simple way of turning a multiple interceptors into a single interceptor. Here's an example for +server chaining: + + myServer := grpc.NewServer( + grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(loggingStream, monitoringStream, authStream)), + grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(loggingUnary, monitoringUnary, authUnary), + ) + +These interceptors will be executed from left to right: logging, monitoring and auth. + +Here's an example for client side chaining: + + clientConn, err = grpc.Dial( + address, + grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(monitoringClientUnary, retryUnary)), + grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(monitoringClientStream, retryStream)), + ) + client = pb_testproto.NewTestServiceClient(clientConn) + resp, err := client.PingEmpty(s.ctx, &myservice.Request{Msg: "hello"}) + +These interceptors will be executed from left to right: monitoring and then retry logic. + +The retry interceptor will call every interceptor that follows it whenever when a retry happens. + +Writing Your Own + +Implementing your own interceptor is pretty trivial: there are interfaces for that. But the interesting +bit exposing common data to handlers (and other middleware), similarly to HTTP Middleware design. +For example, you may want to pass the identity of the caller from the auth interceptor all the way +to the handling function. + +For example, a client side interceptor example for auth looks like: + + func FakeAuthUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + newCtx := context.WithValue(ctx, "user_id", "john@example.com") + return handler(newCtx, req) + } + +Unfortunately, it's not as easy for streaming RPCs. These have the `context.Context` embedded within +the `grpc.ServerStream` object. To pass values through context, a wrapper (`WrappedServerStream`) is +needed. For example: + + func FakeAuthStreamingInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + newStream := grpc_middleware.WrapServerStream(stream) + newStream.WrappedContext = context.WithValue(ctx, "user_id", "john@example.com") + return handler(srv, stream) + } +*/ +package grpc_middleware diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/wrappers.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/wrappers.go new file mode 100644 index 00000000000..597b862445f --- /dev/null +++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/wrappers.go @@ -0,0 +1,29 @@ +// Copyright 2016 Michal Witkowski. All Rights Reserved. +// See LICENSE for licensing terms. + +package grpc_middleware + +import ( + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +// WrappedServerStream is a thin wrapper around grpc.ServerStream that allows modifying context. +type WrappedServerStream struct { + grpc.ServerStream + // WrappedContext is the wrapper's own Context. You can assign it. + WrappedContext context.Context +} + +// Context returns the wrapper's WrappedContext, overwriting the nested grpc.ServerStream.Context() +func (w *WrappedServerStream) Context() context.Context { + return w.WrappedContext +} + +// WrapServerStream returns a ServerStream that has the ability to overwrite context. +func WrapServerStream(stream grpc.ServerStream) *WrappedServerStream { + if existing, ok := stream.(*WrappedServerStream); ok { + return existing + } + return &WrappedServerStream{ServerStream: stream, WrappedContext: stream.Context()} +} From 368010d8a34d01cef92ead2324adc866507f9add Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Fri, 10 Aug 2018 16:29:42 -0700 Subject: [PATCH 3/3] etcdserver: code clean up Code clean up in interceptor.go --- etcdserver/api/v3rpc/interceptor.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/etcdserver/api/v3rpc/interceptor.go b/etcdserver/api/v3rpc/interceptor.go index 672425389aa..5265051ce52 100644 --- a/etcdserver/api/v3rpc/interceptor.go +++ b/etcdserver/api/v3rpc/interceptor.go @@ -56,8 +56,7 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { } } - resp, err := handler(ctx, req) - return resp, err + return handler(ctx, req) } } @@ -210,8 +209,7 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor } } - err := handler(srv, ss) - return err + return handler(srv, ss) } }