Skip to content

Commit

Permalink
MutliNode DoAll
Browse files Browse the repository at this point in the history
  • Loading branch information
dhaidashenko committed Feb 22, 2024
1 parent 2b99f07 commit 39b25c8
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 22 deletions.
10 changes: 6 additions & 4 deletions common/client/multi_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ type MultiNode[
NodeStates() map[string]string
SelectNodeRPC() (RPC_CLIENT, error)

BatchCallContextAll(ctx context.Context, b []any) error
DoAll(ctx context.Context,
do func(ctx context.Context, rpc RPC_CLIENT) error) error
ConfiguredChainID() CHAIN_ID
IsL2() bool
}
Expand Down Expand Up @@ -391,7 +392,8 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
// sendonlys.
// CAUTION: This should only be used for mass re-transmitting transactions, it
// might have unexpected effects to use it for anything else.
func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) BatchCallContextAll(ctx context.Context, b []any) error {
func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) DoAll(ctx context.Context,
do func(ctx context.Context, rpc RPC_CLIENT) error) error {
var wg sync.WaitGroup
defer wg.Wait()

Expand All @@ -414,7 +416,7 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
wg.Add(1)
go func(n SendOnlyNode[CHAIN_ID, RPC_CLIENT]) {
defer wg.Done()
err := n.RPC().BatchCallContext(ctx, b)
err := do(ctx, n.RPC())
if err != nil {
c.lggr.Debugw("Secondary node BatchCallContext failed", "err", err)
} else {
Expand All @@ -426,7 +428,7 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
if selectionErr != nil {
return selectionErr
}
return main.RPC().BatchCallContext(ctx, b)
return do(ctx, main.RPC())
}

func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) BlockByHash(ctx context.Context, hash BLOCK_HASH) (h HEAD, err error) {
Expand Down
1 change: 0 additions & 1 deletion common/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ type clientAPI[
FilterEvents(ctx context.Context, query EVENT_OPS) ([]EVENT, error)

// Misc
BatchCallContext(ctx context.Context, b []any) error
CallContract(
ctx context.Context,
msg interface{},
Expand Down
16 changes: 6 additions & 10 deletions core/chains/evm/client/chain_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,15 @@ func (c *chainClient) BalanceAt(ctx context.Context, account common.Address, blo
}

func (c *chainClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
batch := make([]any, len(b))
for i, arg := range b {
batch[i] = any(arg)
}
return c.multiNode.BatchCallContext(ctx, batch)
return c.multiNode.DoAll(ctx, func(ctx context.Context, rpc RPCCLient) error {
return rpc.BatchCallContext(ctx, b)
})
}

func (c *chainClient) BatchCallContextAll(ctx context.Context, b []rpc.BatchElem) error {
batch := make([]any, len(b))
for i, arg := range b {
batch[i] = any(arg)
}
return c.multiNode.BatchCallContextAll(ctx, batch)
return c.multiNode.DoAll(ctx, func(rpc RPCCLient) error {
rpc.BatchCallContext()
})
}

// TODO-1663: return custom Block type instead of geth's once client.go is deprecated.
Expand Down
11 changes: 4 additions & 7 deletions core/chains/evm/client/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type RPCCLient interface {
SuggestGasPrice(ctx context.Context) (p *big.Int, err error)
SuggestGasTipCap(ctx context.Context) (t *big.Int, err error)
TransactionReceiptGeth(ctx context.Context, txHash common.Hash) (r *types.Receipt, err error)
BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
}

type rpcClient struct {
Expand Down Expand Up @@ -315,24 +316,20 @@ func (r *rpcClient) CallContext(ctx context.Context, result interface{}, method
return err
}

func (r *rpcClient) BatchCallContext(ctx context.Context, b []any) error {
func (r *rpcClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx)
if err != nil {
return err
}
batch := make([]rpc.BatchElem, len(b))
for i, arg := range b {
batch[i] = arg.(rpc.BatchElem)
}
defer cancel()
lggr := r.newRqLggr().With("nBatchElems", len(b), "batchElems", b)

lggr.Trace("RPC call: evmclient.Client#BatchCallContext")
start := time.Now()
if http != nil {
err = r.wrapHTTP(http.rpc.BatchCallContext(ctx, batch))
err = r.wrapHTTP(http.rpc.BatchCallContext(ctx, b))
} else {
err = r.wrapWS(ws.rpc.BatchCallContext(ctx, batch))
err = r.wrapWS(ws.rpc.BatchCallContext(ctx, b))
}
duration := time.Since(start)

Expand Down

0 comments on commit 39b25c8

Please sign in to comment.