Skip to content

Commit

Permalink
TUN-8238: Refactor proxy logging
Browse files Browse the repository at this point in the history
Propagates the logger context into further locations to help provide more context for certain errors. For instance, upstream and downstream copying errors will properly have the assigned flow id attached and destination address.
  • Loading branch information
DevinCarr committed Feb 16, 2024
1 parent 76badfa commit 971360d
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 123 deletions.
78 changes: 78 additions & 0 deletions proxy/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package proxy

import (
"net/http"
"strconv"

"github.com/rs/zerolog"

"github.com/cloudflare/cloudflared/connection"
"github.com/cloudflare/cloudflared/ingress"
"github.com/cloudflare/cloudflared/management"
)

const (
logFieldCFRay = "cfRay"
logFieldLBProbe = "lbProbe"
logFieldRule = "ingressRule"
logFieldOriginService = "originService"
logFieldFlowID = "flowID"
logFieldConnIndex = "connIndex"
logFieldDestAddr = "destAddr"
)

// newHTTPLogger creates a child zerolog.Logger from the provided with added context from the HTTP request, ingress
// services, and connection index.
func newHTTPLogger(logger *zerolog.Logger, connIndex uint8, req *http.Request, rule int, serviceName string) zerolog.Logger {
ctx := logger.With().
Int(management.EventTypeKey, int(management.HTTP)).
Uint8(logFieldConnIndex, connIndex)
cfRay := connection.FindCfRayHeader(req)
lbProbe := connection.IsLBProbeRequest(req)
if cfRay != "" {
ctx.Str(logFieldCFRay, cfRay)
}
if lbProbe {
ctx.Bool(logFieldLBProbe, lbProbe)
}
return ctx.
Str(logFieldOriginService, serviceName).
Interface(logFieldRule, rule).
Logger()
}

// newTCPLogger creates a child zerolog.Logger from the provided with added context from the TCPRequest.
func newTCPLogger(logger *zerolog.Logger, req *connection.TCPRequest) zerolog.Logger {
return logger.With().
Int(management.EventTypeKey, int(management.TCP)).
Uint8(logFieldConnIndex, req.ConnIndex).
Str(logFieldOriginService, ingress.ServiceWarpRouting).
Str(logFieldFlowID, req.FlowID).
Str(logFieldDestAddr, req.Dest).
Uint8(logFieldConnIndex, req.ConnIndex).
Logger()
}

// logHTTPRequest logs a Debug message with the corresponding HTTP request details from the eyeball.
func logHTTPRequest(logger *zerolog.Logger, r *http.Request) {
logger.Debug().
Str("host", r.Host).
Str("path", r.URL.Path).
Interface("headers", r.Header).
Int64("content-length", r.ContentLength).
Msgf("%s %s %s", r.Method, r.URL, r.Proto)
}

// logOriginHTTPResponse logs a Debug message of the origin response.
func logOriginHTTPResponse(logger *zerolog.Logger, resp *http.Response) {
responseByCode.WithLabelValues(strconv.Itoa(resp.StatusCode)).Inc()
logger.Debug().
Int64("content-length", resp.ContentLength).
Msgf("%s", resp.Status)
}

// logRequestError logs an error for the proxied request.
func logRequestError(logger *zerolog.Logger, err error) {
requestErrors.Inc()
logger.Error().Err(err).Send()
}
142 changes: 19 additions & 123 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,15 @@ import (
"github.com/cloudflare/cloudflared/cfio"
"github.com/cloudflare/cloudflared/connection"
"github.com/cloudflare/cloudflared/ingress"
"github.com/cloudflare/cloudflared/management"
"github.com/cloudflare/cloudflared/stream"
"github.com/cloudflare/cloudflared/tracing"
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
)

const (
// TagHeaderNamePrefix indicates a Cloudflared Warp Tag prefix that gets appended for warp traffic stream headers.
TagHeaderNamePrefix = "Cf-Warp-Tag-"
LogFieldCFRay = "cfRay"
LogFieldLBProbe = "lbProbe"
LogFieldRule = "ingressRule"
LogFieldOriginService = "originService"
LogFieldFlowID = "flowID"
LogFieldConnIndex = "connIndex"
LogFieldDestAddr = "destAddr"

trailerHeaderName = "Trailer"
TagHeaderNamePrefix = "Cf-Warp-Tag-"
trailerHeaderName = "Trailer"
)

// Proxy represents a means to Proxy between cloudflared and the origin services.
Expand Down Expand Up @@ -91,26 +82,18 @@ func (p *Proxy) ProxyHTTP(
defer decrementConcurrentRequests()

req := tr.Request
cfRay := connection.FindCfRayHeader(req)
lbProbe := connection.IsLBProbeRequest(req)
p.appendTagHeaders(req)

_, ruleSpan := tr.Tracer().Start(req.Context(), "ingress_match",
trace.WithAttributes(attribute.String("req-host", req.Host)))
rule, ruleNum := p.ingressRules.FindMatchingRule(req.Host, req.URL.Path)
logFields := logFields{
cfRay: cfRay,
lbProbe: lbProbe,
rule: ruleNum,
connIndex: tr.ConnIndex,
}
p.logRequest(req, logFields)
ruleSpan.SetAttributes(attribute.Int("rule-num", ruleNum))
ruleSpan.End()
logger := newHTTPLogger(p.log, tr.ConnIndex, req, ruleNum, rule.Service.String())
logHTTPRequest(&logger, req)
if err, applied := p.applyIngressMiddleware(rule, req, w); err != nil {
if applied {
rule, srv := ruleField(p.ingressRules, ruleNum)
p.logRequestError(err, cfRay, "", rule, srv)
logRequestError(&logger, err)
return nil
}
return err
Expand All @@ -124,10 +107,9 @@ func (p *Proxy) ProxyHTTP(
originProxy,
isWebsocket,
rule.Config.DisableChunkedEncoding,
logFields,
&logger,
); err != nil {
rule, srv := ruleField(p.ingressRules, ruleNum)
p.logRequestError(err, cfRay, "", rule, srv)
logRequestError(&logger, err)
return err
}
return nil
Expand All @@ -141,14 +123,9 @@ func (p *Proxy) ProxyHTTP(
return fmt.Errorf("response writer is not a flusher")
}
rws := connection.NewHTTPResponseReadWriterAcker(w, flusher, req)
logger := p.log.With().
Int(management.EventTypeKey, int(management.HTTP)).
Str(LogFieldDestAddr, dest).
Uint8(LogFieldConnIndex, tr.ConnIndex).
Logger()
if err := p.proxyStream(tr.ToTracedContext(), rws, dest, originProxy, logger); err != nil {
rule, srv := ruleField(p.ingressRules, ruleNum)
p.logRequestError(err, cfRay, "", rule, srv)
logger := logger.With().Str(logFieldDestAddr, dest).Logger()
if err := p.proxyStream(tr.ToTracedContext(), rws, dest, originProxy, &logger); err != nil {
logRequestError(&logger, err)
return err
}
return nil
Expand Down Expand Up @@ -178,19 +155,12 @@ func (p *Proxy) ProxyTCP(
serveCtx, cancel := context.WithCancel(ctx)
defer cancel()

tracedCtx := tracing.NewTracedContext(serveCtx, req.CfTraceID, p.log)

logger := p.log.With().
Int(management.EventTypeKey, int(management.TCP)).
Str(LogFieldFlowID, req.FlowID).
Str(LogFieldDestAddr, req.Dest).
Uint8(LogFieldConnIndex, req.ConnIndex).
Logger()

logger := newTCPLogger(p.log, req)
tracedCtx := tracing.NewTracedContext(serveCtx, req.CfTraceID, &logger)
logger.Debug().Msg("tcp proxy stream started")

if err := p.proxyStream(tracedCtx, rwa, req.Dest, p.warpRouting.Proxy, logger); err != nil {
p.logRequestError(err, req.CFRay, req.FlowID, "", ingress.ServiceWarpRouting)
if err := p.proxyStream(tracedCtx, rwa, req.Dest, p.warpRouting.Proxy, &logger); err != nil {
logRequestError(&logger, err)
return err
}

Expand All @@ -199,22 +169,14 @@ func (p *Proxy) ProxyTCP(
return nil
}

func ruleField(ing ingress.Ingress, ruleNum int) (ruleID string, srv string) {
srv = ing.Rules[ruleNum].Service.String()
if ing.IsSingleRule() {
return "", srv
}
return fmt.Sprintf("%d", ruleNum), srv
}

// ProxyHTTPRequest proxies requests of underlying type http and websocket to the origin service.
func (p *Proxy) proxyHTTPRequest(
w connection.ResponseWriter,
tr *tracing.TracedHTTPRequest,
httpService ingress.HTTPOriginProxy,
isWebsocket bool,
disableChunkedEncoding bool,
fields logFields,
logger *zerolog.Logger,
) error {
roundTripReq := tr.Request
if isWebsocket {
Expand Down Expand Up @@ -281,7 +243,7 @@ func (p *Proxy) proxyHTTPRequest(
reader: tr.Request.Body,
}

stream.Pipe(eyeballStream, rwc, p.log)
stream.Pipe(eyeballStream, rwc, logger)
return nil
}

Expand All @@ -292,7 +254,7 @@ func (p *Proxy) proxyHTTPRequest(
// copy trailers
copyTrailers(w, resp)

p.logOriginResponse(resp, fields)
logOriginHTTPResponse(logger, resp)
return nil
}

Expand All @@ -304,7 +266,7 @@ func (p *Proxy) proxyStream(
rwa connection.ReadWriteAcker,
dest string,
connectionProxy ingress.StreamBasedOriginProxy,
logger zerolog.Logger,
logger *zerolog.Logger,
) error {
ctx := tr.Context
_, connectSpan := tr.Tracer().Start(ctx, "stream-connect")
Expand All @@ -330,7 +292,7 @@ func (p *Proxy) proxyStream(
connectLatency.Observe(float64(time.Since(start).Milliseconds()))
logger.Debug().Msg("proxy stream acknowledged")

originConn.Stream(ctx, rwa, p.log)
originConn.Stream(ctx, rwa, logger)
return nil
}

Expand Down Expand Up @@ -364,14 +326,6 @@ func (p *Proxy) appendTagHeaders(r *http.Request) {
}
}

type logFields struct {
cfRay string
lbProbe bool
rule int
flowID string
connIndex uint8
}

func copyTrailers(w connection.ResponseWriter, response *http.Response) {
for trailerHeader, trailerValues := range response.Trailer {
for _, trailerValue := range trailerValues {
Expand All @@ -380,64 +334,6 @@ func copyTrailers(w connection.ResponseWriter, response *http.Response) {
}
}

func (p *Proxy) logRequest(r *http.Request, fields logFields) {
log := p.log.With().Int(management.EventTypeKey, int(management.HTTP)).Logger()
event := log.Debug()
if fields.cfRay != "" {
event = event.Str(LogFieldCFRay, fields.cfRay)
}
if fields.lbProbe {
event = event.Bool(LogFieldLBProbe, fields.lbProbe)
}
if fields.cfRay == "" && !fields.lbProbe {
log.Debug().Msgf("All requests should have a CF-RAY header. Please open a support ticket with Cloudflare. %s %s %s ", r.Method, r.URL, r.Proto)
}
event.
Uint8(LogFieldConnIndex, fields.connIndex).
Str("host", r.Host).
Str("path", r.URL.Path).
Interface(LogFieldRule, fields.rule).
Interface("headers", r.Header).
Int64("content-length", r.ContentLength).
Msgf("%s %s %s", r.Method, r.URL, r.Proto)
}

func (p *Proxy) logOriginResponse(resp *http.Response, fields logFields) {
responseByCode.WithLabelValues(strconv.Itoa(resp.StatusCode)).Inc()
event := p.log.Debug()
if fields.cfRay != "" {
event = event.Str(LogFieldCFRay, fields.cfRay)
}
if fields.lbProbe {
event = event.Bool(LogFieldLBProbe, fields.lbProbe)
}
event.
Int(management.EventTypeKey, int(management.HTTP)).
Uint8(LogFieldConnIndex, fields.connIndex).
Int64("content-length", resp.ContentLength).
Msgf("%s", resp.Status)
}

func (p *Proxy) logRequestError(err error, cfRay string, flowID string, rule, service string) {
requestErrors.Inc()
log := p.log.Error().Err(err)
if cfRay != "" {
log = log.Str(LogFieldCFRay, cfRay)
}
if flowID != "" {
log = log.Str(LogFieldFlowID, flowID).Int(management.EventTypeKey, int(management.TCP))
} else {
log = log.Int(management.EventTypeKey, int(management.HTTP))
}
if rule != "" {
log = log.Str(LogFieldRule, rule)
}
if service != "" {
log = log.Str(LogFieldOriginService, service)
}
log.Send()
}

func getDestFromRule(rule *ingress.Rule, req *http.Request) (string, error) {
switch rule.Service.String() {
case ingress.ServiceBastion:
Expand Down

0 comments on commit 971360d

Please sign in to comment.