diff --git a/config/config.go b/config/config.go index a44ef57c3..f53a5d906 100644 --- a/config/config.go +++ b/config/config.go @@ -44,6 +44,7 @@ type StorageProviderConfig struct { RateLimiter *localhttp.RateLimiterConfig DiscontinueCfg *stopserving.DiscontinueConfig MetadataCfg *metadata.MetadataConfig + BandwidthLimiter *localhttp.BandwidthLimiterConfig } // JSONMarshal marshal the StorageProviderConfig to json format @@ -111,6 +112,7 @@ var DefaultStorageProviderConfig = &StorageProviderConfig{ RateLimiter: DefaultRateLimiterConfig, DiscontinueCfg: stopserving.DefaultDiscontinueConfig, MetadataCfg: DefaultMetadataConfig, + BandwidthLimiter: DefaultBandwidthLimiterConfig, } // DefaultSQLDBConfig defines the default configuration of SQL DB @@ -208,6 +210,12 @@ var DefaultRateLimiterConfig = &localhttp.RateLimiterConfig{ }, } +var DefaultBandwidthLimiterConfig = &localhttp.BandwidthLimiterConfig{ + Enable: false, + R: 100, + B: 1000, +} + // LoadConfig loads the config file from path func LoadConfig(path string) *StorageProviderConfig { f, err := os.Open(path) diff --git a/config/config_template.toml b/config/config_template.toml index 28494a532..d1c0bf1a4 100644 --- a/config/config_template.toml +++ b/config/config_template.toml @@ -104,9 +104,16 @@ On = false RateLimit = 1 RatePeriod = "S" +[BandwidthLimiter] +Enable = false +R = 100 +B = 1000 + [StopServingCfg] BucketKeepAliveDays = 7 [MetadataCfg] IsMasterDB = true BsDBSwitchCheckIntervalSec = 3600 + + diff --git a/config/subconfig.go b/config/subconfig.go index e4899bb6e..824cf93f7 100644 --- a/config/subconfig.go +++ b/config/subconfig.go @@ -106,6 +106,11 @@ func (cfg *StorageProviderConfig) MakeGatewayConfig() (*gateway.GatewayConfig, e APILimits: apiLimitsMap, HTTPLimitCfg: cfg.RateLimiter.HTTPLimitCfg, } + gCfg.BandwidthLimitCfg = &localhttp.BandwidthLimiterConfig{ + Enable: false, + R: 100, + B: 1000, + } } return gCfg, nil } diff --git a/go.mod b/go.mod index 3726eaccc..2ee923c07 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,7 @@ require ( require ( github.com/cosmos/cosmos-proto v1.0.0-beta.1 // indirect golang.org/x/mod v0.9.0 // indirect + golang.org/x/time v0.3.0 // indirect golang.org/x/tools v0.7.0 // indirect ) diff --git a/go.sum b/go.sum index 63feede0c..c0e76c5e2 100644 --- a/go.sum +++ b/go.sum @@ -2258,6 +2258,8 @@ golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/pkg/middleware/http/tokenBucket.go b/pkg/middleware/http/tokenBucket.go new file mode 100644 index 000000000..c66e53858 --- /dev/null +++ b/pkg/middleware/http/tokenBucket.go @@ -0,0 +1,31 @@ +package http + +import ( + "sync" + + "github.com/bnb-chain/greenfield-storage-provider/pkg/log" + "golang.org/x/time/rate" +) + +type BandwidthLimiterConfig struct { + Enable bool //Enable Whether to enable bandwidth limiting + R rate.Limit //R The speed at which tokens are generated R per second + B int //B The size of the token bucket +} + +type BandwidthLimiter struct { + Limiter *rate.Limiter +} + +var LimiterOnce sync.Once +var BandwidthLimit *BandwidthLimiter + +func NewBandwidthLimiter(r rate.Limit, b int) { + log.Infof("config r: %v, b:%d", r, b) + + LimiterOnce.Do(func() { + BandwidthLimit = &BandwidthLimiter{ + Limiter: rate.NewLimiter(r, b), + } + }) +} diff --git a/service/gateway/gateway.go b/service/gateway/gateway.go index a6f0176ae..f58ddc4d7 100644 --- a/service/gateway/gateway.go +++ b/service/gateway/gateway.go @@ -129,6 +129,9 @@ func (gateway *Gateway) serve() { log.Errorw("failed to new api limiter", "err", err) return } + if gateway.config.BandwidthLimitCfg.Enable { + localhttp.NewBandwidthLimiter(gateway.config.BandwidthLimitCfg.R, gateway.config.BandwidthLimitCfg.B) + } gateway.registerHandler(router) server := &http.Server{ Addr: gateway.config.HTTPAddress, diff --git a/service/gateway/gateway_config.go b/service/gateway/gateway_config.go index bba8af2d8..efe79f2ab 100644 --- a/service/gateway/gateway_config.go +++ b/service/gateway/gateway_config.go @@ -19,4 +19,5 @@ type GatewayConfig struct { MetadataServiceAddress string AuthServiceAddress string APILimiterCfg *localhttp.APILimiterConfig + BandwidthLimitCfg *localhttp.BandwidthLimiterConfig } diff --git a/service/gateway/object_handler.go b/service/gateway/object_handler.go index 61aab07fe..489afa8ce 100644 --- a/service/gateway/object_handler.go +++ b/service/gateway/object_handler.go @@ -15,6 +15,7 @@ import ( "github.com/bnb-chain/greenfield-storage-provider/model" merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors" "github.com/bnb-chain/greenfield-storage-provider/pkg/log" + localHttp "github.com/bnb-chain/greenfield-storage-provider/pkg/middleware/http" "github.com/bnb-chain/greenfield-storage-provider/service/downloader/types" metatypes "github.com/bnb-chain/greenfield-storage-provider/service/metadata/types" uploadertypes "github.com/bnb-chain/greenfield-storage-provider/service/uploader/types" @@ -117,7 +118,12 @@ func (gateway *Gateway) getObjectHandler(w http.ResponseWriter, r *http.Request) errDescription = makeErrorDescription(merrors.GRPCErrorToInnerError(err)) return } - + // If it is limited, it will block + if localHttp.BandwidthLimit != nil { + if err := localHttp.BandwidthLimit.Limiter.Wait(ctx); err != nil { + log.Errorw("failed to wait bandwidth limiter", "error", err) + } + } if readN = len(resp.Data); readN == 0 { log.Errorw("failed to get object due to return empty data", "response", resp) continue @@ -223,6 +229,12 @@ func (gateway *Gateway) putObjectHandler(w http.ResponseWriter, r *http.Request) return } if readN > 0 { + // If it is limited, it will block + if localHttp.BandwidthLimit != nil { + if err := localHttp.BandwidthLimit.Limiter.Wait(ctx); err != nil { + log.Errorw("failed to wait bandwidth limiter", "error", err) + } + } req := &uploadertypes.PutObjectRequest{ ObjectInfo: reqContext.objectInfo, Payload: buf[:readN],