From df4036ab73ca75c5b0fdb8c4ed1c4a6cc8eab801 Mon Sep 17 00:00:00 2001 From: "Sahdev P. Zala" Date: Fri, 5 Jan 2018 11:24:50 -0500 Subject: [PATCH] etcdserver/api/v3rpc: debug user cancellation and log warning for rest The context error with cancel code is typically for user cancellation which should be at debug level. For other error codes we should display a warning. Fixes #9085 --- etcdserver/api/v3rpc/lease.go | 12 ++++++++++-- etcdserver/api/v3rpc/util.go | 15 +++++++++++++++ etcdserver/api/v3rpc/watch.go | 24 ++++++++++++++++++++---- 3 files changed, 45 insertions(+), 6 deletions(-) diff --git a/etcdserver/api/v3rpc/lease.go b/etcdserver/api/v3rpc/lease.go index 91618d115fc..fd27d10c02b 100644 --- a/etcdserver/api/v3rpc/lease.go +++ b/etcdserver/api/v3rpc/lease.go @@ -92,7 +92,11 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro return nil } if err != nil { - plog.Debugf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error()) + if isClientCtxErr(stream.Context().Err(), err) { + plog.Debugf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error()) + } else { + plog.Warningf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error()) + } return err } @@ -118,7 +122,11 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro resp.TTL = ttl err = stream.Send(resp) if err != nil { - plog.Debugf("failed to send lease keepalive response to gRPC stream (%q)", err.Error()) + if isClientCtxErr(stream.Context().Err(), err) { + plog.Debugf("failed to send lease keepalive response to gRPC stream (%q)", err.Error()) + } else { + plog.Warningf("failed to send lease keepalive response to gRPC stream (%q)", err.Error()) + } return err } } diff --git a/etcdserver/api/v3rpc/util.go b/etcdserver/api/v3rpc/util.go index 8d38d9bd18f..7c86999e4c2 100644 --- a/etcdserver/api/v3rpc/util.go +++ b/etcdserver/api/v3rpc/util.go @@ -21,8 +21,10 @@ import ( "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc" + "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func togRPCError(err error) error { @@ -101,3 +103,16 @@ func togRPCError(err error) error { return grpc.Errorf(codes.Unknown, err.Error()) } } + +func isClientCtxErr(ctxErr error, err error) bool { + if ctxErr != nil { + return true + } + + ev, ok := status.FromError(err) + if !ok { + return false + } + code := ev.Code() + return code == codes.Canceled || code == codes.DeadlineExceeded +} diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index cd2adf98453..301ffbaf47f 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -141,7 +141,11 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) { // deadlock when calling sws.close(). go func() { if rerr := sws.recvLoop(); rerr != nil { - plog.Debugf("failed to receive watch request from gRPC stream (%q)", rerr.Error()) + if isClientCtxErr(stream.Context().Err(), rerr) { + plog.Debugf("failed to receive watch request from gRPC stream (%q)", rerr.Error()) + } else { + plog.Warningf("failed to receive watch request from gRPC stream (%q)", rerr.Error()) + } errc <- rerr } }() @@ -338,7 +342,11 @@ func (sws *serverWatchStream) sendLoop() { mvcc.ReportEventReceived(len(evs)) if err := sws.gRPCStream.Send(wr); err != nil { - plog.Debugf("failed to send watch response to gRPC stream (%q)", err.Error()) + if isClientCtxErr(sws.gRPCStream.Context().Err(), err) { + plog.Debugf("failed to send watch response to gRPC stream (%q)", err.Error()) + } else { + plog.Warningf("failed to send watch response to gRPC stream (%q)", err.Error()) + } return } @@ -355,7 +363,11 @@ func (sws *serverWatchStream) sendLoop() { } if err := sws.gRPCStream.Send(c); err != nil { - plog.Debugf("failed to send watch control response to gRPC stream (%q)", err.Error()) + if isClientCtxErr(sws.gRPCStream.Context().Err(), err) { + plog.Debugf("failed to send watch control response to gRPC stream (%q)", err.Error()) + } else { + plog.Warningf("failed to send watch control response to gRPC stream (%q)", err.Error()) + } return } @@ -371,7 +383,11 @@ func (sws *serverWatchStream) sendLoop() { for _, v := range pending[wid] { mvcc.ReportEventReceived(len(v.Events)) if err := sws.gRPCStream.Send(v); err != nil { - plog.Debugf("failed to send pending watch response to gRPC stream (%q)", err.Error()) + if isClientCtxErr(sws.gRPCStream.Context().Err(), err) { + plog.Debugf("failed to send pending watch response to gRPC stream (%q)", err.Error()) + } else { + plog.Warningf("failed to send pending watch response to gRPC stream (%q)", err.Error()) + } return } }