diff --git a/CHANGELOG.md b/CHANGELOG.md index 8cdc5868d..e9448b753 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ * [\#1364](https://github.com/cosmos/relayer/pull/1364) Include feegrant message when calculate gas. * [\#1390](https://github.com/cosmos/relayer/pull/1390) Avoid no concrete type registered for type URL error of EthAccount. * [\#1455](https://github.com/cosmos/relayer/pull/1455) Allow retry for pathEnd to avoid packet message get removed before open channel. +* [\#1470](https://github.com/cosmos/relayer/pull/1470) Add max-msg-num parameter for batch broadcast in chain configs. ## v0.9.3 diff --git a/Makefile b/Makefile index 874515ffd..19a56c418 100644 --- a/Makefile +++ b/Makefile @@ -59,7 +59,7 @@ build-osmosis-docker: ############################################################################### test: - @go test -mod=readonly -race ./... + @go test -mod=readonly -timeout 3m -race ./... interchaintest: cd interchaintest && go test -race -v -run TestRelayerInProcess . diff --git a/relayer/chains/cosmos/fee_market_test.go b/relayer/chains/cosmos/fee_market_test.go index 746d31ed9..f32bd3a6a 100644 --- a/relayer/chains/cosmos/fee_market_test.go +++ b/relayer/chains/cosmos/fee_market_test.go @@ -24,7 +24,7 @@ var ( MinGasAmount: 1, MaxGasAmount: 0, Debug: false, - Timeout: "30s", + Timeout: "3m", BlockTimeout: "30s", OutputFormat: "json", SignModeStr: "direct", diff --git a/relayer/chains/cosmos/provider.go b/relayer/chains/cosmos/provider.go index f34b1125d..25574961c 100644 --- a/relayer/chains/cosmos/provider.go +++ b/relayer/chains/cosmos/provider.go @@ -55,6 +55,7 @@ type CosmosProviderConfig struct { Slip44 *int `json:"coin-type" yaml:"coin-type"` SigningAlgorithm string `json:"signing-algorithm" yaml:"signing-algorithm"` Broadcast provider.BroadcastMode `json:"broadcast-mode" yaml:"broadcast-mode"` + MaxMsgNum uint64 `json:"max-msg-num" yaml:"max-msg-num"` MinLoopDuration time.Duration `json:"min-loop-duration" yaml:"min-loop-duration"` ExtensionOptions []provider.ExtensionOption `json:"extension-options" yaml:"extension-options"` @@ -89,6 +90,10 @@ func (pc CosmosProviderConfig) BroadcastMode() provider.BroadcastMode { return pc.Broadcast } +func (pc CosmosProviderConfig) BroadcastMaxMsgNum() uint64 { + return pc.MaxMsgNum +} + // NewProvider validates the CosmosProviderConfig, instantiates a ChainClient and then instantiates a CosmosProvider func (pc CosmosProviderConfig) NewProvider(log *zap.Logger, homepath string, debug bool, chainName string) (provider.ChainProvider, error) { if err := pc.Validate(); err != nil { diff --git a/relayer/chains/penumbra/provider.go b/relayer/chains/penumbra/provider.go index 345205045..a3a9ec1a0 100644 --- a/relayer/chains/penumbra/provider.go +++ b/relayer/chains/penumbra/provider.go @@ -60,6 +60,7 @@ type PenumbraProviderConfig struct { Modules []module.AppModuleBasic `json:"-" yaml:"-"` Slip44 int `json:"coin-type" yaml:"coin-type"` Broadcast provider.BroadcastMode `json:"broadcast-mode" yaml:"broadcast-mode"` + MaxMsgNum uint64 `json:"max-msg-num" yaml:"max-msg-num"` MinLoopDuration time.Duration `json:"min-loop-duration" yaml:"min-loop-duration"` ExtensionOptions []provider.ExtensionOption `json:"extension-options" yaml:"extension-options"` } @@ -75,6 +76,10 @@ func (pc PenumbraProviderConfig) BroadcastMode() provider.BroadcastMode { return pc.Broadcast } +func (pc PenumbraProviderConfig) BroadcastMaxMsgNum() uint64 { + return pc.MaxMsgNum +} + // NewProvider validates the PenumbraProviderConfig, instantiates a ChainClient and then instantiates a CosmosProvider func (pc PenumbraProviderConfig) NewProvider(log *zap.Logger, homepath string, debug bool, chainName string) (provider.ChainProvider, error) { if err := pc.Validate(); err != nil { diff --git a/relayer/processor/message_processor.go b/relayer/processor/message_processor.go index 602c588c8..436faa5b8 100644 --- a/relayer/processor/message_processor.go +++ b/relayer/processor/message_processor.go @@ -330,6 +330,10 @@ func (mp *messageProcessor) trackAndSendMessages( needsClientUpdate bool, ) error { broadcastBatch := dst.chainProvider.ProviderConfig().BroadcastMode() == provider.BroadcastModeBatch + maxMsgNum := dst.chainProvider.ProviderConfig().BroadcastMaxMsgNum() + if maxMsgNum < 2 && maxMsgNum != 0 { + maxMsgNum = 2 + } var batch []messageToTrack for _, t := range mp.trackers() { @@ -347,6 +351,10 @@ func (mp *messageProcessor) trackAndSendMessages( if broadcastBatch && (retries == 0 || ordered) { batch = append(batch, t) + if len(batch) >= int(maxMsgNum) && maxMsgNum != 0 { + go mp.sendBatchMessages(ctx, src, dst, batch) + batch = nil + } continue } go mp.sendSingleMessage(ctx, src, dst, t) diff --git a/relayer/provider/provider.go b/relayer/provider/provider.go index 01be3b9cf..f4376954c 100644 --- a/relayer/provider/provider.go +++ b/relayer/provider/provider.go @@ -31,6 +31,7 @@ type ProviderConfig interface { NewProvider(log *zap.Logger, homepath string, debug bool, chainName string) (ChainProvider, error) Validate() error BroadcastMode() BroadcastMode + BroadcastMaxMsgNum() uint64 } type RelayerMessage interface {