Skip to content

Commit

Permalink
incoming concurrent request count limit (#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
ksitak authored and mjarco committed May 29, 2017
1 parent e9706e7 commit 092e550
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 8 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ DisableKeepAlives: false

# Maximum accepted body size
BodyMaxSize: "100M"
# Maximum number of incoming requests to process at once
MaxConcurrentRequests: 200
# Backend in maintenance mode. Akubra will skip this endpoint

# MaintainedBackends:
Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type YamlConfig struct {
// ResponseHeaderTimeout see: https://golang.org/pkg/net/http/#Transport
// Default 5s (no limit)
ResponseHeaderTimeout metrics.Interval `yaml:"ResponseHeaderTimeout"`
// Max number of incoming requests to process in parallel
MaxConcurrentRequests int32 `yaml:"MaxConcurrentRequests" validate:"min=1"`

Clusters map[string]shardingconfig.ClusterConfig `yaml:"Clusters,omitempty"`
// Additional not amazon specific headers proxy will add to original request
Expand Down
3 changes: 3 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ MaxIdleConns: 1
MaxIdleConnsPerHost: 2
IdleConnTimeout: 3s
ResponseHeaderTimeout: 4s
MaxConcurrentRequests: 200
Clusters:
cluster1test:
Backends:
Expand Down Expand Up @@ -360,6 +361,7 @@ func PrepareYamlConfig(bodyMaxSize shardingconfig.HumanSizeUnits, idleConnTimeou

maxIdleConns := 1
maxIdleConnsPerHost := 2
maxConcurrentRequests := int32(200)
clusters := map[string]shardingconfig.ClusterConfig{"cluster1test": {
yamlURL,
"replicator",
Expand Down Expand Up @@ -392,6 +394,7 @@ func PrepareYamlConfig(bodyMaxSize shardingconfig.HumanSizeUnits, idleConnTimeou
maxIdleConnsPerHost,
metrics.Interval{idleConnTimeoutInp},
metrics.Interval{responseHeaderTimeoutInp},
maxConcurrentRequests,
clusters,
additionalRequestHeaders,
additionalResponseHeaders,
Expand Down
2 changes: 2 additions & 0 deletions examples/akubra.config.dist
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ ResponseHeaderTimeout: 5s
# DisableKeepAlives see: https://golang.org/pkg/net/http/#Transport
# Default false
DisableKeepAlives: false
# Maximum number of incoming requests to process at once
MaxConcurrentRequests: 200
# MaintainedBackends:
# - "http://127.0.0.1:9002"

Expand Down
24 changes: 19 additions & 5 deletions httphandler/httphandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/hex"
"io"
"net/http"
"sync/atomic"
"time"

"github.com/allegro/akubra/config"
Expand All @@ -19,11 +20,23 @@ const (

// Handler implements http.Handler interface
type Handler struct {
roundTripper http.RoundTripper
bodyMaxSize int64
roundTripper http.RoundTripper
bodyMaxSize int64
maxConcurrentRequests int32
runningRequestCount int32
}

func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
canServe := true
if atomic.AddInt32(&h.runningRequestCount, 1) > h.maxConcurrentRequests {
canServe = false
}
defer atomic.AddInt32(&h.runningRequestCount, -1)
if !canServe {
log.Printf("Rejected request from %s - too many other requests in progress.", req.Host)
http.Error(w, "Too many requests in progress.", http.StatusServiceUnavailable)
return
}

randomID := make([]byte, 12)
_, err := rand.Read(randomID)
Expand Down Expand Up @@ -112,9 +125,10 @@ func DecorateRoundTripper(conf config.Config, rt http.RoundTripper) http.RoundTr
}

// NewHandlerWithRoundTripper returns Handler, but will not construct transport.MultiTransport by itself
func NewHandlerWithRoundTripper(roundTripper http.RoundTripper, bodyMaxSize int64) (http.Handler, error) {
func NewHandlerWithRoundTripper(roundTripper http.RoundTripper, bodyMaxSize int64, maxConcurrentRequests int32) (http.Handler, error) {
return &Handler{
roundTripper: roundTripper,
bodyMaxSize: bodyMaxSize,
roundTripper: roundTripper,
bodyMaxSize: bodyMaxSize,
maxConcurrentRequests: maxConcurrentRequests,
}, nil
}
12 changes: 10 additions & 2 deletions httphandler/httphandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func TestShouldReturnEntityTooLargeCode(t *testing.T) {
request := httptest.NewRequest("POST", "http://somepath", nil)
request.Header.Set("Content-Length", "4096")
handler := &Handler{bodyMaxSize: 1024}
handler := &Handler{bodyMaxSize: 1024, maxConcurrentRequests: 10}
writer := httptest.NewRecorder()
handler.ServeHTTP(writer, request)
assert.Equal(t, http.StatusRequestEntityTooLarge, writer.Code)
Expand All @@ -20,8 +20,16 @@ func TestShouldReturnEntityTooLargeCode(t *testing.T) {
func TestShouldReturnBadRequestOnUnparsableContentLengthHeader(t *testing.T) {
request := httptest.NewRequest("POST", "http://somepath", nil)
request.Header.Set("Content-Length", "strange-content-header")
handler := &Handler{bodyMaxSize: 1024}
handler := &Handler{bodyMaxSize: 1024, maxConcurrentRequests: 10}
writer := httptest.NewRecorder()
handler.ServeHTTP(writer, request)
assert.Equal(t, http.StatusBadRequest, writer.Code)
}

func TestShouldReturnServiceNotAvailableOnTooManyRequests(t *testing.T) {
request := httptest.NewRequest("GET", "http://somepath", nil)
handler := &Handler{bodyMaxSize: 1024, maxConcurrentRequests: 0}
writer := httptest.NewRecorder()
handler.ServeHTTP(writer, request)
assert.Equal(t, http.StatusServiceUnavailable, writer.Code)
}
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (s *service) start() error {
if err != nil {
log.Fatalln(err)
}

return srv.Serve(listener)
}

Expand Down
2 changes: 1 addition & 1 deletion sharding/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,5 +199,5 @@ func NewHandler(conf config.Config) (http.Handler, error) {

roundTripper := httphandler.DecorateRoundTripper(conf, ring)

return httphandler.NewHandlerWithRoundTripper(roundTripper, conf.BodyMaxSize.SizeInBytes)
return httphandler.NewHandlerWithRoundTripper(roundTripper, conf.BodyMaxSize.SizeInBytes, conf.MaxConcurrentRequests)
}

0 comments on commit 092e550

Please sign in to comment.