From 092e5504b5891b8bfcd16edb00af9932acfbb765 Mon Sep 17 00:00:00 2001 From: Krzysztof Sitak Date: Mon, 29 May 2017 10:54:10 +0200 Subject: [PATCH] incoming concurrent request count limit (#47) --- README.md | 2 ++ config/config.go | 2 ++ config/config_test.go | 3 +++ examples/akubra.config.dist | 2 ++ httphandler/httphandler.go | 24 +++++++++++++++++++----- httphandler/httphandler_test.go | 12 ++++++++++-- main.go | 1 + sharding/sharding.go | 2 +- 8 files changed, 40 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 6898fa5..fa1c643 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,8 @@ DisableKeepAlives: false # Maximum accepted body size BodyMaxSize: "100M" +# Maximum number of incoming requests to process at once +MaxConcurrentRequests: 200 # Backend in maintenance mode. Akubra will skip this endpoint # MaintainedBackends: diff --git a/config/config.go b/config/config.go index f5fa2be..c87142f 100644 --- a/config/config.go +++ b/config/config.go @@ -44,6 +44,8 @@ type YamlConfig struct { // ResponseHeaderTimeout see: https://golang.org/pkg/net/http/#Transport // Default 5s (no limit) ResponseHeaderTimeout metrics.Interval `yaml:"ResponseHeaderTimeout"` + // Max number of incoming requests to process in parallel + MaxConcurrentRequests int32 `yaml:"MaxConcurrentRequests" validate:"min=1"` Clusters map[string]shardingconfig.ClusterConfig `yaml:"Clusters,omitempty"` // Additional not amazon specific headers proxy will add to original request diff --git a/config/config_test.go b/config/config_test.go index a5b4553..6288793 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -283,6 +283,7 @@ MaxIdleConns: 1 MaxIdleConnsPerHost: 2 IdleConnTimeout: 3s ResponseHeaderTimeout: 4s +MaxConcurrentRequests: 200 Clusters: cluster1test: Backends: @@ -360,6 +361,7 @@ func PrepareYamlConfig(bodyMaxSize shardingconfig.HumanSizeUnits, idleConnTimeou maxIdleConns := 1 maxIdleConnsPerHost := 2 + maxConcurrentRequests := int32(200) clusters := map[string]shardingconfig.ClusterConfig{"cluster1test": { yamlURL, "replicator", @@ -392,6 +394,7 @@ func PrepareYamlConfig(bodyMaxSize shardingconfig.HumanSizeUnits, idleConnTimeou maxIdleConnsPerHost, metrics.Interval{idleConnTimeoutInp}, metrics.Interval{responseHeaderTimeoutInp}, + maxConcurrentRequests, clusters, additionalRequestHeaders, additionalResponseHeaders, diff --git a/examples/akubra.config.dist b/examples/akubra.config.dist index a8674aa..0318810 100644 --- a/examples/akubra.config.dist +++ b/examples/akubra.config.dist @@ -29,6 +29,8 @@ ResponseHeaderTimeout: 5s # DisableKeepAlives see: https://golang.org/pkg/net/http/#Transport # Default false DisableKeepAlives: false +# Maximum number of incoming requests to process at once +MaxConcurrentRequests: 200 # MaintainedBackends: # - "http://127.0.0.1:9002" diff --git a/httphandler/httphandler.go b/httphandler/httphandler.go index c7435b2..4ea85a6 100644 --- a/httphandler/httphandler.go +++ b/httphandler/httphandler.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "io" "net/http" + "sync/atomic" "time" "github.com/allegro/akubra/config" @@ -19,11 +20,23 @@ const ( // Handler implements http.Handler interface type Handler struct { - roundTripper http.RoundTripper - bodyMaxSize int64 + roundTripper http.RoundTripper + bodyMaxSize int64 + maxConcurrentRequests int32 + runningRequestCount int32 } func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + canServe := true + if atomic.AddInt32(&h.runningRequestCount, 1) > h.maxConcurrentRequests { + canServe = false + } + defer atomic.AddInt32(&h.runningRequestCount, -1) + if !canServe { + log.Printf("Rejected request from %s - too many other requests in progress.", req.Host) + http.Error(w, "Too many requests in progress.", http.StatusServiceUnavailable) + return + } randomID := make([]byte, 12) _, err := rand.Read(randomID) @@ -112,9 +125,10 @@ func DecorateRoundTripper(conf config.Config, rt http.RoundTripper) http.RoundTr } // NewHandlerWithRoundTripper returns Handler, but will not construct transport.MultiTransport by itself -func NewHandlerWithRoundTripper(roundTripper http.RoundTripper, bodyMaxSize int64) (http.Handler, error) { +func NewHandlerWithRoundTripper(roundTripper http.RoundTripper, bodyMaxSize int64, maxConcurrentRequests int32) (http.Handler, error) { return &Handler{ - roundTripper: roundTripper, - bodyMaxSize: bodyMaxSize, + roundTripper: roundTripper, + bodyMaxSize: bodyMaxSize, + maxConcurrentRequests: maxConcurrentRequests, }, nil } diff --git a/httphandler/httphandler_test.go b/httphandler/httphandler_test.go index ae6d453..6c344cb 100644 --- a/httphandler/httphandler_test.go +++ b/httphandler/httphandler_test.go @@ -11,7 +11,7 @@ import ( func TestShouldReturnEntityTooLargeCode(t *testing.T) { request := httptest.NewRequest("POST", "http://somepath", nil) request.Header.Set("Content-Length", "4096") - handler := &Handler{bodyMaxSize: 1024} + handler := &Handler{bodyMaxSize: 1024, maxConcurrentRequests: 10} writer := httptest.NewRecorder() handler.ServeHTTP(writer, request) assert.Equal(t, http.StatusRequestEntityTooLarge, writer.Code) @@ -20,8 +20,16 @@ func TestShouldReturnEntityTooLargeCode(t *testing.T) { func TestShouldReturnBadRequestOnUnparsableContentLengthHeader(t *testing.T) { request := httptest.NewRequest("POST", "http://somepath", nil) request.Header.Set("Content-Length", "strange-content-header") - handler := &Handler{bodyMaxSize: 1024} + handler := &Handler{bodyMaxSize: 1024, maxConcurrentRequests: 10} writer := httptest.NewRecorder() handler.ServeHTTP(writer, request) assert.Equal(t, http.StatusBadRequest, writer.Code) } + +func TestShouldReturnServiceNotAvailableOnTooManyRequests(t *testing.T) { + request := httptest.NewRequest("GET", "http://somepath", nil) + handler := &Handler{bodyMaxSize: 1024, maxConcurrentRequests: 0} + writer := httptest.NewRecorder() + handler.ServeHTTP(writer, request) + assert.Equal(t, http.StatusServiceUnavailable, writer.Code) +} diff --git a/main.go b/main.go index 173f6e8..aa15c10 100644 --- a/main.go +++ b/main.go @@ -104,6 +104,7 @@ func (s *service) start() error { if err != nil { log.Fatalln(err) } + return srv.Serve(listener) } diff --git a/sharding/sharding.go b/sharding/sharding.go index 1fd6ee7..e80358e 100644 --- a/sharding/sharding.go +++ b/sharding/sharding.go @@ -199,5 +199,5 @@ func NewHandler(conf config.Config) (http.Handler, error) { roundTripper := httphandler.DecorateRoundTripper(conf, ring) - return httphandler.NewHandlerWithRoundTripper(roundTripper, conf.BodyMaxSize.SizeInBytes) + return httphandler.NewHandlerWithRoundTripper(roundTripper, conf.BodyMaxSize.SizeInBytes, conf.MaxConcurrentRequests) }