Skip to content

Commit

Permalink
Added: metrics for CLOSE_WAIT duration, Added: filter synclog read ev…
Browse files Browse the repository at this point in the history
…ents when maintenance (#86)
  • Loading branch information
mjarco authored Apr 24, 2018
1 parent 54441ea commit 0ab81f3
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 56 deletions.
27 changes: 17 additions & 10 deletions httphandler/httphandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
log.Printf("%s", err)
return
}
defer func() {
if resp.Body == nil {
return
}
closeErr := resp.Body.Close()
if closeErr != nil {
log.Printf("Cannot send response body reason: %q",
closeErr.Error())
}
}()
defer respBodyCloserFactory(resp, randomIDStr)()

wh := w.Header()
for k, v := range resp.Header {
Expand All @@ -88,6 +79,22 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
}

func respBodyCloserFactory(resp *http.Response, randomIDStr string) func() {
return func() {
if resp.Body == nil {
return
}
closeErr := resp.Body.Close()
if resp.Request != nil {
log.Debugf("discard %s response body %s ", resp.Request.URL.Host, randomIDStr)
}
if closeErr != nil {
log.Printf("Cannot send response body reason: %q",
closeErr.Error())
}
}
}

func (h *Handler) validateIncomingRequest(req *http.Request) int {
return config.RequestHeaderContentLengthValidator(*req, h.bodyMaxSize)
}
Expand Down
35 changes: 27 additions & 8 deletions httphandler/response_merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package httphandler
import (
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/allegro/akubra/log"
"github.com/allegro/akubra/metrics"
"github.com/allegro/akubra/transport"
"github.com/allegro/akubra/types"
"github.com/allegro/akubra/utils"
set "github.com/deckarep/golang-set"
)
Expand All @@ -27,15 +29,15 @@ func (rd *responseMerger) synclog(r, successfulTup transport.ResErrTuple) {
if !r.Failed {
return
}
if shouldBeFilteredInMaintenanceMode(r) {
return
}
// do not log if there was no successful response
if (successfulTup == transport.ResErrTuple{}) {
return
}
// log error entry
errorMsg := "No error"
if r.Err != nil {
errorMsg = r.Err.Error()
}
errorMsg := emptyStrOrErrorMsg(r.Err)
contentLength := successfulTup.Res.ContentLength
reqID := utils.RequestID(successfulTup.Req)

Expand All @@ -61,6 +63,13 @@ func (rd *responseMerger) synclog(r, successfulTup transport.ResErrTuple) {
rd.syncerrlog.Println(string(logMsg))
}

func emptyStrOrErrorMsg(err error) string {
if err != nil {
return fmt.Sprintf("non nil error:%s", err)
}
return ""
}

func (rd *responseMerger) handleFailedResponces(
tups []transport.ResErrTuple,
out chan<- transport.ResErrTuple,
Expand All @@ -85,6 +94,16 @@ func (rd *responseMerger) handleFailedResponces(
return alreadysent
}

func shouldBeFilteredInMaintenanceMode(failedTup transport.ResErrTuple) bool {
if failedTup.Req.Method == http.MethodPut || failedTup.Req.Method == http.MethodDelete {
return false
}
if backendErr, ok := failedTup.Err.(*types.BackendError); ok && backendErr.OrigErr == types.ErrorBackendMaintenance {
return true
}
return false
}

func logDebug(r transport.ResErrTuple) {
reqID := utils.RequestID(r.Req)
backend := utils.ExtractDestinationHostName(r)
Expand All @@ -93,14 +112,14 @@ func logDebug(r transport.ResErrTuple) {
if r.Res != nil {
statusCode = r.Res.StatusCode
}

log.Debugf("Got response %s from backend %s, status: %d, method: %s, path %s, error: %q",
errMsg := emptyStrOrErrorMsg(r.Err)
log.Debugf("Got response %s from backend %s, status: %d, method: %s, path %s, %s",
reqID,
backend,
statusCode,
r.Req.Method,
r.Req.URL.Path,
r.Err)
errMsg)
}

func (rd *responseMerger) _handle(in <-chan transport.ResErrTuple, out chan<- transport.ResErrTuple) {
Expand Down Expand Up @@ -139,7 +158,7 @@ func (rd *responseMerger) _handle(in <-chan transport.ResErrTuple, out chan<- tr
}

func (rd *responseMerger) handleResponses(in <-chan transport.ResErrTuple) transport.ResErrTuple {
out := make(chan transport.ResErrTuple, 1)
out := make(chan transport.ResErrTuple)
go func() {
rd._handle(in, out)
close(out)
Expand Down
1 change: 0 additions & 1 deletion httphandler/roundtripper_decorators.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ type statusHandler struct {
}

func (sh statusHandler) RoundTrip(req *http.Request) (resp *http.Response, err error) {

if strings.ToLower(req.URL.Path) == sh.healthCheckEndpoint {
resp := &http.Response{}
bodyContent := "OK"
Expand Down
53 changes: 53 additions & 0 deletions httphandler/synclog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package httphandler

import (
"fmt"
"net/http"
"testing"

"github.com/allegro/akubra/transport"
"github.com/allegro/akubra/types"
"github.com/stretchr/testify/require"
)

func TestShouldBeFilteredInMaintenanceMode(t *testing.T) {
getReq, err := http.NewRequest("GET", "", nil)
require.NoError(t, err)
nonMaintenanceModeTuple := transport.ResErrTuple{
Req: getReq,
Err: fmt.Errorf("Non maintenance error"),
}
result := shouldBeFilteredInMaintenanceMode(nonMaintenanceModeTuple)
require.False(t, result)
maintenanceModeTuple := transport.ResErrTuple{
Req: getReq,
Err: &types.BackendError{
OrigErr: types.ErrorBackendMaintenance,
},
}
result = shouldBeFilteredInMaintenanceMode(maintenanceModeTuple)
require.True(t, result)

putReq, err := http.NewRequest("PUT", "", nil)
require.NoError(t, err)
maintenancePutModeTuple := transport.ResErrTuple{
Req: putReq,
Err: &types.BackendError{
OrigErr: types.ErrorBackendMaintenance,
},
}
result = shouldBeFilteredInMaintenanceMode(maintenancePutModeTuple)
require.False(t, result)

deleteReq, err := http.NewRequest("DELETE", "", nil)
require.NoError(t, err)
maintenanceDeleteModeTuple := transport.ResErrTuple{
Req: deleteReq,
Err: &types.BackendError{
OrigErr: types.ErrorBackendMaintenance,
},
}
result = shouldBeFilteredInMaintenanceMode(maintenanceDeleteModeTuple)
require.False(t, result)

}
10 changes: 8 additions & 2 deletions storages/response_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,19 @@ func (rm *responseMerger) merge(firstTuple transport.ResErrTuple, rtupleCh <-cha
if isSuccess(tuple) {
successes = append(successes, tuple)
} else {
tuple.DiscardBody()
err := tuple.DiscardBody()
if err != nil {
log.Printf("DiscardBody on ignored response tuple error: %s", err)
}
}
}

if len(successes) > 0 {
if !isSuccess(firstTuple) {
firstTuple.DiscardBody()
err := firstTuple.DiscardBody()
if err != nil {
log.Printf("DiscardBody on ignored response tuple error: %s", err)
}
}

res, err := rm.createResponse(firstTuple, successes)
Expand Down
28 changes: 4 additions & 24 deletions storages/storages.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,14 @@ import (
"github.com/allegro/akubra/httphandler"
"github.com/allegro/akubra/log"
"github.com/allegro/akubra/transport"
"github.com/allegro/akubra/types"

"github.com/allegro/akubra/storages/auth"
"github.com/allegro/akubra/storages/config"
"github.com/allegro/akubra/storages/merger"
set "github.com/deckarep/golang-set"
)

type backendError struct {
backend string
origErr error
}

func (be *backendError) Backend() string {
return be.backend
}

func (be *backendError) Err() error {
return be.origErr
}

func (be *backendError) Error() (errMsg string) {
errMsg = fmt.Sprintf("backend %s responded with error %s", be.backend, be.origErr)
if _, ok := be.origErr.(*transport.DefinitionError); !ok {
errMsg = fmt.Sprintf("backend %s responded with error %s", be.backend, be.origErr)
}
return
}

// NamedCluster interface
type NamedCluster interface {
http.RoundTripper
Expand Down Expand Up @@ -81,15 +61,15 @@ func (b *Backend) RoundTrip(req *http.Request) (*http.Response, error) {
log.Debugf("Request %s req.URL.Host replaced with %s", reqID, req.URL.Host)
if b.Maintenance {
log.Debugf("Request %s blocked %s is in maintenance mode", reqID, req.URL.Host)
return nil, &backendError{backend: b.Endpoint.Host,
origErr: fmt.Errorf("backend %v in maintenance mode", b.Name)}
return nil, &types.BackendError{HostName: b.Endpoint.Host,
OrigErr: types.ErrorBackendMaintenance}
}
err := error(nil)
log.Printf("url host %s, header host %s, req host %s", req.URL.Host, req.Header.Get("Host"), req.Host)
resp, oerror := b.RoundTripper.RoundTrip(req)

if oerror != nil {
err = &backendError{backend: b.Endpoint.Host, origErr: oerror}
err = &types.BackendError{HostName: b.Endpoint.Host, OrigErr: oerror}
}
return resp, err
}
Expand Down
32 changes: 22 additions & 10 deletions transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type ResErrTuple struct {
Err error
Req *http.Request
Failed bool
Time time.Time
}

// DefinitionError properties for Transports
Expand All @@ -41,25 +42,37 @@ type DefinitionError struct {
func discardReadCloser(rc io.ReadCloser) error {
_, err := io.Copy(ioutil.Discard, rc)
if err != nil {
log.Printf("Discard body error %s", err)
return err
}
err = rc.Close()
if err != nil {
log.Printf("Close body error %s", err)
}
return err
}

// DiscardBody clears request and response body
func (r *ResErrTuple) DiscardBody() {
func (r *ResErrTuple) DiscardBody() error {
ctx := r.Req.Context()
requestID := ctx.Value(log.ContextreqIDKey)
defer log.Printf("Linger duration %f %s", float64(time.Since(r.Time).Nanoseconds())/float64(1000000), requestID)
defer metrics.UpdateSince("reqs.backend."+metrics.Clean(r.Req.URL.Host)+".linger", r.Time)
if r.Req != nil && r.Req.Body != nil {
if err := discardReadCloser(r.Req.Body); err != nil {
log.Printf("Cannot discard request body: %s", err)
return err
}
}

if r.Res != nil && r.Res.Body != nil {
log.Debugf("discard %s response body %s", r.Res.Request.URL.Host, requestID)
if err := discardReadCloser(r.Res.Body); err != nil {
log.Printf("Cannot discard request body: %s", err)
return err
}
}
return nil
}

// MultipleResponsesHandler should handle chan of incomming ReqResErrTuple
Expand Down Expand Up @@ -114,11 +127,8 @@ func defaultHandleResponses(in <-chan ResErrTuple, out chan<- ResErrTuple) {

func clearResponsesBody(respTups []ResErrTuple) {
for _, rtup := range respTups {
if rtup.Res != nil {
_, err := io.Copy(ioutil.Discard, rtup.Res.Body)
if err != nil {
rtup.Err = err
}
if err := rtup.DiscardBody(); err != nil {
log.Printf("ReqRespTup discard body error %s", err)
}
}
}
Expand Down Expand Up @@ -247,17 +257,19 @@ func collectMetrics(req *http.Request, reqresperr ResErrTuple, since time.Time)
func (mt *MultiTransport) sendRequest(
req *http.Request,
out chan ResErrTuple, backend http.RoundTripper) {
since := time.Now()
ctx := req.Context()
o := make(chan ResErrTuple)
requestID := ctx.Value(log.ContextreqIDKey)

since := time.Now()
o := make(chan ResErrTuple)
go func() {
resp, err := backend.RoundTrip(req.WithContext(context.WithValue(context.Background(), log.ContextreqIDKey, requestID)))
if err != nil {
log.Debugf("Send request error %s, %s", err.Error(), requestID)
}
log.Debugf("Sent request %s to %s", requestID, req.URL.Host)
failed := err != nil || resp != nil && (resp.StatusCode < 200 || resp.StatusCode > 399)
r := ResErrTuple{Res: resp, Err: err, Failed: failed}
r := ResErrTuple{Res: resp, Err: err, Failed: failed, Time: time.Now()}
o <- r
}()
var reqresperr ResErrTuple
Expand All @@ -266,7 +278,7 @@ func (mt *MultiTransport) sendRequest(
select {
case <-ctx.Done():
log.Debugf("Ctx Done reqID %s ", requestID)
reqresperr = ResErrTuple{Res: nil, Err: ErrBodyContentLengthMismatch, Failed: true}
reqresperr = ResErrTuple{Res: nil, Err: ErrBodyContentLengthMismatch, Failed: true, Time: time.Now()}
case reqresperr = <-o:
break
}
Expand Down
1 change: 0 additions & 1 deletion transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ type backend struct {
// RoundTrip satisfies http.RoundTripper interface
func (b *backend) RoundTrip(r *http.Request) (*http.Response, error) {
r.URL.Host = b.Endpoint.Host
println("Roundtripp", b)
return b.RoundTripper.RoundTrip(r)
}

Expand Down
Loading

0 comments on commit 0ab81f3

Please sign in to comment.