Skip to content

Commit

Permalink
EVM-779 Throttling with concurrency (#1821)
Browse files Browse the repository at this point in the history
Co-authored-by: stana-ethernal <stana.miric@ethernal.tech>
  • Loading branch information
igorcrevar and stana-miric authored Aug 18, 2023
1 parent d18818e commit 883cbbc
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 171 deletions.
8 changes: 4 additions & 4 deletions command/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Config struct {
Relayer bool `json:"relayer" yaml:"relayer"`
NumBlockConfirmations uint64 `json:"num_block_confirmations" yaml:"num_block_confirmations"`

RequestsPerSecondDebug uint64 `json:"requests_per_second_debug" yaml:"requests_per_second_debug"`
ConcurrentRequestsDebug uint64 `json:"concurrent_requests_debug" yaml:"concurrent_requests_debug"`
}

// Telemetry holds the config details for metric services.
Expand Down Expand Up @@ -82,8 +82,8 @@ const (
// on ethereum epoch lasts for 32 blocks. more details: https://www.alchemy.com/overviews/ethereum-commitment-levels
DefaultNumBlockConfirmations uint64 = 64

// Maximum number of allowed requests per second for debug endpoints
DefaultRequestsPerSecondDebug uint64 = 5
// Maximum number of allowed concurrent requests for debug endpoints
DefaultConcurrentRequestsDebug uint64 = 32
)

// DefaultConfig returns the default server configuration
Expand Down Expand Up @@ -121,7 +121,7 @@ func DefaultConfig() *Config {
JSONRPCBlockRangeLimit: DefaultJSONRPCBlockRangeLimit,
Relayer: false,
NumBlockConfirmations: DefaultNumBlockConfirmations,
RequestsPerSecondDebug: DefaultRequestsPerSecondDebug,
ConcurrentRequestsDebug: DefaultConcurrentRequestsDebug,
}
}

Expand Down
4 changes: 2 additions & 2 deletions command/server/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const (
relayerFlag = "relayer"
numBlockConfirmationsFlag = "num-block-confirmations"

requestsPerSecondDebugFlag = "requests-per-second-debug"
concurrentRequestsDebugFlag = "concurrent-requests-debug"
)

// Flags that are deprecated, but need to be preserved for
Expand Down Expand Up @@ -154,7 +154,7 @@ func (p *serverParams) generateConfig() *server.Config {
AccessControlAllowOrigin: p.rawConfig.CorsAllowedOrigins,
BatchLengthLimit: p.rawConfig.JSONRPCBatchRequestLimit,
BlockRangeLimit: p.rawConfig.JSONRPCBlockRangeLimit,
RequestsPerSecondDebug: p.rawConfig.RequestsPerSecondDebug,
ConcurrentRequestsDebug: p.rawConfig.ConcurrentRequestsDebug,
},
GRPCAddr: p.grpcAddress,
LibP2PAddr: p.libp2pAddress,
Expand Down
8 changes: 4 additions & 4 deletions command/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,10 @@ func setFlags(cmd *cobra.Command) {
)

cmd.Flags().Uint64Var(
&params.rawConfig.RequestsPerSecondDebug,
requestsPerSecondDebugFlag,
defaultConfig.RequestsPerSecondDebug,
"maximal number of requests per second for debug endpoints",
&params.rawConfig.ConcurrentRequestsDebug,
concurrentRequestsDebugFlag,
defaultConfig.ConcurrentRequestsDebug,
"maximal number of concurrent requests for debug endpoints",
)

setLegacyFlags(cmd)
Expand Down
185 changes: 95 additions & 90 deletions jsonrpc/debug_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ type Debug struct {
throttling *Throttling
}

func NewDebug(store debugStore, requestsPerSecond int) *Debug {
func NewDebug(store debugStore, requestsPerSecond uint64) *Debug {
return &Debug{
store: store,
throttling: NewThrottling(requestsPerSecond),
throttling: NewThrottling(requestsPerSecond, time.Second),
}
}

Expand All @@ -88,119 +88,124 @@ func (d *Debug) TraceBlockByNumber(
blockNumber BlockNumber,
config *TraceConfig,
) (interface{}, error) {
if err := d.throttling.AttemptRequest(); err != nil {
return nil, err
}

num, err := GetNumericBlockNumber(blockNumber, d.store)
if err != nil {
return nil, err
}

block, ok := d.store.GetBlockByNumber(num, true)
if !ok {
return nil, fmt.Errorf("block %d not found", num)
}

return d.traceBlock(block, config)
return d.throttling.AttemptRequest(
context.Background(),
func() (interface{}, error) {
num, err := GetNumericBlockNumber(blockNumber, d.store)
if err != nil {
return nil, err
}

block, ok := d.store.GetBlockByNumber(num, true)
if !ok {
return nil, fmt.Errorf("block %d not found", num)
}

return d.traceBlock(block, config)
},
)
}

func (d *Debug) TraceBlockByHash(
blockHash types.Hash,
config *TraceConfig,
) (interface{}, error) {
if err := d.throttling.AttemptRequest(); err != nil {
return nil, err
}

block, ok := d.store.GetBlockByHash(blockHash, true)
if !ok {
return nil, fmt.Errorf("block %s not found", blockHash)
}

return d.traceBlock(block, config)
return d.throttling.AttemptRequest(
context.Background(),
func() (interface{}, error) {
block, ok := d.store.GetBlockByHash(blockHash, true)
if !ok {
return nil, fmt.Errorf("block %s not found", blockHash)
}

return d.traceBlock(block, config)
},
)
}

func (d *Debug) TraceBlock(
input string,
config *TraceConfig,
) (interface{}, error) {
if err := d.throttling.AttemptRequest(); err != nil {
return nil, err
}

blockByte, decodeErr := hex.DecodeHex(input)
if decodeErr != nil {
return nil, fmt.Errorf("unable to decode block, %w", decodeErr)
}

block := &types.Block{}
if err := block.UnmarshalRLP(blockByte); err != nil {
return nil, err
}

return d.traceBlock(block, config)
return d.throttling.AttemptRequest(
context.Background(),
func() (interface{}, error) {
blockByte, decodeErr := hex.DecodeHex(input)
if decodeErr != nil {
return nil, fmt.Errorf("unable to decode block, %w", decodeErr)
}

block := &types.Block{}
if err := block.UnmarshalRLP(blockByte); err != nil {
return nil, err
}

return d.traceBlock(block, config)
},
)
}

func (d *Debug) TraceTransaction(
txHash types.Hash,
config *TraceConfig,
) (interface{}, error) {
if err := d.throttling.AttemptRequest(); err != nil {
return nil, err
}

tx, block := GetTxAndBlockByTxHash(txHash, d.store)
if tx == nil {
return nil, fmt.Errorf("tx %s not found", txHash.String())
}

if block.Number() == 0 {
return nil, ErrTraceGenesisBlock
}

tracer, cancel, err := newTracer(config)
if err != nil {
return nil, err
}

defer cancel()

return d.store.TraceTxn(block, tx.Hash, tracer)
return d.throttling.AttemptRequest(
context.Background(),
func() (interface{}, error) {
tx, block := GetTxAndBlockByTxHash(txHash, d.store)
if tx == nil {
return nil, fmt.Errorf("tx %s not found", txHash.String())
}

if block.Number() == 0 {
return nil, ErrTraceGenesisBlock
}

tracer, cancel, err := newTracer(config)
if err != nil {
return nil, err
}

defer cancel()

return d.store.TraceTxn(block, tx.Hash, tracer)
},
)
}

func (d *Debug) TraceCall(
arg *txnArgs,
filter BlockNumberOrHash,
config *TraceConfig,
) (interface{}, error) {
if err := d.throttling.AttemptRequest(); err != nil {
return nil, err
}

header, err := GetHeaderFromBlockNumberOrHash(filter, d.store)
if err != nil {
return nil, ErrHeaderNotFound
}

tx, err := DecodeTxn(arg, header.Number, d.store, true)
if err != nil {
return nil, err
}

// If the caller didn't supply the gas limit in the message, then we set it to maximum possible => block gas limit
if tx.Gas == 0 {
tx.Gas = header.GasLimit
}

tracer, cancel, err := newTracer(config)
defer cancel()

if err != nil {
return nil, err
}

return d.store.TraceCall(tx, header, tracer)
return d.throttling.AttemptRequest(
context.Background(),
func() (interface{}, error) {
header, err := GetHeaderFromBlockNumberOrHash(filter, d.store)
if err != nil {
return nil, ErrHeaderNotFound
}

tx, err := DecodeTxn(arg, header.Number, d.store, true)
if err != nil {
return nil, err
}

// If the caller didn't supply the gas limit in the message, then we set it to maximum possible => block gas limit
if tx.Gas == 0 {
tx.Gas = header.GasLimit
}

tracer, cancel, err := newTracer(config)
defer cancel()

if err != nil {
return nil, err
}

return d.store.TraceCall(tx, header, tracer)
},
)
}

func (d *Debug) traceBlock(
Expand Down
4 changes: 2 additions & 2 deletions jsonrpc/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type dispatcherParams struct {
jsonRPCBatchLengthLimit uint64
blockRangeLimit uint64

requestsPerSecondDebug uint64
concurrentRequestsDebug uint64
}

func (dp dispatcherParams) isExceedingBatchLengthLimit(value uint64) bool {
Expand Down Expand Up @@ -111,7 +111,7 @@ func (d *Dispatcher) registerEndpoints(store JSONRPCStore) error {
d.endpoints.Bridge = &Bridge{
store,
}
d.endpoints.Debug = NewDebug(store, int(d.params.requestsPerSecondDebug))
d.endpoints.Debug = NewDebug(store, d.params.concurrentRequestsDebug)

var err error

Expand Down
4 changes: 2 additions & 2 deletions jsonrpc/jsonrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type Config struct {
BatchLengthLimit uint64
BlockRangeLimit uint64

RequestsPerSecondDebug uint64
ConcurrentRequestsDebug uint64
}

// NewJSONRPC returns the JSONRPC http server
Expand All @@ -83,7 +83,7 @@ func NewJSONRPC(logger hclog.Logger, config *Config) (*JSONRPC, error) {
priceLimit: config.PriceLimit,
jsonRPCBatchLengthLimit: config.BatchLengthLimit,
blockRangeLimit: config.BlockRangeLimit,
requestsPerSecondDebug: config.RequestsPerSecondDebug,
concurrentRequestsDebug: config.ConcurrentRequestsDebug,
},
)

Expand Down
Loading

0 comments on commit 883cbbc

Please sign in to comment.