Skip to content

Commit

Permalink
Adding logging to tchannel handler error cases
Browse files Browse the repository at this point in the history
Zap logging was added to understand when handler calls failed, when
SendSystemError failed, and when responseWriter failed to close.
To test these changes, I added a responseWriter interface factory to
the struct of the handler so I could control reponseWriter during
testing. A recorder interface was also added to test failure cases
in responseRecorder.
  • Loading branch information
r-hang committed Sep 18, 2018
1 parent 2ec6c29 commit c56bf86
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 100 deletions.
4 changes: 2 additions & 2 deletions transport/tchannel/channel_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (options transportOptions) newChannelTransport() *ChannelTransport {
ch: options.ch,
addr: options.addr,
tracer: options.tracer,
logger: logger,
logger: logger.Named("tchannel"),
originalHeaders: options.originalHeaders,
}
}
Expand Down Expand Up @@ -139,7 +139,7 @@ func (t *ChannelTransport) start() error {
for s := range services {
sc := t.ch.GetSubChannel(s)
existing := sc.GetHandlers()
sc.SetHandler(handler{existing: existing, router: t.router, tracer: t.tracer})
sc.SetHandler(handler{existing: existing, router: t.router, tracer: t.tracer, logger: t.logger, newResponseWriter: newHandlerWriter})
}
}

Expand Down
123 changes: 72 additions & 51 deletions transport/tchannel/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ type inboundCallResponse interface {
SetApplicationError() error
}

// responseWriter provides an interface similar to handlerWriter.
//
// It allows us to control handlerWriter during testing.
type responseWriter interface {
AddHeaders(h transport.Headers)
AddHeader(key string, value string)
Close() error
IsApplicationError() bool
SetApplicationError()
Write(s []byte) (int, error)
}

// tchannelCall wraps a TChannel InboundCall into an inboundCall.
//
// We need to do this so that we can change the return type of call.Response()
Expand All @@ -82,11 +94,12 @@ func (c tchannelCall) Response() inboundCallResponse {

// handler wraps a transport.UnaryHandler into a TChannel Handler.
type handler struct {
existing map[string]tchannel.Handler
router transport.Router
tracer opentracing.Tracer
headerCase headerCase
logger *zap.Logger
existing map[string]tchannel.Handler
router transport.Router
tracer opentracing.Tracer
headerCase headerCase
logger *zap.Logger
newResponseWriter func(inboundCallResponse, tchannel.Format, headerCase) responseWriter
}

func (h handler) Handle(ctx ncontext.Context, call *tchannel.InboundCall) {
Expand All @@ -95,10 +108,10 @@ func (h handler) Handle(ctx ncontext.Context, call *tchannel.InboundCall) {

func (h handler) handle(ctx context.Context, call inboundCall) {
// you MUST close the responseWriter no matter what unless you have a tchannel.SystemError
responseWriter := newResponseWriter(call.Response(), call.Format(), h.headerCase)
responseWriter := h.newResponseWriter(call.Response(), call.Format(), h.headerCase)

// echo accepted rpc-service in response header
responseWriter.addHeader(ServiceHeaderKey, call.ServiceName())
responseWriter.AddHeader(ServiceHeaderKey, call.ServiceName())

err := h.callHandler(ctx, call, responseWriter)

Expand All @@ -108,33 +121,37 @@ func (h handler) handle(ctx context.Context, call inboundCall) {
call.Response().Blackhole()
return
}
if err != nil && !responseWriter.isApplicationError {
// TODO: log error
_ = call.Response().SendSystemError(getSystemError(err))
if err != nil && !responseWriter.IsApplicationError() {
if err := call.Response().SendSystemError(getSystemError(err)); err != nil {
h.logger.Error("SendSystemError failed", zap.Error(err))
}
h.logger.Error("handler failed", zap.Error(err))
return
}
if err != nil && responseWriter.isApplicationError {
if err != nil && responseWriter.IsApplicationError() {
// we have an error, so we're going to propagate it as a yarpc error,
// regardless of whether or not it is a system error.
status := yarpcerrors.FromError(errors.WrapHandlerError(err, call.ServiceName(), call.MethodString()))
// TODO: what to do with error? we could have a whole complicated scheme to
// return a SystemError here, might want to do that
text, _ := status.Code().MarshalText()
responseWriter.addHeader(ErrorCodeHeaderKey, string(text))
responseWriter.AddHeader(ErrorCodeHeaderKey, string(text))
if status.Name() != "" {
responseWriter.addHeader(ErrorNameHeaderKey, status.Name())
responseWriter.AddHeader(ErrorNameHeaderKey, status.Name())
}
if status.Message() != "" {
responseWriter.addHeader(ErrorMessageHeaderKey, status.Message())
responseWriter.AddHeader(ErrorMessageHeaderKey, status.Message())
}
}
if err := responseWriter.Close(); err != nil {
// TODO: log error
_ = call.Response().SendSystemError(getSystemError(err))
if err := call.Response().SendSystemError(getSystemError(err)); err != nil {
h.logger.Error("SendSystemError failed", zap.Error(err))
}
h.logger.Error("responseWriter failed to close", zap.Error(err))
}
}

func (h handler) callHandler(ctx context.Context, call inboundCall, responseWriter *responseWriter) error {
func (h handler) callHandler(ctx context.Context, call inboundCall, responseWriter responseWriter) error {
start := time.Now()
_, ok := ctx.Deadline()
if !ok {
Expand Down Expand Up @@ -207,80 +224,84 @@ func (h handler) callHandler(ctx context.Context, call inboundCall, responseWrit
}
}

type responseWriter struct {
failedWith error
format tchannel.Format
headers transport.Headers
buffer *bufferpool.Buffer
response inboundCallResponse
isApplicationError bool
headerCase headerCase
type handlerWriter struct {
failedWith error
format tchannel.Format
headers transport.Headers
buffer *bufferpool.Buffer
response inboundCallResponse
applicationError bool
headerCase headerCase
}

func newResponseWriter(response inboundCallResponse, format tchannel.Format, headerCase headerCase) *responseWriter {
return &responseWriter{
func newHandlerWriter(response inboundCallResponse, format tchannel.Format, headerCase headerCase) responseWriter {
return &handlerWriter{
response: response,
format: format,
headerCase: headerCase,
}
}

func (rw *responseWriter) AddHeaders(h transport.Headers) {
func (hw *handlerWriter) AddHeaders(h transport.Headers) {
for k, v := range h.OriginalItems() {
// TODO: is this considered a breaking change?
if isReservedHeaderKey(k) {
rw.failedWith = appendError(rw.failedWith, fmt.Errorf("cannot use reserved header key: %s", k))
hw.failedWith = appendError(hw.failedWith, fmt.Errorf("cannot use reserved header key: %s", k))
return
}
rw.addHeader(k, v)
hw.AddHeader(k, v)
}
}

func (rw *responseWriter) addHeader(key string, value string) {
rw.headers = rw.headers.With(key, value)
func (hw *handlerWriter) AddHeader(key string, value string) {
hw.headers = hw.headers.With(key, value)
}

func (hw *handlerWriter) SetApplicationError() {
hw.applicationError = true
}

func (rw *responseWriter) SetApplicationError() {
rw.isApplicationError = true
func (hw *handlerWriter) IsApplicationError() bool {
return hw.applicationError
}

func (rw *responseWriter) Write(s []byte) (int, error) {
if rw.failedWith != nil {
return 0, rw.failedWith
func (hw *handlerWriter) Write(s []byte) (int, error) {
if hw.failedWith != nil {
return 0, hw.failedWith
}

if rw.buffer == nil {
rw.buffer = bufferpool.Get()
if hw.buffer == nil {
hw.buffer = bufferpool.Get()
}

n, err := rw.buffer.Write(s)
n, err := hw.buffer.Write(s)
if err != nil {
rw.failedWith = appendError(rw.failedWith, err)
hw.failedWith = appendError(hw.failedWith, err)
}
return n, err
}

func (rw *responseWriter) Close() error {
retErr := rw.failedWith
if rw.isApplicationError {
if err := rw.response.SetApplicationError(); err != nil {
func (hw *handlerWriter) Close() error {
retErr := hw.failedWith
if hw.IsApplicationError() {
if err := hw.response.SetApplicationError(); err != nil {
retErr = appendError(retErr, fmt.Errorf("SetApplicationError() failed: %v", err))
}
}

headers := headerMap(rw.headers, rw.headerCase)
retErr = appendError(retErr, writeHeaders(rw.format, headers, nil, rw.response.Arg2Writer))
headers := headerMap(hw.headers, hw.headerCase)
retErr = appendError(retErr, writeHeaders(hw.format, headers, nil, hw.response.Arg2Writer))

// Arg3Writer must be opened and closed regardless of if there is data
// However, if there is a system error, we do not want to do this
bodyWriter, err := rw.response.Arg3Writer()
bodyWriter, err := hw.response.Arg3Writer()
if err != nil {
return appendError(retErr, err)
}
defer func() { retErr = appendError(retErr, bodyWriter.Close()) }()
if rw.buffer != nil {
defer bufferpool.Put(rw.buffer)
if _, err := rw.buffer.WriteTo(bodyWriter); err != nil {
if hw.buffer != nil {
defer bufferpool.Put(hw.buffer)
if _, err := hw.buffer.WriteTo(bodyWriter); err != nil {
return appendError(retErr, err)
}
}
Expand Down
Loading

0 comments on commit c56bf86

Please sign in to comment.