Skip to content

Commit

Permalink
feat: update logging messages add req_id
Browse files Browse the repository at this point in the history
  • Loading branch information
maier committed Mar 17, 2023
1 parent e21b183 commit de77479
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 70 deletions.
108 changes: 53 additions & 55 deletions internal/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ func (healthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

type bulkHandler struct {
log logger.Logger
metrics *trapmetrics.TrapMetrics
dataToken string
dest config.Destination
debug bool
}

func (h bulkHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand All @@ -65,10 +65,6 @@ func (h bulkHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

reqID := uuid.New()

handleStart := time.Now()

// extract basic auth credentials
// we're not going to verify them, but they must be present so they can be
// passed upstream and ultimately to opensearch.
Expand All @@ -79,6 +75,10 @@ func (h bulkHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

reqID := uuid.New()
reqLogger := log.With().Str("req_id", reqID.String()).Logger()
handleStart := time.Now()

remote := r.Header.Get("X-Forwarded-For")
if remote == "" {
remote = r.RemoteAddr
Expand All @@ -90,12 +90,12 @@ func (h bulkHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
contentSize, err := io.Copy(gz, r.Body)
if err != nil {
log.Error().Err(err).Str("req_id", reqID.String()).Msg("compressing body")
reqLogger.Error().Err(err).Msg("compressing body")
http.Error(w, "compressing body", http.StatusInternalServerError)
return
}
if err = gz.Close(); err != nil {
log.Error().Err(err).Str("req_id", reqID.String()).Msg("closing compressed buffer")
reqLogger.Error().Err(err).Msg("closing compressed buffer")
http.Error(w, "closing compressed buffer", http.StatusInternalServerError)
return
}
Expand All @@ -119,6 +119,7 @@ func (h bulkHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
MaxIdleConns: 1,
MaxIdleConnsPerHost: 0,
},
Timeout: 60 * time.Second,
}
} else {
destURL.Scheme = "http"
Expand All @@ -135,6 +136,7 @@ func (h bulkHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
MaxIdleConns: 1,
MaxIdleConnsPerHost: 0,
},
Timeout: 60 * time.Second,
}
}

Expand All @@ -143,18 +145,24 @@ func (h bulkHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

req, err := retryablehttp.NewRequestWithContext(r.Context(), method, destURL.String(), &buf)
if err != nil {
log.Error().Err(err).Str("req_id", reqID.String()).Msg("creating destination request")
reqLogger.Error().Err(err).Msg("creating destination request")
http.Error(w, "creating destination request", http.StatusInternalServerError)
return
}

reqLogger = log.With().
Str("req_id", reqID.String()).
Str("url", req.URL.String()).
Str("method", req.Method).
Logger()

// pass along the basic auth
req.SetBasicAuth(username, password)

req.Header.Set("X-Circonus-Auth-Token", h.dataToken)
req.Header.Set("Content-Type", r.Header.Get("Content-Type"))
req.Header.Set("Content-Encoding", "gzip")
req.Header.Set("Accept-Encoding", "gzip")
// req.Header.Set("Accept-Encoding", "gzip")
req.Header.Set("Connection", "close")
req.Header.Set("User-Agent", release.NAME+"/"+release.Version)
req.Header.Set("X-Forwarded-For", remote)
Expand All @@ -164,45 +172,33 @@ func (h bulkHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

retryClient := retryablehttp.NewClient()
retryClient.HTTPClient = client
retryClient.Logger = h.log
retryClient.Logger = logger.LogWrapper{
Log: reqLogger.With().Str("handler", "/_bulk").Str("component", "retryablehttp").Logger(),
Debug: h.debug,
}
retryClient.RetryWaitMin = 50 * time.Millisecond
retryClient.RetryWaitMax = 2 * time.Second
retryClient.RetryMax = 7
retryClient.RequestLogHook = func(l retryablehttp.Logger, r *http.Request, attempt int) {
if attempt > 0 {
reqStart = time.Now()
log.Info().Str("req_id", reqID.String()).Str("url", r.URL.String()).Int("attempt", attempt).Msg("rettrying")
// l.Printf("retrying... %s %d", r.URL.String(), attempt)
retries++
}
}

retryClient.RequestLogHook = func(l retryablehttp.Logger, r *http.Request, attempt int) {
if attempt > 0 {
reqStart = time.Now()
log.Info().Str("req_id", reqID.String()).Str("url", r.URL.String()).Int("attempt", attempt).Msg("rettrying")
// l.Printf("retrying... %s %d", r.URL.String(), attempt)
reqLogger.Info().Int("attempt", attempt).Msg("retrying")
retries++
}
}

retryClient.ResponseLogHook = func(l retryablehttp.Logger, r *http.Response) {
if r.StatusCode != http.StatusOK {
log.Warn().Str("req_id", reqID.String()).Str("url", r.Request.URL.String()).Int("status_code", r.StatusCode).Str("status", r.Status).Msg("non-200 response")
// l.Printf("%s non-200 response %s: %s", reqID.String(), r.Request.URL.String(), r.Status)
if r.StatusCode == http.StatusNotAcceptable {
l.Printf("broker couldn't parse payload - try tracing metrics for this specific check")
}
reqLogger.Warn().Int("status_code", r.StatusCode).Str("status", r.Status).Msg("non-200 response")
} else if r.StatusCode == http.StatusOK && retries > 0 {
log.Info().Str("req_id", reqID.String()).Str("url", r.Request.URL.String()).Int("retries", retries+1).Msg("succeeded")
// l.Printf("succeeded after %d attempt(s)", retries+1) // add one for first failed attempt
reqLogger.Info().Int("retries", retries+1).Msg("succeeded")
}
}

retryClient.CheckRetry = func(ctx context.Context, resp *http.Response, origErr error) (bool, error) {
retry, rhErr := retryablehttp.ErrorPropagatedRetryPolicy(ctx, resp, origErr)
if retry && rhErr != nil {
log.Warn().Err(rhErr).Err(origErr).Str("req_id", reqID.String()).Str("req_url", resp.Request.URL.String()).Msg("request error")
reqLogger.Warn().Err(rhErr).Err(origErr).Msg("request error")
}

return retry, nil
Expand All @@ -216,9 +212,9 @@ func (h bulkHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer resp.Body.Close()
}
if err != nil {
log.Error().Err(err).Str("req_id", reqID.String()).Str("req", req.URL.String()).Msg("making destination request")
http.Error(w, "making destination request", http.StatusInternalServerError)
return
reqLogger.Error().Err(err).Msg("making destination request")
// http.Error(w, "making destination request", http.StatusInternalServerError)
// return
}

tags := trapmetrics.Tags{
Expand All @@ -235,7 +231,7 @@ func (h bulkHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(resp.StatusCode)
responseSize, err := io.Copy(w, resp.Body)
if err != nil {
log.Error().Err(err).Str("req_id", reqID.String()).Msg("reading/writing response body")
reqLogger.Error().Err(err).Msg("reading/writing response body")
http.Error(w, "reading/writing response", http.StatusInternalServerError)
return
}
Expand All @@ -245,12 +241,9 @@ func (h bulkHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ratio = float64(contentSize) / float64(buf.Len())
}

log.Info().
Str("req_id", reqID.String()).
reqLogger.Info().
Str("remote", remote).
Str("proto", r.Proto).
Str("method", r.Method).
Str("URI", r.URL.RequestURI()).
Int("upstream_resp_code", resp.StatusCode).
Str("handle_dur", time.Since(handleStart).String()).
Str("upstream_req_dur", time.Since(reqStart).String()).
Expand Down Expand Up @@ -350,7 +343,6 @@ func (h otelSpanSearchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
}

func (s *Server) genericRequest(w http.ResponseWriter, r *http.Request) {
handleStart := time.Now()

username, ok := r.Context().Value(basicAuthUser).(string)
if !ok {
Expand All @@ -364,6 +356,10 @@ func (s *Server) genericRequest(w http.ResponseWriter, r *http.Request) {
return
}

reqID := uuid.New()
reqLogger := log.With().Str("req_id", reqID.String()).Logger()
handleStart := time.Now()

remote := r.Header.Get("X-Forwarded-For")
if remote == "" {
remote = r.RemoteAddr
Expand Down Expand Up @@ -405,6 +401,7 @@ func (s *Server) genericRequest(w http.ResponseWriter, r *http.Request) {
MaxIdleConns: 1,
MaxIdleConnsPerHost: 0,
},
Timeout: 60 * time.Second,
}
} else {
newURL = "http://"
Expand All @@ -421,6 +418,7 @@ func (s *Server) genericRequest(w http.ResponseWriter, r *http.Request) {
MaxIdleConns: 1,
MaxIdleConnsPerHost: 0,
},
Timeout: 60 * time.Second,
}
}

Expand All @@ -439,14 +437,20 @@ func (s *Server) genericRequest(w http.ResponseWriter, r *http.Request) {
return
}

reqLogger = log.With().
Str("req_id", reqID.String()).
Str("url", req.URL.String()).
Str("method", req.Method).
Logger()

// pass along the basic auth
req.SetBasicAuth(username, password)

req.Header.Set("X-Circonus-Auth-Token", s.cfg.Circonus.APIKey)
if r.Method == http.MethodPut || r.Method == http.MethodPost {
req.Header.Set("Content-Type", r.Header.Get("Content-Type"))
req.Header.Set("Content-Encoding", "gzip")
req.Header.Set("Accept-Encoding", "gzip")
// req.Header.Set("Accept-Encoding", "gzip")
}
req.Header.Set("Connection", "close")
req.Header.Set("User-Agent", release.NAME+"/"+release.Version)
Expand All @@ -458,7 +462,7 @@ func (s *Server) genericRequest(w http.ResponseWriter, r *http.Request) {
retryClient := retryablehttp.NewClient()
retryClient.HTTPClient = client
retryClient.Logger = logger.LogWrapper{
Log: log.With().Str("component", "retryablehttp").Logger(),
Log: reqLogger.With().Str("handler", "/_bulk").Str("component", "retryablehttp").Logger(),
Debug: s.cfg.Debug,
}
retryClient.RetryWaitMin = 50 * time.Millisecond
Expand All @@ -467,26 +471,23 @@ func (s *Server) genericRequest(w http.ResponseWriter, r *http.Request) {
retryClient.RequestLogHook = func(l retryablehttp.Logger, r *http.Request, attempt int) {
if attempt > 0 {
reqStart = time.Now()
l.Printf("retrying... %s %d", r.URL.String(), attempt)
reqLogger.Info().Int("attempt", attempt).Msg("retrying")
retries++
}
}

retryClient.ResponseLogHook = func(l retryablehttp.Logger, r *http.Response) {
if r.StatusCode != http.StatusOK {
l.Printf("non-200 response %s %s: %s", r.Request.Method, r.Request.URL.String(), r.Status)
if r.StatusCode == http.StatusNotAcceptable {
l.Printf("broker couldn't parse payload - try tracing metrics for this specific check")
}
reqLogger.Warn().Int("status_code", r.StatusCode).Str("status", r.Status).Msg("non-200 response")
} else if r.StatusCode == http.StatusOK && retries > 0 {
l.Printf("succeeded after %d attempt(s)", retries+1) // add one for first failed attempt
reqLogger.Info().Int("retries", retries+1).Msg("succeeded") // add one for first failed attempt
}
}

retryClient.CheckRetry = func(ctx context.Context, resp *http.Response, origErr error) (bool, error) {
retry, rhErr := retryablehttp.ErrorPropagatedRetryPolicy(ctx, resp, origErr)
if retry && rhErr != nil {
log.Warn().Err(rhErr).Err(origErr).Str("req_url", resp.Request.URL.String()).Msg("request error")
reqLogger.Warn().Err(rhErr).Err(origErr).Msg("request error")
}

return retry, nil
Expand All @@ -500,8 +501,9 @@ func (s *Server) genericRequest(w http.ResponseWriter, r *http.Request) {
defer resp.Body.Close()
}
if err != nil {
s.serverError(w, fmt.Errorf("making destination request (%s): %w", req.URL.String(), err))
return
reqLogger.Error().Err(err).Msg("making destination request")
// s.serverError(w, fmt.Errorf("making destination request (%s): %w", req.URL.String(), err))
// return
}

tags := trapmetrics.Tags{
Expand Down Expand Up @@ -529,11 +531,9 @@ func (s *Server) genericRequest(w http.ResponseWriter, r *http.Request) {
return
}

log.Info().
reqLogger.Info().
Str("remote", remote).
Str("proto", r.Proto).
Str("method", r.Method).
Str("URI", r.URL.RequestURI()).
Int("resp_code", resp.StatusCode).
Str("handle_dur", time.Since(handleStart).String()).
Str("upstream_req_dur", time.Since(reqStart).String()).
Expand All @@ -552,11 +552,9 @@ func (s *Server) genericRequest(w http.ResponseWriter, r *http.Request) {
return
}

log.Info().
reqLogger.Info().
Str("remote", remote).
Str("proto", r.Proto).
Str("method", r.Method).
Str("URI", r.URL.RequestURI()).
Int("resp_code", resp.StatusCode).
Str("handle_dur", time.Since(handleStart).String()).
Str("upstream_req_dur", time.Since(reqStart).String()).
Expand Down
23 changes: 8 additions & 15 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/circonus-labs/go-trapmetrics"
"github.com/circonus/c3-exporter/internal/config"
"github.com/circonus/c3-exporter/internal/logger"
"github.com/rs/zerolog/log"
)

Expand Down Expand Up @@ -66,13 +65,16 @@ func New(cfg *config.Config) (*Server, error) {
mux.Handle("/", s.verifyBasicAuth(genericHandler{s: s}))
mux.Handle("/health", healthHandler{})
mux.Handle("/_bulk", s.verifyBasicAuth(http.TimeoutHandler(bulkHandler{
dest: cfg.Destination,
log: logger.LogWrapper{
Log: log.With().Str("handler", "/_bulk").Logger(),
Debug: cfg.Debug,
},
dest: cfg.Destination,
dataToken: cfg.Circonus.APIKey,
metrics: metrics,
debug: cfg.Debug,
}, handlerTimeout, "Handler timeout")))
mux.Handle("/otel-v1-apm-span/_bulk", s.verifyBasicAuth(http.TimeoutHandler(bulkHandler{
dest: cfg.Destination,
dataToken: cfg.Circonus.APIKey,
metrics: metrics,
debug: cfg.Debug,
}, handlerTimeout, "Handler timeout")))
mux.Handle("/_cluster/settings", s.verifyBasicAuth(clusterSettingsHandler{s: s}))
mux.Handle("/otel-v1-apm-service-map", s.verifyBasicAuth(otelv1apmservicemapHandler{s: s}))
Expand All @@ -82,15 +84,6 @@ func New(cfg *config.Config) (*Server, error) {
mux.Handle("/_opendistro/_ism/policies/raw-span-policy", s.verifyBasicAuth(ismPolicyHandler{s: s}))
mux.Handle("/otel-v1-apm-span-000001", s.verifyBasicAuth(otelSpanHandler{s: s}))
mux.Handle("/otel-v1-apm-span/_search", s.verifyBasicAuth(otelSpanSearchHandler{s: s}))
mux.Handle("/otel-v1-apm-span/_bulk", s.verifyBasicAuth(http.TimeoutHandler(bulkHandler{
dest: cfg.Destination,
log: logger.LogWrapper{
Log: log.With().Str("handler", "/_bulk").Logger(),
Debug: cfg.Debug,
},
dataToken: cfg.Circonus.APIKey,
metrics: metrics,
}, handlerTimeout, "Handler timeout")))

s.srv = &http.Server{
Addr: cfg.Server.Address,
Expand Down

0 comments on commit de77479

Please sign in to comment.