From b599ab7adc17afa7efd00a1f871b83243cc168d7 Mon Sep 17 00:00:00 2001 From: Michal Jarco Date: Fri, 3 Feb 2017 09:41:56 +0100 Subject: [PATCH 1/3] Wait for all backends if performing bucket operation --- httphandler/httphandler.go | 11 --- httphandler/response_merger.go | 69 +++++++++++---- sharding/sharding.go | 154 ++++----------------------------- sharding/sharding_test.go | 3 +- sharding/shards_ring.go | 136 +++++++++++++++++++++++++++++ 5 files changed, 204 insertions(+), 169 deletions(-) create mode 100644 sharding/shards_ring.go diff --git a/httphandler/httphandler.go b/httphandler/httphandler.go index 35dda2e..ecf4f95 100644 --- a/httphandler/httphandler.go +++ b/httphandler/httphandler.go @@ -9,7 +9,6 @@ import ( "github.com/allegro/akubra/config" "github.com/allegro/akubra/dial" "github.com/allegro/akubra/log" - "github.com/allegro/akubra/transport" ) // Handler implements http.Handler interface @@ -77,16 +76,6 @@ func ConfigureHTTPTransport(conf config.Config) (*http.Transport, error) { return httpTransport, nil } -// NewMultipleResponseHandler returns a function for a later use in transport.MultiTransport -func NewMultipleResponseHandler(conf config.Config) transport.MultipleResponsesHandler { - rh := responseMerger{ - conf.Synclog, - conf.Mainlog, - conf.SyncLogMethodsSet, - } - return rh.handleResponses -} - // DecorateRoundTripper applies common http.RoundTripper decorators func DecorateRoundTripper(conf config.Config, rt http.RoundTripper) http.RoundTripper { return Decorate( diff --git a/httphandler/response_merger.go b/httphandler/response_merger.go index 819c258..7c332da 100644 --- a/httphandler/response_merger.go +++ b/httphandler/response_merger.go @@ -5,6 +5,7 @@ import ( "io" "io/ioutil" + "github.com/allegro/akubra/config" "github.com/allegro/akubra/log" "github.com/allegro/akubra/transport" set "github.com/deckarep/golang-set" @@ -14,6 +15,7 @@ type responseMerger struct { syncerrlog log.Logger runtimeLog log.Logger methodSetFilter set.Set + fifo bool } func (rd *responseMerger) synclog(r, successfulTup *transport.ReqResErrTuple) { @@ -56,17 +58,6 @@ func (rd *responseMerger) handleFailedResponces( logMethodSet set.Set) bool { for _, r := range tups { - errorMsg := "No error" - if r.Err != nil { - errorMsg = r.Err.Error() - } - - rd.runtimeLog.Printf("RGW resp %q, %q, %q, %t, %q", - r.Req.URL.Path, - r.Req.Method, - r.Req.Host, - r.Failed, - errorMsg) rd.synclog(r, successfulTup) @@ -91,19 +82,34 @@ func (rd *responseMerger) _handle(in <-chan *transport.ReqResErrTuple, out chan< var successfulTup *transport.ReqResErrTuple errs := []*transport.ReqResErrTuple{} nonErrs := []*transport.ReqResErrTuple{} - respPassed := false + firstPassed := false for { r, hasMore := <-in if !hasMore { break } + + errorMsg := "No error" + if r.Err != nil { + errorMsg = r.Err.Error() + } + + rd.runtimeLog.Printf("RGW resp %q, %q, %q, %t, %q", + r.Req.URL.Path, + r.Req.Method, + r.Req.Host, + r.Failed, + errorMsg) + // pass first successful answer to client - if !r.Failed && !respPassed { + if !r.Failed && !firstPassed { // append additional headers successfulTup = r - out <- r - respPassed = true + if rd.fifo { + out <- r + } + firstPassed = true continue } if r.Err != nil { @@ -113,8 +119,12 @@ func (rd *responseMerger) _handle(in <-chan *transport.ReqResErrTuple, out chan< } } - respPassed = rd.handleFailedResponces(nonErrs, out, respPassed, successfulTup, rd.methodSetFilter) - rd.handleFailedResponces(errs, out, respPassed, successfulTup, rd.methodSetFilter) + if !rd.fifo && firstPassed { + out <- successfulTup + } + + firstPassed = rd.handleFailedResponces(nonErrs, out, firstPassed, successfulTup, rd.methodSetFilter) + rd.handleFailedResponces(errs, out, firstPassed, successfulTup, rd.methodSetFilter) } func (rd *responseMerger) handleResponses(in <-chan *transport.ReqResErrTuple) *transport.ReqResErrTuple { @@ -125,3 +135,28 @@ func (rd *responseMerger) handleResponses(in <-chan *transport.ReqResErrTuple) * }() return <-out } + +// EarliestResponseHandler returns a function which handles multiple +// responses, returns first successful response to caller +func EarliestResponseHandler(conf config.Config) transport.MultipleResponsesHandler { + rh := responseMerger{ + conf.Synclog, + conf.Mainlog, + conf.SyncLogMethodsSet, + true, + } + return rh.handleResponses +} + +// LateResponseHandler returns a function which handles multiple +// responses and returns first successful response to caller after +// all other responces received +func LateResponseHandler(conf config.Config) transport.MultipleResponsesHandler { + rh := responseMerger{ + conf.Synclog, + conf.Mainlog, + conf.SyncLogMethodsSet, + false, + } + return rh.handleResponses +} diff --git a/sharding/sharding.go b/sharding/sharding.go index 7b2c88e..5f03c4d 100644 --- a/sharding/sharding.go +++ b/sharding/sharding.go @@ -1,17 +1,13 @@ package sharding import ( - "bytes" - "encoding/json" "fmt" - "io" "net/http" "net/url" "strings" "github.com/allegro/akubra/config" "github.com/allegro/akubra/httphandler" - "github.com/allegro/akubra/log" "github.com/allegro/akubra/transport" "github.com/golang/groupcache/consistenthash" ) @@ -23,128 +19,6 @@ type cluster struct { name string } -type shardsRing struct { - ring *consistenthash.Map - shardClusterMap map[string]cluster - allClustersRoundTripper http.RoundTripper - clusterRegressionMap map[string]cluster - inconsistencyLog log.Logger -} - -func (sr shardsRing) isBucketPath(path string) bool { - trimmedPath := strings.Trim(path, "/") - return len(strings.Split(trimmedPath, "/")) == 1 -} - -func (sr shardsRing) Pick(key string) (cluster, error) { - var shardName string - - shardName = sr.ring.Get(key) - shardCluster, ok := sr.shardClusterMap[shardName] - if !ok { - return cluster{}, fmt.Errorf("no cluster for shard %s, cannot handle key %s", shardName, key) - } - - return shardCluster, nil -} - -type reqBody struct { - r *bytes.Reader -} - -func (rb *reqBody) rewind() error { - _, err := rb.r.Seek(0, io.SeekStart) - return err -} - -func (rb *reqBody) Read(b []byte) (int, error) { - return rb.r.Read(b) -} - -func (rb *reqBody) Close() error { - return nil -} - -func copyRequest(origReq *http.Request) (*http.Request, error) { - newReq := new(http.Request) - *newReq = *origReq - newReq.URL = &url.URL{} - *newReq.URL = *origReq.URL - newReq.Header = http.Header{} - for k, v := range origReq.Header { - for _, vv := range v { - newReq.Header.Add(k, vv) - } - } - if origReq.Body != nil { - buf := new(bytes.Buffer) - _, err := io.Copy(buf, origReq.Body) - if err != nil { - return nil, err - } - newReq.Body = &reqBody{bytes.NewReader(buf.Bytes())} - } - return newReq, nil -} - -func (sr shardsRing) send(roundTripper http.RoundTripper, req *http.Request) (*http.Response, error) { - // Rewind request body - bodySeeker, ok := req.Body.(*reqBody) - if ok { - err := bodySeeker.rewind() - if err != nil { - return nil, err - } - } - return roundTripper.RoundTrip(req) -} - -func (sr shardsRing) regressionCall(cl cluster, req *http.Request) (string, *http.Response, error) { - resp, err := sr.send(cl, req) - // Do regression call if response status is > 400 - if (err != nil || resp.StatusCode > 400) && req.Method != http.MethodPut { - rcl, ok := sr.clusterRegressionMap[cl.name] - if ok { - return sr.regressionCall(rcl, req) - } - } - return cl.name, resp, err -} -func (sr *shardsRing) logInconsistency(key, expectedClusterName, actualClusterName string) { - logJSON, err := json.Marshal( - struct { - Key string - Expected string - Actual string - }{key, expectedClusterName, actualClusterName}) - if err == nil { - sr.inconsistencyLog.Printf(fmt.Sprintf("%s", logJSON)) - } -} - -func (sr shardsRing) RoundTrip(req *http.Request) (*http.Response, error) { - reqCopy, err := copyRequest(req) - if err != nil { - return nil, err - } - - if reqCopy.Method == http.MethodDelete || sr.isBucketPath(reqCopy.URL.Path) { - return sr.allClustersRoundTripper.RoundTrip(reqCopy) - } - - cl, err := sr.Pick(reqCopy.URL.Path) - if err != nil { - return nil, err - } - - clusterName, resp, err := sr.regressionCall(cl, reqCopy) - if clusterName != cl.name { - sr.logInconsistency(reqCopy.URL.Path, cl.name, clusterName) - } - - return resp, err -} - func newMultiBackendCluster(transp http.RoundTripper, multiResponseHandler transport.MultipleResponsesHandler, clusterConf config.ClusterConfig, name string) cluster { @@ -168,10 +42,9 @@ func newMultiBackendCluster(transp http.RoundTripper, } type ringFactory struct { - conf config.Config - transport http.RoundTripper - multipleResponseHandler transport.MultipleResponsesHandler - clusters map[string]cluster + conf config.Config + transport http.RoundTripper + clusters map[string]cluster } func (rf ringFactory) initCluster(name string) (cluster, error) { @@ -179,7 +52,8 @@ func (rf ringFactory) initCluster(name string) (cluster, error) { if !ok { return cluster{}, fmt.Errorf("no cluster %q in configuration", name) } - return newMultiBackendCluster(rf.transport, rf.multipleResponseHandler, clusterConf, name), nil + respHandler := httphandler.EarliestResponseHandler(rf.conf) + return newMultiBackendCluster(rf.transport, respHandler, clusterConf, name), nil } func (rf ringFactory) getCluster(name string) (cluster, error) { @@ -285,10 +159,13 @@ func (rf ringFactory) clientRing(clientCfg config.ClientConfig) (shardsRing, err if err != nil { return shardsRing{}, err } + + respHandler := httphandler.LateResponseHandler(rf.conf) + allBackendsRoundTripper := transport.NewMultiTransport( rf.transport, allBackendsSlice, - rf.multipleResponseHandler) + respHandler) regressionMap, err := rf.createRegressionMap(clientCfg.Clusters) if err != nil { return shardsRing{}, nil @@ -296,12 +173,11 @@ func (rf ringFactory) clientRing(clientCfg config.ClientConfig) (shardsRing, err return shardsRing{cHashMap, shardMap, allBackendsRoundTripper, regressionMap, rf.conf.ClusterSyncLog}, nil } -func newRingFactory(conf config.Config, transport http.RoundTripper, respHandler transport.MultipleResponsesHandler) ringFactory { +func newRingFactory(conf config.Config, transport http.RoundTripper) ringFactory { return ringFactory{ - conf: conf, - transport: transport, - multipleResponseHandler: respHandler, - clusters: make(map[string]cluster), + conf: conf, + transport: transport, + clusters: make(map[string]cluster), } } @@ -318,8 +194,8 @@ func NewHandler(conf config.Config) (http.Handler, error) { if err != nil { return nil, err } - respHandler := httphandler.NewMultipleResponseHandler(conf) - rings := newRingFactory(conf, httptransp, respHandler) + + rings := newRingFactory(conf, httptransp) // TODO: Multiple clients ring, err := rings.clientRing(*conf.Client) if err != nil { diff --git a/sharding/sharding_test.go b/sharding/sharding_test.go index fae9ec3..be39ca8 100644 --- a/sharding/sharding_test.go +++ b/sharding/sharding_test.go @@ -98,11 +98,10 @@ func makeRingFactory(conf config.Config) (ringFactory, error) { if err != nil { return ringFactory{}, err } - respHandler := httphandler.NewMultipleResponseHandler(conf) if err != nil { return ringFactory{}, err } - return newRingFactory(conf, httptransp, respHandler), nil + return newRingFactory(conf, httptransp), nil } func TestSingleClusterOnRing(t *testing.T) { diff --git a/sharding/shards_ring.go b/sharding/shards_ring.go new file mode 100644 index 0000000..3db9161 --- /dev/null +++ b/sharding/shards_ring.go @@ -0,0 +1,136 @@ +package sharding + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + + "github.com/allegro/akubra/log" + "github.com/golang/groupcache/consistenthash" +) + +type shardsRing struct { + ring *consistenthash.Map + shardClusterMap map[string]cluster + allClustersRoundTripper http.RoundTripper + clusterRegressionMap map[string]cluster + inconsistencyLog log.Logger +} + +func (sr shardsRing) isBucketPath(path string) bool { + trimmedPath := strings.Trim(path, "/") + return len(strings.Split(trimmedPath, "/")) == 1 +} + +func (sr shardsRing) Pick(key string) (cluster, error) { + var shardName string + + shardName = sr.ring.Get(key) + shardCluster, ok := sr.shardClusterMap[shardName] + if !ok { + return cluster{}, fmt.Errorf("no cluster for shard %s, cannot handle key %s", shardName, key) + } + + return shardCluster, nil +} + +type reqBody struct { + r *bytes.Reader +} + +func (rb *reqBody) rewind() error { + _, err := rb.r.Seek(0, io.SeekStart) + return err +} + +func (rb *reqBody) Read(b []byte) (int, error) { + return rb.r.Read(b) +} + +func (rb *reqBody) Close() error { + return nil +} + +func copyRequest(origReq *http.Request) (*http.Request, error) { + newReq := new(http.Request) + *newReq = *origReq + newReq.URL = &url.URL{} + *newReq.URL = *origReq.URL + newReq.Header = http.Header{} + for k, v := range origReq.Header { + for _, vv := range v { + newReq.Header.Add(k, vv) + } + } + if origReq.Body != nil { + buf := new(bytes.Buffer) + _, err := io.Copy(buf, origReq.Body) + if err != nil { + return nil, err + } + newReq.Body = &reqBody{bytes.NewReader(buf.Bytes())} + } + return newReq, nil +} + +func (sr shardsRing) send(roundTripper http.RoundTripper, req *http.Request) (*http.Response, error) { + // Rewind request body + bodySeeker, ok := req.Body.(*reqBody) + if ok { + err := bodySeeker.rewind() + if err != nil { + return nil, err + } + } + return roundTripper.RoundTrip(req) +} + +func (sr shardsRing) regressionCall(cl cluster, req *http.Request) (string, *http.Response, error) { + resp, err := sr.send(cl, req) + // Do regression call if response status is > 400 + if (err != nil || resp.StatusCode > 400) && req.Method != http.MethodPut { + rcl, ok := sr.clusterRegressionMap[cl.name] + if ok { + return sr.regressionCall(rcl, req) + } + } + return cl.name, resp, err +} +func (sr *shardsRing) logInconsistency(key, expectedClusterName, actualClusterName string) { + logJSON, err := json.Marshal( + struct { + Key string + Expected string + Actual string + }{key, expectedClusterName, actualClusterName}) + if err == nil { + sr.inconsistencyLog.Printf(fmt.Sprintf("%s", logJSON)) + } +} + +func (sr shardsRing) RoundTrip(req *http.Request) (*http.Response, error) { + reqCopy, err := copyRequest(req) + if err != nil { + return nil, err + } + + if reqCopy.Method == http.MethodDelete || sr.isBucketPath(reqCopy.URL.Path) { + return sr.allClustersRoundTripper.RoundTrip(reqCopy) + } + + cl, err := sr.Pick(reqCopy.URL.Path) + if err != nil { + return nil, err + } + + clusterName, resp, err := sr.regressionCall(cl, reqCopy) + if clusterName != cl.name { + sr.logInconsistency(reqCopy.URL.Path, cl.name, clusterName) + } + + return resp, err +} From c979e46ce22c29b4d1d6a89fc89966d40994cfdc Mon Sep 17 00:00:00 2001 From: Michal Jarco Date: Tue, 7 Feb 2017 14:38:43 +0100 Subject: [PATCH 2/3] Replaced custom dialer with build in one --- dial/dial.go | 140 ------------------------- dial/dial_test.go | 124 ---------------------- httphandler/httphandler.go | 35 ++++--- httphandler/log.go | 12 ++- httphandler/response_merger.go | 23 ++-- httphandler/roundtripper_decorators.go | 2 +- log/logger.go | 9 +- main.go | 1 + sharding/sharding.go | 22 +++- transport/transport.go | 30 +++++- transport/transport_test.go | 34 +----- 11 files changed, 99 insertions(+), 333 deletions(-) delete mode 100644 dial/dial.go delete mode 100644 dial/dial_test.go diff --git a/dial/dial.go b/dial/dial.go deleted file mode 100644 index c707040..0000000 --- a/dial/dial.go +++ /dev/null @@ -1,140 +0,0 @@ -package dial - -import ( - "fmt" - "net" - "net/url" - "sync" - "time" -) - -type watchConn struct { - net.Conn - closeCallback func(net.Conn, error) -} - -func (wc *watchConn) Close() error { - err := wc.Conn.Close() - if wc.closeCallback != nil { - wc.closeCallback(wc.Conn, err) - } - return err -} - -// LimitDialer limits open connections by read and dial timeout. Also provides hard -// limit on number of open connections -type LimitDialer struct { - activeCons map[string]int64 - limit int64 - dialTimeout time.Duration - readTimeout time.Duration - droppedEndpoints map[string]bool - countersMx sync.Mutex -} - -// ErrSlowOrMaintained is returned if LimitDialer exceeds connection limit -var ErrSlowOrMaintained = fmt.Errorf("Slow or maintained endpoint") - -func (d *LimitDialer) incrementCount(addr string) (int64, error) { - d.countersMx.Lock() - defer d.countersMx.Unlock() - _, ok := d.activeCons[addr] - if !ok { - d.activeCons[addr] = 0 - } - - d.activeCons[addr]++ - if d.limitReached(addr) { - d.activeCons[addr]-- - return d.activeCons[addr], ErrSlowOrMaintained - } - - return d.activeCons[addr], nil -} - -func (d *LimitDialer) decrementCount(addr string) { - d.countersMx.Lock() - defer d.countersMx.Unlock() - d.activeCons[addr]-- -} - -// checks if limit is reached and if given endpoint is most occupied -func (d *LimitDialer) limitReached(endpoint string) bool { - numOfAllConns := int64(0) - maxNumOfEndpointConns := int64(0) - mostLoadedEndpoint := "" - if d.droppedEndpoints[endpoint] { - return true - } - for key, count := range d.activeCons { - numOfAllConns += count - if count > maxNumOfEndpointConns { - maxNumOfEndpointConns = count - mostLoadedEndpoint = key - } - } - if numOfAllConns > d.limit { - return mostLoadedEndpoint == endpoint - } - return false -} - -// Dial connects to endpoint as net.Dial does, but also keeps track -// on number of connections -func (d *LimitDialer) Dial(network, addr string) (c net.Conn, err error) { - _, incErr := d.incrementCount(addr) - if incErr != nil { - return nil, incErr - } - - var netconn net.Conn - - if d.dialTimeout > 0 { - netconn, err = net.DialTimeout(network, addr, d.dialTimeout) - } else { - netconn, err = net.Dial(network, addr) - } - - if err != nil { - d.decrementCount(addr) - return nil, err - } - - if d.readTimeout > 0 { - deadlineErr := netconn.SetDeadline(time.Now().Add(d.readTimeout)) - if deadlineErr != nil { - d.decrementCount(addr) - closeErr := netconn.Close() - if closeErr != nil { - return nil, fmt.Errorf("%s error during: %s", closeErr, deadlineErr) - } - return nil, deadlineErr - } - } - - c = &watchConn{netconn, func(c net.Conn, e error) { - d.decrementCount(addr) - }} - - return c, err -} - -func (d *LimitDialer) dropEndpoints(droppedEndpoints []url.URL) { - endpointsMap := make(map[string]bool, len(droppedEndpoints)) - for _, URL := range droppedEndpoints { - endpointsMap[URL.Host] = true - } - d.droppedEndpoints = endpointsMap -} - -// NewLimitDialer returns new `LimitDialer`. -func NewLimitDialer(limit int64, readTimeout, dialTimeout time.Duration, droppedEndpoints []url.URL) *LimitDialer { - dialer := &LimitDialer{ - activeCons: make(map[string]int64), - limit: limit, - dialTimeout: dialTimeout, - readTimeout: readTimeout, - } - dialer.dropEndpoints(droppedEndpoints) - return dialer -} diff --git a/dial/dial_test.go b/dial/dial_test.go deleted file mode 100644 index ffeb6a8..0000000 --- a/dial/dial_test.go +++ /dev/null @@ -1,124 +0,0 @@ -package dial - -import ( - "fmt" - "net" - "net/url" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestLimitDialer(t *testing.T) { - addr := "198.18.0.254:80" - timeout := 10 * time.Millisecond - dialer := NewLimitDialer(0, timeout, timeout, nil) - conn, err := dialer.Dial("tcp", addr) - assert.NotNil(t, err, "") - if !assert.Nil(t, conn) { - defer func() { - err := conn.Close() - assert.Nil(t, err) - }() - } -} - -func autoListener(t *testing.T) (net.Listener, string) { - listener, err := net.Listen("tcp", "127.0.0.1:0") - require.NoError(t, err, "Should never fail") - return listener, listener.Addr().String() -} - -func TestLimitDialerMostLoadedEndpoint(t *testing.T) { - timeout := time.Second - - l1, addr1 := autoListener(t) - if l1 != nil { - defer func() { - err := l1.Close() - assert.NoError(t, err) - }() - } - - l2, addr2 := autoListener(t) - if l2 != nil { - defer func() { - err := l2.Close() - assert.NoError(t, err) - }() - } - - dialer := NewLimitDialer(2, timeout, timeout, nil) - conn1, c1Err := dialer.Dial("tcp", addr1) - if assert.NotNil(t, conn1) { - defer func() { - err := conn1.Close() - assert.NoError(t, err) - }() - } - assert.NoError(t, c1Err) - conn2, c2Err := dialer.Dial("tcp", addr2) - if assert.NotNil(t, conn2) { - defer func() { - err := conn2.Close() - assert.NoError(t, err) - }() - } - assert.NoError(t, c2Err) - - conn3, c3Err := dialer.Dial("tcp", addr2) - if !assert.Nil(t, conn3) { - defer func() { - err := conn3.Close() - assert.NoError(t, err) - }() - } - assert.Error(t, c3Err, "addr2 host should exceed connections limit") - -} - -func TestLimitDialerConcurrency(t *testing.T) { - l, addr := autoListener(t) - if l != nil { - defer func() { - err := l.Close() - assert.Nil(t, err) - }() - } - timeout := time.Second - dialer := NewLimitDialer(4, timeout, timeout, nil) - gotErr := make(chan bool) - for i := 0; i < 5; i++ { - go func() { - _, err := dialer.Dial("tcp", addr) - if err != nil { - gotErr <- true - } - }() - } - select { - case e := <-gotErr: - assert.True(t, e) - case <-time.After(timeout): - t.Error("At least one dial should return error") - } -} - -func TestLimitDialerDroppedEndpoints(t *testing.T) { - _, dropAddr := autoListener(t) - _, addr := autoListener(t) - - URL, err := url.Parse(fmt.Sprintf("http://%s", dropAddr)) - require.NoError(t, err) - timeout := time.Second - - dialer := NewLimitDialer(4, timeout, timeout, []url.URL{*URL}) - - _, errDialFailure := dialer.Dial("tcp", dropAddr) - require.Error(t, errDialFailure, "Should reject attempt to connect with dropped endpoint") - - _, errDial := dialer.Dial("tcp", addr) - require.NoError(t, errDial) -} diff --git a/httphandler/httphandler.go b/httphandler/httphandler.go index ecf4f95..6ffe4fd 100644 --- a/httphandler/httphandler.go +++ b/httphandler/httphandler.go @@ -1,13 +1,14 @@ package httphandler import ( + "context" + "crypto/rand" + "encoding/hex" "io" "net/http" - "net/url" "time" "github.com/allegro/akubra/config" - "github.com/allegro/akubra/dial" "github.com/allegro/akubra/log" ) @@ -20,7 +21,17 @@ type Handler struct { } func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - resp, err := h.roundTripper.RoundTrip(req) + + randomID := make([]byte, 12) + _, err := rand.Read(randomID) + if err != nil { + randomID = []byte("notrandomid") + } + + randomIDStr := hex.EncodeToString(randomID) + randomIDContext := context.WithValue(req.Context(), log.ContextreqIDKey, randomIDStr) + log.Debugf("Request id %s", randomIDStr) + resp, err := h.roundTripper.RoundTrip(req.WithContext(randomIDContext)) if err != nil { w.WriteHeader(http.StatusInternalServerError) @@ -54,24 +65,18 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) { // ConfigureHTTPTransport returns http.Transport with customized dialer, // MaxIdleConnsPerHost and DisableKeepAlives func ConfigureHTTPTransport(conf config.Config) (*http.Transport, error) { + connDuration, err := time.ParseDuration(conf.ConnectionTimeout) if err != nil { return nil, err } - var dialer *dial.LimitDialer - var maintainedBackendURLs []url.URL - if conf.MaintainedBackends != nil { - for _, yamlURLS := range conf.MaintainedBackends { - maintainedBackendURLs = append(maintainedBackendURLs, *yamlURLS.URL) - } - } - - dialer = dial.NewLimitDialer(conf.ConnLimit, connDuration, connDuration, maintainedBackendURLs) httpTransport := &http.Transport{ - Dial: dialer.Dial, - DisableKeepAlives: conf.KeepAlive, - MaxIdleConnsPerHost: int(conf.ConnLimit)} + MaxIdleConns: int(conf.ConnLimit), + IdleConnTimeout: connDuration, + ResponseHeaderTimeout: connDuration, + DisableKeepAlives: conf.KeepAlive, + } return httpTransport, nil } diff --git a/httphandler/log.go b/httphandler/log.go index 8268ff2..7ce5a84 100644 --- a/httphandler/log.go +++ b/httphandler/log.go @@ -4,6 +4,8 @@ import ( "fmt" "net/http" "time" + + "github.com/allegro/akubra/log" ) // AccessMessageData holds all important informations @@ -16,6 +18,7 @@ type AccessMessageData struct { StatusCode int `json:"status"` Duration float64 `json:"duration"` RespErr string `json:"error"` + ReqID string `json:"reqID"` Time string `json:"ts"` } @@ -31,12 +34,14 @@ func (amd AccessMessageData) String() string { func NewAccessLogMessage(req http.Request, statusCode int, duration float64, respErr string) *AccessMessageData { ts := time.Now().Format(time.RFC3339Nano) + reqID, _ := req.Context().Value(log.ContextreqIDKey).(string) return &AccessMessageData{ req.Method, req.Host, req.URL.Path, req.Header.Get("User-Agent"), - statusCode, duration * 1000, respErr, ts} + statusCode, duration * 1000, respErr, + reqID, ts} } // ScanCSVAccessLogMessage will scan csv string and return AccessMessageData. @@ -58,6 +63,7 @@ type SyncLogMessageData struct { SuccessHost string `json:"successhost"` UserAgent string `json:"useragent"` ErrorMsg string `json:"error"` + ReqID string `json:"reqID"` Time string `json:"ts"` } @@ -75,9 +81,9 @@ func (slmd SyncLogMessageData) String() string { // NewSyncLogMessageData creates new SyncLogMessageData func NewSyncLogMessageData(method, failedHost, path, successHost, userAgent, - errorMsg string) *SyncLogMessageData { + reqID, errorMsg string) *SyncLogMessageData { ts := time.Now().Format(time.RFC3339Nano) return &SyncLogMessageData{ method, failedHost, path, successHost, userAgent, - errorMsg, ts} + errorMsg, reqID, ts} } diff --git a/httphandler/response_merger.go b/httphandler/response_merger.go index 7c332da..9849fb7 100644 --- a/httphandler/response_merger.go +++ b/httphandler/response_merger.go @@ -36,12 +36,14 @@ func (rd *responseMerger) synclog(r, successfulTup *transport.ReqResErrTuple) { if r.Err != nil { errorMsg = r.Err.Error() } + reqID := r.Req.Context().Value(log.ContextreqIDKey).(string) syncLogMsg := NewSyncLogMessageData( r.Req.Method, r.Req.Host, successfulTup.Req.URL.Path, successfulTup.Req.Host, r.Req.Header.Get("User-Agent"), + reqID, errorMsg) logMsg, err := json.Marshal(syncLogMsg) if err != nil { @@ -58,7 +60,6 @@ func (rd *responseMerger) handleFailedResponces( logMethodSet set.Set) bool { for _, r := range tups { - rd.synclog(r, successfulTup) if !alreadysent { @@ -90,21 +91,21 @@ func (rd *responseMerger) _handle(in <-chan *transport.ReqResErrTuple, out chan< break } - errorMsg := "No error" - if r.Err != nil { - errorMsg = r.Err.Error() + statusCode := 0 + if r.Res != nil { + statusCode = r.Res.StatusCode } - rd.runtimeLog.Printf("RGW resp %q, %q, %q, %t, %q", - r.Req.URL.Path, - r.Req.Method, + reqID, _ := r.Req.Context().Value(log.ContextreqIDKey).(string) + log.Debugf("Got response %s from backend %s, status: %d, method: %s, path %s, error: %q", + reqID, r.Req.Host, - r.Failed, - errorMsg) + statusCode, + r.Req.Method, + r.Req.URL.Path, + r.Err) - // pass first successful answer to client if !r.Failed && !firstPassed { - // append additional headers successfulTup = r if rd.fifo { out <- r diff --git a/httphandler/roundtripper_decorators.go b/httphandler/roundtripper_decorators.go index 9cec38f..ad45594 100644 --- a/httphandler/roundtripper_decorators.go +++ b/httphandler/roundtripper_decorators.go @@ -18,8 +18,8 @@ type loggingRoundTripper struct { } func (lrt *loggingRoundTripper) RoundTrip(req *http.Request) (resp *http.Response, err error) { - timeStart := time.Now() + timeStart := time.Now() resp, err = lrt.roundTripper.RoundTrip(req) duration := time.Since(timeStart).Seconds() diff --git a/log/logger.go b/log/logger.go index c7712bf..61056f2 100644 --- a/log/logger.go +++ b/log/logger.go @@ -4,10 +4,16 @@ import ( "io" "log/syslog" "os" + "time" "github.com/sirupsen/logrus" ) +const ( + // ContextreqIDKey is Request Context Value key for debug logging + ContextreqIDKey = "reqID" +) + // SyslogFacilityMap is string map of facilities var SyslogFacilityMap = map[string]syslog.Priority{ "LOG_KERN": syslog.LOG_KERN, @@ -115,7 +121,8 @@ func NewLogger(config LoggerConfig) (Logger, error) { var formatter logrus.Formatter formatter = &logrus.TextFormatter{ - FullTimestamp: true, + FullTimestamp: true, + TimestampFormat: time.StampMicro, } if config.PlainText { diff --git a/main.go b/main.go index 109cf99..da95087 100644 --- a/main.go +++ b/main.go @@ -38,6 +38,7 @@ func main() { mainlog := conf.Mainlog mainlog.Printf("starting on port %s", conf.Listen) mainlog.Printf("connlimit %v", conf.ConnLimit) + mainlog.Printf("connection timeout %v", conf.ConnectionTimeout) srv := newService(conf) startErr := srv.start() if startErr != nil { diff --git a/sharding/sharding.go b/sharding/sharding.go index 5f03c4d..500715c 100644 --- a/sharding/sharding.go +++ b/sharding/sharding.go @@ -8,6 +8,7 @@ import ( "github.com/allegro/akubra/config" "github.com/allegro/akubra/httphandler" + "github.com/allegro/akubra/log" "github.com/allegro/akubra/transport" "github.com/golang/groupcache/consistenthash" ) @@ -21,7 +22,7 @@ type cluster struct { func newMultiBackendCluster(transp http.RoundTripper, multiResponseHandler transport.MultipleResponsesHandler, - clusterConf config.ClusterConfig, name string) cluster { + clusterConf config.ClusterConfig, name string, maintainedBackends []config.YAMLURL) cluster { backends := make([]url.URL, len(clusterConf.Backends)) for i, backend := range clusterConf.Backends { @@ -31,7 +32,8 @@ func newMultiBackendCluster(transp http.RoundTripper, multiTransport := transport.NewMultiTransport( transp, backends, - multiResponseHandler) + multiResponseHandler, + maintainedBackends) return cluster{ multiTransport, @@ -53,7 +55,7 @@ func (rf ringFactory) initCluster(name string) (cluster, error) { return cluster{}, fmt.Errorf("no cluster %q in configuration", name) } respHandler := httphandler.EarliestResponseHandler(rf.conf) - return newMultiBackendCluster(rf.transport, respHandler, clusterConf, name), nil + return newMultiBackendCluster(rf.transport, respHandler, clusterConf, name, rf.conf.MaintainedBackends), nil } func (rf ringFactory) getCluster(name string) (cluster, error) { @@ -90,12 +92,15 @@ func (rf ringFactory) mapShards(weightSum uint64, clientCfg config.ClientConfig) func (rf ringFactory) uniqBackends(clientCfg config.ClientConfig) ([]url.URL, error) { allBackendsSet := make(map[config.YAMLURL]bool) + log.Debugf("client %v", clientCfg.Clusters) for _, name := range clientCfg.Clusters { + log.Debugf("cluster %s", name) clientCluster, err := rf.getCluster(name) if err != nil { return nil, err } for _, backendURL := range clientCluster.backends { + log.Debugf("backend %s", backendURL.Host) allBackendsSet[backendURL] = true } } @@ -165,12 +170,19 @@ func (rf ringFactory) clientRing(clientCfg config.ClientConfig) (shardsRing, err allBackendsRoundTripper := transport.NewMultiTransport( rf.transport, allBackendsSlice, - respHandler) + respHandler, + rf.conf.MaintainedBackends) + log.Debugf("All backends %v", allBackendsSlice) regressionMap, err := rf.createRegressionMap(clientCfg.Clusters) if err != nil { return shardsRing{}, nil } - return shardsRing{cHashMap, shardMap, allBackendsRoundTripper, regressionMap, rf.conf.ClusterSyncLog}, nil + return shardsRing{ + cHashMap, + shardMap, + allBackendsRoundTripper, + regressionMap, + rf.conf.ClusterSyncLog}, nil } func newRingFactory(conf config.Config, transport http.RoundTripper) ringFactory { diff --git a/transport/transport.go b/transport/transport.go index 8da92f4..f0ad465 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -4,12 +4,16 @@ import ( "bytes" "context" "errors" + "fmt" "io" "io/ioutil" "net/http" "net/url" "sync" "time" + + "github.com/allegro/akubra/config" + "github.com/allegro/akubra/log" ) // ReqResErrTuple is intermediate structure for internal use of @@ -128,7 +132,8 @@ type RequestProcessor func(orig *http.Request, copies []*http.Request) type MultiTransport struct { http.RoundTripper // Backends is list of target endpoints URL - Backends []url.URL + Backends []url.URL + SkipBackends map[string]bool // Response handler will get `ReqResErrTuple` in `in` channel // should process all responses and send one to out chan. // Response senf to out chan will be returned from RoundTrip. @@ -163,6 +168,7 @@ func (mt *MultiTransport) ReplicateRequests(req *http.Request, cancelFun context for _, backend := range mt.Backends { req.URL.Host = backend.Host + log.Debugf("Replicate request %s, for %s", req.Context().Value(log.ContextreqIDKey), backend.Host) newBody := ioutil.NopCloser(bytes.NewReader(bodyBuffer.Bytes())) r, rerr := http.NewRequest(req.Method, req.URL.String(), newBody) // Copy request data @@ -188,8 +194,17 @@ func (mt *MultiTransport) sendRequest( ctx := req.Context() o := make(chan *ReqResErrTuple) go func() { - resp, err := mt.RoundTripper.RoundTrip(req) + if mt.SkipBackends[req.URL.Host] { + log.Debugf("Skipping request %s, for %s", req.Context().Value(log.ContextreqIDKey), req.URL.Host) + r := &ReqResErrTuple{req, nil, fmt.Errorf("Maintained Backend %s", req.URL.Host), true} + o <- r + return + } + resp, err := mt.RoundTripper.RoundTrip(req.WithContext(context.Background())) // report Non 2XX status codes as errors + if err != nil { + log.Debugf("Send request error %s, %s", err.Error(), ctx.Value(log.ContextreqIDKey)) + } failed := err != nil || resp != nil && (resp.StatusCode < 200 || resp.StatusCode > 399) r := &ReqResErrTuple{req, resp, err, failed} o <- r @@ -197,6 +212,7 @@ func (mt *MultiTransport) sendRequest( var reqresperr *ReqResErrTuple select { case <-ctx.Done(): + log.Debugf("Ctx Done reqID %s ", ctx.Value(log.ContextreqIDKey)) reqresperr = &ReqResErrTuple{req, nil, ErrBodyContentLengthMismatch, true} case reqresperr = <-o: break @@ -207,7 +223,7 @@ func (mt *MultiTransport) sendRequest( // RoundTrip satisfies http.RoundTripper interface func (mt *MultiTransport) RoundTrip(req *http.Request) (resp *http.Response, err error) { bctx, cancelFunc := context.WithCancel(context.Background()) - + bctx = context.WithValue(bctx, log.ContextreqIDKey, req.Context().Value(log.ContextreqIDKey)) reqs, err := mt.ReplicateRequests(req, cancelFunc) if err != nil { return nil, err @@ -241,16 +257,22 @@ func (mt *MultiTransport) RoundTrip(req *http.Request) (resp *http.Response, err // are nil will use default ones func NewMultiTransport(roundTripper http.RoundTripper, backends []url.URL, - responsesHandler MultipleResponsesHandler) *MultiTransport { + responsesHandler MultipleResponsesHandler, + maintainedBackends []config.YAMLURL) *MultiTransport { if responsesHandler == nil { responsesHandler = DefaultHandleResponses } if roundTripper == nil { roundTripper = http.DefaultTransport } + mb := make(map[string]bool, len(maintainedBackends)) + for _, yurl := range maintainedBackends { + mb[yurl.Host] = true + } return &MultiTransport{ RoundTripper: roundTripper, Backends: backends, + SkipBackends: mb, HandleResponses: responsesHandler} } diff --git a/transport/transport_test.go b/transport/transport_test.go index b764c14..f621ce6 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -2,6 +2,7 @@ package transport import ( "bytes" + "fmt" "io" "net/http" "net/http/httptest" @@ -9,8 +10,6 @@ import ( "testing" "time" - "github.com/allegro/akubra/dial" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -20,9 +19,7 @@ func TestLimitReaderFromBuffer(t *testing.T) { lreader := io.LimitReader(reader, int64(len(stream))) p := make([]byte, len(stream)) n, err := io.ReadFull(lreader, p) - if n == 0 { - t.Error("read 0 bytes") - } + require.NotEqual(t, 0, n, "read no bytes") if err != nil && err != io.EOF { t.Errorf("Got strange error %q", err) } @@ -56,9 +53,7 @@ func mkDummySrvs(count int, stream []byte, t *testing.T) []url.URL { })) dummySrvs = append(dummySrvs, ts) urlN, err := url.Parse(ts.URL) - if err != nil { - t.Error(err) - } + require.NoError(t, err) urls = append(urls, *urlN) } return urls @@ -120,9 +115,8 @@ func TestTimeoutReader(t *testing.T) { tr := &TimeoutReader{pr, time.Second * 2} for i := 0; i < 4; i++ { _, err := tr.Read(make([]byte, 20)) - if err != nil { - t.Errorf("Timeout was not reached, but error occured %s", err.Error()) - } + require.NoError(t, err, + fmt.Sprintf("Timeout was not reached, but error occured %s", err)) } tr2 := &TimeoutReader{pr, time.Millisecond} _, err := tr2.Read(make([]byte, 0, 20)) @@ -146,21 +140,3 @@ func TestRequestMultiplication(t *testing.T) { t.Errorf("Should get ErrTimeout or ErrBodyContentLengthMismatch") } } - -func TestMaintainedBackend(t *testing.T) { - stream := []byte("zażółć gęślą jaźń 123456789") - urls := mkDummySrvs(2, stream, t) - req := dummyReq(stream, 0) - - dialer := dial.NewLimitDialer(2, time.Second, time.Second, []url.URL{urls[0]}) - - httpTransport := &http.Transport{ - Dial: dialer.Dial, - DisableKeepAlives: false, - MaxIdleConnsPerHost: 1} - transp := mkTransportWithRoundTripper(urls, httpTransport, t) - - resp, err := transp.RoundTrip(req) - require.NoError(t, err) - assert.Equal(t, http.StatusOK, resp.StatusCode) -} From 47a76195cdd819cea6818d22a08f382e70644911 Mon Sep 17 00:00:00 2001 From: Michal Jarco Date: Thu, 9 Feb 2017 14:47:51 +0100 Subject: [PATCH 3/3] Added Content-Length to synclog --- httphandler/log.go | 6 ++++-- httphandler/response_merger.go | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/httphandler/log.go b/httphandler/log.go index 7ce5a84..c5dee40 100644 --- a/httphandler/log.go +++ b/httphandler/log.go @@ -62,6 +62,7 @@ type SyncLogMessageData struct { Path string `json:"path"` SuccessHost string `json:"successhost"` UserAgent string `json:"useragent"` + ContentLength string `json:"content-length"` ErrorMsg string `json:"error"` ReqID string `json:"reqID"` Time string `json:"ts"` @@ -76,14 +77,15 @@ func (slmd SyncLogMessageData) String() string { slmd.Path, slmd.SuccessHost, slmd.UserAgent, + slmd.ContentLength, slmd.ErrorMsg) } // NewSyncLogMessageData creates new SyncLogMessageData func NewSyncLogMessageData(method, failedHost, path, successHost, userAgent, - reqID, errorMsg string) *SyncLogMessageData { + contentLength, reqID, errorMsg string) *SyncLogMessageData { ts := time.Now().Format(time.RFC3339Nano) return &SyncLogMessageData{ method, failedHost, path, successHost, userAgent, - errorMsg, reqID, ts} + contentLength, errorMsg, reqID, ts} } diff --git a/httphandler/response_merger.go b/httphandler/response_merger.go index 9849fb7..8834fcc 100644 --- a/httphandler/response_merger.go +++ b/httphandler/response_merger.go @@ -43,6 +43,7 @@ func (rd *responseMerger) synclog(r, successfulTup *transport.ReqResErrTuple) { successfulTup.Req.URL.Path, successfulTup.Req.Host, r.Req.Header.Get("User-Agent"), + successfulTup.Res.Header.Get("Content-Length"), reqID, errorMsg) logMsg, err := json.Marshal(syncLogMsg)