Skip to content

Commit

Permalink
feat: resource manager (#246)
Browse files Browse the repository at this point in the history
* feat: resouce manager

* feat: add resource manager flags

* feat: add resource manager to dowmloader service

* feat: add resource manager to challenge service

* feat: add resource manager to task node service

* chore: rich log info for rcmgr

* feat: init global resource manager

* fix: integration testing bugs

* feat: return preparations error to client during put object

* chore: fix cr comments
  • Loading branch information
joeylichang authored Mar 27, 2023
1 parent c6ebd0b commit 90b5039
Show file tree
Hide file tree
Showing 21 changed files with 1,507 additions and 22 deletions.
27 changes: 26 additions & 1 deletion cmd/storage_provider/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/bnb-chain/greenfield-storage-provider/pkg/lifecycle"
"github.com/bnb-chain/greenfield-storage-provider/pkg/log"
"github.com/bnb-chain/greenfield-storage-provider/pkg/metrics"
"github.com/bnb-chain/greenfield-storage-provider/pkg/rcmgr"
"github.com/bnb-chain/greenfield-storage-provider/service/blocksyncer"
"github.com/bnb-chain/greenfield-storage-provider/service/challenge"
"github.com/bnb-chain/greenfield-storage-provider/service/downloader"
Expand Down Expand Up @@ -64,7 +65,31 @@ func initMetrics(ctx *cli.Context, cfg *config.StorageProviderConfig) error {
return nil
}

// initService init service instance by name and config.
// initResourceManager initializes global resource manager.
func initResourceManager(ctx *cli.Context) error {
if ctx.IsSet(utils.DisableResourceManagerFlag.Name) &&
ctx.Bool(utils.DisableResourceManagerFlag.Name) {
return nil
}
var (
limits = rcmgr.DefaultLimitConfig
err error
)
if ctx.IsSet(utils.ResourceManagerConfigFlag.Name) {
limits, err = rcmgr.NewLimitConfigFromToml(
ctx.String(utils.ResourceManagerConfigFlag.Name))
if err != nil {
return err
}
}
if _, err = rcmgr.NewResourceManager(limits); err != nil {
return err
}
log.Infow("init resource manager", "limits", limits.String(), "error", err)
return nil
}

// initService initializes service instance by name and config.
func initService(serviceName string, cfg *config.StorageProviderConfig) (server lifecycle.Service, err error) {
switch serviceName {
case model.GatewayService:
Expand Down
25 changes: 22 additions & 3 deletions cmd/storage_provider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ var (
utils.DBDatabaseFlag,
}

rcmgrFlags = []cli.Flag{
utils.DisableResourceManagerFlag,
utils.ResourceManagerConfigFlag,
}

logFlags = []cli.Flag{
utils.LogLevelFlag,
utils.LogPathFlag,
Expand All @@ -60,6 +65,7 @@ func init() {
app.Flags = utils.MergeFlags(
configFlags,
dbFlags,
rcmgrFlags,
logFlags,
metricsFlags,
)
Expand Down Expand Up @@ -107,15 +113,24 @@ func makeConfig(ctx *cli.Context) (*config.StorageProviderConfig, error) {
services := util.SplitByComma(ctx.String(utils.ServerFlag.Name))
cfg.Service = services
}
return cfg, nil
}

// makeEnv init storage provider runtime environment
func makeEnv(ctx *cli.Context, cfg *config.StorageProviderConfig) error {
// init log
if err := initLog(ctx, cfg); err != nil {
return nil, err
return err
}
// init metrics
if err := initMetrics(ctx, cfg); err != nil {
return nil, err
return err
}
return cfg, nil
// init resource manager
if err := initResourceManager(ctx); err != nil {
return err
}
return nil
}

// storageProvider is the main entry point into the system if no special subcommand is ran.
Expand All @@ -126,6 +141,10 @@ func storageProvider(ctx *cli.Context) error {
if err != nil {
return err
}
err = makeEnv(ctx, cfg)
if err != nil {
return err
}
slc := lifecycle.NewServiceLifecycle()
for _, serviceName := range cfg.Service {
// init service instance.
Expand Down
29 changes: 22 additions & 7 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (
)

const (
ConfigCategory = "SP CONFIG"
LoggingCategory = "LOGGING AND DEBUGGING"
MetricsCategory = "METRICS AND STATS"
DatabaseCategory = "DATABASE"
ConfigCategory = "SP CONFIG"
LoggingCategory = "LOGGING AND DEBUGGING"
MetricsCategory = "METRICS AND STATS"
DatabaseCategory = "DATABASE"
ResourceManagerCategory = "RESOURCE MANAGER"
)

var (
Expand Down Expand Up @@ -63,23 +64,37 @@ var (
Value: config.DefaultSQLDBConfig.Database,
}

// resource manager flags
DisableResourceManagerFlag = &cli.BoolFlag{
Name: "rcmgr.disable",
Category: ResourceManagerCategory,
Usage: "Disable resource manager",
Value: false,
}
ResourceManagerConfigFlag = &cli.StringFlag{
Name: "rcmgr.config",
Category: ResourceManagerCategory,
Usage: "Resource manager config file path",
Value: "",
}

// log flags
LogLevelFlag = &cli.StringFlag{
Name: "log.level",
Category: LoggingCategory,
Usage: "log level",
Usage: "Log level",
Value: "info",
}
LogPathFlag = &cli.StringFlag{
Name: "log.path",
Category: LoggingCategory,
Usage: "log output file path",
Usage: "Log output file path",
Value: config.DefaultLogConfig.Path,
}
LogStdOutputFlag = &cli.BoolFlag{
Name: "log.std",
Category: LoggingCategory,
Usage: "log output standard io",
Usage: "Log output standard io",
}

// Metrics flags
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ require (
github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416
github.com/pkg/sftp v1.13.5
github.com/prometheus/client_golang v1.14.0
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible
github.com/stretchr/testify v1.8.1
github.com/tendermint/tendermint v0.35.9
github.com/urfave/cli/v2 v2.25.0
Expand All @@ -53,7 +54,9 @@ require (
)

require (
github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46 // indirect
github.com/cosmos/cosmos-proto v1.0.0-beta.1 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/tools v0.6.0 // indirect
)
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8=
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+35s3my2LFTysnkMfxsJBAMHj/DoqoB9knIWoYG/Vk0=
github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonreference v0.0.0-20160704190145-13c6e3589ad9/go.mod h1:W3Z9FmVs9qj+KR4zFKmDPGiLdk1D9Rlm7cyMvf57TTg=
Expand Down
68 changes: 68 additions & 0 deletions pkg/rcmgr/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package rcmgr

import (
"errors"
)

// ErrResourceLimitExceeded is returned when attempting to perform an operation that would
// exceed system resource limits.
var ErrResourceLimitExceeded = errors.New("resource limit exceeded")

// ErrResourceScopeClosed is returned when attempting to reserve resources in a closed resource
// scope.
var ErrResourceScopeClosed = errors.New("resource scope closed")

type ErrMemoryLimitExceeded struct {
current, attempted, limit int64
priority uint8
err error
}

func (e *ErrMemoryLimitExceeded) Error() string { return e.err.Error() }
func (e *ErrMemoryLimitExceeded) Unwrap() error { return e.err }

// edge may be empty if this is not an edge error
func logValuesMemoryLimit(scope, edge string, stat ScopeStat, err error) []interface{} {
logValues := make([]interface{}, 0, 2*8)
logValues = append(logValues, "scope", scope)
if edge != "" {
logValues = append(logValues, "edge", edge)
}
var e *ErrMemoryLimitExceeded
if errors.As(err, &e) {
logValues = append(logValues,
"current", e.current,
"attempted", e.attempted,
"priority", e.priority,
"limit", e.limit,
)
}
return append(logValues, "stat", stat, "error", err)
}

type ErrConnLimitExceeded struct {
current, attempted, limit int
err error
}

func (e *ErrConnLimitExceeded) Error() string { return e.err.Error() }
func (e *ErrConnLimitExceeded) Unwrap() error { return e.err }

// edge may be empty if this is not an edge error
func logValuesConnLimit(scope, edge string, dir Direction, stat ScopeStat, err error) []interface{} {
logValues := make([]interface{}, 0, 2*9)
logValues = append(logValues, "scope", scope)
if edge != "" {
logValues = append(logValues, "edge", edge)
}
logValues = append(logValues, "direction", dir)
var e *ErrConnLimitExceeded
if errors.As(err, &e) {
logValues = append(logValues,
"current", e.current,
"attempted", e.attempted,
"limit", e.limit,
)
}
return append(logValues, "stat", stat, "error", err)
}
106 changes: 106 additions & 0 deletions pkg/rcmgr/limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package rcmgr

import (
"fmt"
"math"

"github.com/shirou/gopsutil/mem"

"github.com/bnb-chain/greenfield-storage-provider/pkg/log"
)

const (
LimitFactor = 0.85
DefaultMemorySize uint64 = 8 * 1024 * 1024
)

// Limit is an interface that that specifies basic resource limits.
type Limit interface {
// GetMemoryLimit returns the (current) memory limit.
GetMemoryLimit() int64
// GetConnLimit returns the connection limit, for inbound or outbound connections.
GetConnLimit(Direction) int
// GetConnTotalLimit returns the total connection limit
GetConnTotalLimit() int
// GetFDLimit returns the file descriptor limit.
GetFDLimit() int
// String returns the Limit state string
String() string
}

// Limiter is an interface for providing limits to the resource manager.
type Limiter interface {
GetSystemLimits() Limit
GetTransientLimits() Limit
GetServiceLimits(svc string) Limit
String() string
}

var _ Limit = &BaseLimit{}

// BaseLimit is a mixin type for basic resource limits.
type BaseLimit struct {
Conns int `json:",omitempty"`
ConnsInbound int `json:",omitempty"`
ConnsOutbound int `json:",omitempty"`
FD int `json:",omitempty"`
Memory int64 `json:",omitempty"`
}

// GetMemoryLimit returns the (current) memory limit.
func (limit *BaseLimit) GetMemoryLimit() int64 {
return limit.Memory
}

// GetConnLimit returns the connection limit, for inbound or outbound connections.
func (limit *BaseLimit) GetConnLimit(direction Direction) int {
if direction == DirInbound {
return limit.ConnsInbound
}
return limit.ConnsOutbound
}

// GetConnTotalLimit returns the total connection limit
func (limit *BaseLimit) GetConnTotalLimit() int {
return limit.Conns
}

// GetFDLimit returns the file descriptor limit.
func (limit *BaseLimit) GetFDLimit() int {
return limit.FD
}

// String returns the Limit state string
// TODO:: supports connection and fd field
func (limit *BaseLimit) String() string {
return fmt.Sprintf("memory limits %d", limit.Memory)
}

// InfiniteBaseLimit are a limiter configuration that uses unlimited limits, thus effectively not limiting anything.
// Keep in mind that the operating system limits the number of file descriptors that an application can use.
var InfiniteBaseLimit = BaseLimit{
Conns: math.MaxInt,
ConnsInbound: math.MaxInt,
ConnsOutbound: math.MaxInt,
FD: math.MaxInt,
Memory: math.MaxInt64,
}

// DynamicLimits generate limits by os resource
func DynamicLimits() *BaseLimit {
availableMem := DefaultMemorySize
virtualMem, err := mem.VirtualMemory()
if err != nil {
log.Errorw("failed to get os memory states", "error", err)
} else {
availableMem = virtualMem.Available
}
limits := &BaseLimit{}
limits.Memory = int64(float64(availableMem) * LimitFactor)
// TODO:: get from os and compatible with a variety of os
limits.FD = math.MaxInt
limits.Conns = math.MaxInt
limits.ConnsInbound = math.MaxInt
limits.ConnsOutbound = math.MaxInt
return limits
}
Loading

0 comments on commit 90b5039

Please sign in to comment.