Skip to content

Commit

Permalink
Merge pull request #180 from weaveworks/better_cancel_handling
Browse files Browse the repository at this point in the history
Improve handling of canceled calls in logging, metrics and tracing.
  • Loading branch information
bboreham authored Mar 5, 2020
2 parents 760e36a + 8222cc1 commit 913d088
Show file tree
Hide file tree
Showing 8 changed files with 334 additions and 91 deletions.
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ DOCKER_IMAGE_DIRS=$(patsubst %/Dockerfile,%,$(DOCKERFILES))

all: $(UPTODATE_FILES)

GENERATED_PROTOS=server/fake_server.pb.go httpgrpc/httpgrpc.pb.go

# All the boiler plate for building golang follows:
SUDO := $(shell docker info >/dev/null 2>&1 || echo "sudo -E")
BUILD_IN_CONTAINER := true
Expand All @@ -40,7 +42,7 @@ NETGO_CHECK = @strings $@ | grep cgo_stub\\\.go >/dev/null || { \

ifeq ($(BUILD_IN_CONTAINER),true)

lint test shell:
lint test shell protos:
@mkdir -p $(shell pwd)/.pkg
$(SUDO) docker run $(RM) -ti \
-v $(shell pwd)/.pkg:/go/pkg \
Expand All @@ -50,6 +52,11 @@ lint test shell:

else

protos: $(GENERATED_PROTOS)

%.pb.go: %.proto
protoc --go_out=plugins=grpc:../../.. $<

lint:
./tools/lint -notestpackage -ignorespelling queriers -ignorespelling Queriers .

Expand Down
20 changes: 20 additions & 0 deletions grpc/cancel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package grpc

import (
"context"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// IsCanceled checks whether an error comes from an operation being canceled
func IsCanceled(err error) bool {
if err == context.Canceled {
return true
}
s, ok := status.FromError(err)
if ok && s.Code() == codes.Canceled {
return true
}
return false
}
3 changes: 2 additions & 1 deletion instrument/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
oldcontext "golang.org/x/net/context"

"github.com/weaveworks/common/grpc"
"github.com/weaveworks/common/user"
)

Expand Down Expand Up @@ -152,7 +153,7 @@ func CollectedRequest(ctx context.Context, method string, col Collector, toStatu
col.After(method, toStatusCode(err), start)

if err != nil {
if err != context.Canceled {
if !grpc.IsCanceled(err) {
ext.Error.Set(sp, true)
}
sp.LogFields(otlog.Error(err))
Expand Down
37 changes: 17 additions & 20 deletions middleware/grpc_instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,32 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"
grpcUtils "github.com/weaveworks/common/grpc"
"github.com/weaveworks/common/httpgrpc"
"golang.org/x/net/context"
"google.golang.org/grpc"
)

func observe(hist *prometheus.HistogramVec, method string, err error, duration time.Duration) {
respStatus := "success"
if err != nil {
if errResp, ok := httpgrpc.HTTPResponseFromError(err); ok {
respStatus = strconv.Itoa(int(errResp.Code))
} else if grpcUtils.IsCanceled(err) {
respStatus = "cancel"
} else {
respStatus = "error"
}
}
hist.WithLabelValues(gRPC, method, respStatus, "false").Observe(duration.Seconds())
}

// UnaryServerInstrumentInterceptor instruments gRPC requests for errors and latency.
func UnaryServerInstrumentInterceptor(hist *prometheus.HistogramVec) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
begin := time.Now()
resp, err := handler(ctx, req)
duration := time.Since(begin).Seconds()
respStatus := "success"
if err != nil {
if errResp, ok := httpgrpc.HTTPResponseFromError(err); ok {
respStatus = strconv.Itoa(int(errResp.Code))
} else {
respStatus = "error"
}
}
hist.WithLabelValues(gRPC, info.FullMethod, respStatus, "false").Observe(duration)
observe(hist, info.FullMethod, err, time.Since(begin))
return resp, err
}
}
Expand All @@ -34,16 +40,7 @@ func StreamServerInstrumentInterceptor(hist *prometheus.HistogramVec) grpc.Strea
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
begin := time.Now()
err := handler(srv, ss)
duration := time.Since(begin).Seconds()
respStatus := "success"
if err != nil {
if errResp, ok := httpgrpc.HTTPResponseFromError(err); ok {
respStatus = strconv.Itoa(int(errResp.Code))
} else {
respStatus = "error"
}
}
hist.WithLabelValues(gRPC, info.FullMethod, respStatus, "false").Observe(duration)
observe(hist, info.FullMethod, err, time.Since(begin))
return err
}
}
5 changes: 3 additions & 2 deletions middleware/grpc_logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"

grpcUtils "github.com/weaveworks/common/grpc"
"github.com/weaveworks/common/logging"
"github.com/weaveworks/common/user"
)
Expand All @@ -31,7 +32,7 @@ func (s GRPCServerLog) UnaryServerInterceptor(ctx context.Context, req interface
if s.WithRequest {
entry = entry.WithField("request", req)
}
if err == context.Canceled {
if grpcUtils.IsCanceled(err) {
entry.WithField(errorKey, err).Debugln(gRPC)
} else {
entry.WithField(errorKey, err).Warnln(gRPC)
Expand All @@ -48,7 +49,7 @@ func (s GRPCServerLog) StreamServerInterceptor(srv interface{}, ss grpc.ServerSt
err := handler(srv, ss)
entry := user.LogWith(ss.Context(), s.Log).WithFields(logging.Fields{"method": info.FullMethod, "duration": time.Since(begin)})
if err != nil {
if err == context.Canceled {
if grpcUtils.IsCanceled(err) {
entry.WithField(errorKey, err).Debugln(gRPC)
} else {
entry.WithField(errorKey, err).Warnln(gRPC)
Expand Down
Loading

0 comments on commit 913d088

Please sign in to comment.