Skip to content

Commit

Permalink
Configurable Mercury transmitter parameters (#12680)
Browse files Browse the repository at this point in the history
* Configurable Mercury transmitter parameters

* Changeset

* Remove commented code

* add tag

* Rename
  • Loading branch information
samsondav authored Apr 26, 2024
1 parent 8c8994e commit f55d8be
Show file tree
Hide file tree
Showing 29 changed files with 234 additions and 45 deletions.
13 changes: 13 additions & 0 deletions .changeset/sour-jars-cross.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
"chainlink": patch
---

#added

Add configurability to mercury transmitter

```toml
[Mercury.Transmitter]
TransmitQueueMaxSize = 10_000 # Default
TransmitTimeout = "5s" # Default
```
5 changes: 3 additions & 2 deletions core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,9 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
}

evmFactoryCfg := chainlink.EVMFactoryConfig{
CSAETHKeystore: keyStore,
ChainOpts: legacyevm.ChainOpts{AppConfig: cfg, MailMon: mailMon, DS: ds},
CSAETHKeystore: keyStore,
ChainOpts: legacyevm.ChainOpts{AppConfig: cfg, MailMon: mailMon, DS: ds},
MercuryTransmitter: cfg.Mercury().Transmitter(),
}
// evm always enabled for backward compatibility
// TODO BCF-2510 this needs to change in order to clear the path for EVM extraction
Expand Down
14 changes: 14 additions & 0 deletions core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -622,3 +622,17 @@ LatestReportDeadline = "5s" # Default
[Mercury.TLS]
# CertFile is the path to a PEM file of trusted root certificate authority certificates
CertFile = "/path/to/client/certs.pem" # Example

# Mercury.Transmitter controls settings for the mercury transmitter
[Mercury.Transmitter]
# TransmitQueueMaxSize controls the size of the transmit queue. This is scoped
# per OCR instance. If the queue is full, the transmitter will start dropping
# the oldest messages in order to make space.
#
# This is useful if mercury server goes offline and the nop needs to buffer
# transmissions.
TransmitQueueMaxSize = 10_000 # Default
# TransmitTimeout controls how long the transmitter will wait for a response
# when sending a message to the mercury server, before aborting and considering
# the transmission to be failed.
TransmitTimeout = "5s" # Default
7 changes: 7 additions & 0 deletions core/config/mercury_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"time"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/types"
)

Expand All @@ -16,8 +17,14 @@ type MercuryTLS interface {
CertFile() string
}

type MercuryTransmitter interface {
TransmitQueueMaxSize() uint32
TransmitTimeout() commonconfig.Duration
}

type Mercury interface {
Credentials(credName string) *types.MercuryCredentials
Cache() MercuryCache
TLS() MercuryTLS
Transmitter() MercuryTransmitter
}
20 changes: 18 additions & 2 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1312,14 +1312,30 @@ func (m *MercuryTLS) ValidateConfig() (err error) {
return
}

type MercuryTransmitter struct {
TransmitQueueMaxSize *uint32
TransmitTimeout *commonconfig.Duration
}

func (m *MercuryTransmitter) setFrom(f *MercuryTransmitter) {
if v := f.TransmitQueueMaxSize; v != nil {
m.TransmitQueueMaxSize = v
}
if v := f.TransmitTimeout; v != nil {
m.TransmitTimeout = v
}
}

type Mercury struct {
Cache MercuryCache `toml:",omitempty"`
TLS MercuryTLS `toml:",omitempty"`
Cache MercuryCache `toml:",omitempty"`
TLS MercuryTLS `toml:",omitempty"`
Transmitter MercuryTransmitter `toml:",omitempty"`
}

func (m *Mercury) setFrom(f *Mercury) {
m.Cache.setFrom(&f.Cache)
m.TLS.setFrom(&f.TLS)
m.Transmitter.setFrom(&f.Transmitter)
}

func (m *Mercury) ValidateConfig() (err error) {
Expand Down
3 changes: 2 additions & 1 deletion core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,8 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
MailMon: mailMon,
DS: ds,
},
CSAETHKeystore: keyStore,
CSAETHKeystore: keyStore,
MercuryTransmitter: cfg.Mercury().Transmitter(),
}

if cfg.EVMEnabled() {
Expand Down
21 changes: 21 additions & 0 deletions core/services/chainlink/config_mercury.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package chainlink
import (
"time"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/types"

"github.com/smartcontractkit/chainlink/v2/core/config"
Expand All @@ -25,6 +26,8 @@ func (m *mercuryCacheConfig) LatestReportDeadline() time.Duration {
return m.c.LatestReportDeadline.Duration()
}

var _ config.MercuryTLS = (*mercuryTLSConfig)(nil)

type mercuryTLSConfig struct {
c toml.MercuryTLS
}
Expand All @@ -33,6 +36,20 @@ func (m *mercuryTLSConfig) CertFile() string {
return *m.c.CertFile
}

var _ config.MercuryTransmitter = (*mercuryTransmitterConfig)(nil)

type mercuryTransmitterConfig struct {
c toml.MercuryTransmitter
}

func (m *mercuryTransmitterConfig) TransmitQueueMaxSize() uint32 {
return *m.c.TransmitQueueMaxSize
}

func (m *mercuryTransmitterConfig) TransmitTimeout() commonconfig.Duration {
return *m.c.TransmitTimeout
}

type mercuryConfig struct {
c toml.Mercury
s toml.MercurySecrets
Expand Down Expand Up @@ -60,3 +77,7 @@ func (m *mercuryConfig) Cache() config.MercuryCache {
func (m *mercuryConfig) TLS() config.MercuryTLS {
return &mercuryTLSConfig{c: m.c.TLS}
}

func (m *mercuryConfig) Transmitter() config.MercuryTransmitter {
return &mercuryTransmitterConfig{c: m.c.Transmitter}
}
8 changes: 8 additions & 0 deletions core/services/chainlink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,10 @@ func TestConfig_Marshal(t *testing.T) {
TLS: toml.MercuryTLS{
CertFile: ptr("/path/to/cert.pem"),
},
Transmitter: toml.MercuryTransmitter{
TransmitQueueMaxSize: ptr(uint32(123)),
TransmitTimeout: commoncfg.MustNewDuration(234 * time.Second),
},
}

for _, tt := range []struct {
Expand Down Expand Up @@ -1165,6 +1169,10 @@ LatestReportDeadline = '1m42s'
[Mercury.TLS]
CertFile = '/path/to/cert.pem'
[Mercury.Transmitter]
TransmitQueueMaxSize = 123
TransmitTimeout = '3m54s'
`},
{"full", full, fullTOML},
{"multi-chain", multiChain, multiChainTOML},
Expand Down
9 changes: 6 additions & 3 deletions core/services/chainlink/relayer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/smartcontractkit/chainlink-starknet/relayer/pkg/chainlink/config"

"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
coreconfig "github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/config/env"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
Expand All @@ -38,6 +39,7 @@ type RelayerFactory struct {
type EVMFactoryConfig struct {
legacyevm.ChainOpts
evmrelay.CSAETHKeystore
coreconfig.MercuryTransmitter
}

func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (map[types.RelayID]evmrelay.LoopRelayAdapter, error) {
Expand Down Expand Up @@ -67,9 +69,10 @@ func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (m
}

relayerOpts := evmrelay.RelayerOpts{
DS: ccOpts.DS,
CSAETHKeystore: config.CSAETHKeystore,
MercuryPool: r.MercuryPool,
DS: ccOpts.DS,
CSAETHKeystore: config.CSAETHKeystore,
MercuryPool: r.MercuryPool,
TransmitterConfig: config.MercuryTransmitter,
}
relayer, err2 := evmrelay.NewRelayer(lggr.Named(relayID.ChainID), chain, relayerOpts)
if err2 != nil {
Expand Down
4 changes: 4 additions & 0 deletions core/services/chainlink/testdata/config-empty-effective.toml
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ LatestReportDeadline = '5s'
[Mercury.TLS]
CertFile = ''

[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'

[Capabilities]
[Capabilities.Peering]
IncomingMessageBufferSize = 10
Expand Down
4 changes: 4 additions & 0 deletions core/services/chainlink/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ LatestReportDeadline = '1m42s'
[Mercury.TLS]
CertFile = '/path/to/cert.pem'

[Mercury.Transmitter]
TransmitQueueMaxSize = 123
TransmitTimeout = '3m54s'

[Capabilities]
[Capabilities.Peering]
IncomingMessageBufferSize = 13
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ LatestReportDeadline = '5s'
[Mercury.TLS]
CertFile = ''

[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'

[Capabilities]
[Capabilities.Peering]
IncomingMessageBufferSize = 10
Expand Down
1 change: 1 addition & 0 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ type mercuryConfig interface {
Credentials(credName string) *types.MercuryCredentials
Cache() coreconfig.MercuryCache
TLS() coreconfig.MercuryTLS
Transmitter() coreconfig.MercuryTransmitter
}

type thresholdConfig interface {
Expand Down
25 changes: 14 additions & 11 deletions core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ type Relayer struct {
codec commontypes.Codec

// Mercury
mercuryORM mercury.ORM
mercuryORM mercury.ORM
transmitterCfg mercury.TransmitterConfig

// LLO/data streams
cdcFactory llo.ChannelDefinitionCacheFactory
Expand All @@ -93,7 +94,8 @@ type CSAETHKeystore interface {
type RelayerOpts struct {
DS sqlutil.DataSource
CSAETHKeystore
MercuryPool wsrpc.Pool
MercuryPool wsrpc.Pool
TransmitterConfig mercury.TransmitterConfig
}

func (c RelayerOpts) Validate() error {
Expand Down Expand Up @@ -122,14 +124,15 @@ func NewRelayer(lggr logger.Logger, chain legacyevm.Chain, opts RelayerOpts) (*R
lloORM := llo.NewORM(opts.DS, chain.ID())
cdcFactory := llo.NewChannelDefinitionCacheFactory(lggr, lloORM, chain.LogPoller())
return &Relayer{
ds: opts.DS,
chain: chain,
lggr: lggr,
ks: opts.CSAETHKeystore,
mercuryPool: opts.MercuryPool,
cdcFactory: cdcFactory,
lloORM: lloORM,
mercuryORM: mercuryORM,
ds: opts.DS,
chain: chain,
lggr: lggr,
ks: opts.CSAETHKeystore,
mercuryPool: opts.MercuryPool,
cdcFactory: cdcFactory,
lloORM: lloORM,
mercuryORM: mercuryORM,
transmitterCfg: opts.TransmitterConfig,
}, nil
}

Expand Down Expand Up @@ -246,7 +249,7 @@ func (r *Relayer) NewMercuryProvider(rargs commontypes.RelayArgs, pargs commonty
default:
return nil, fmt.Errorf("invalid feed version %d", feedID.Version())
}
transmitter := mercury.NewTransmitter(lggr, clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec)
transmitter := mercury.NewTransmitter(lggr, r.transmitterCfg, clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec)

return NewMercuryProvider(cp, r.chainReader, r.codec, NewMercuryChainReader(r.chain.HeadTracker()), transmitter, reportCodecV1, reportCodecV2, reportCodecV3, lggr), nil
}
Expand Down
27 changes: 16 additions & 11 deletions core/services/relay/evm/mercury/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/mercury"

Expand All @@ -33,12 +34,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

var (
maxTransmitQueueSize = 10_000
maxDeleteQueueSize = 10_000
transmitTimeout = 5 * time.Second
)

const (
// Mercury server error codes
DuplicateReport = 2
Expand Down Expand Up @@ -104,9 +99,15 @@ type TransmitterReportDecoder interface {

var _ Transmitter = (*mercuryTransmitter)(nil)

type TransmitterConfig interface {
TransmitQueueMaxSize() uint32
TransmitTimeout() commonconfig.Duration
}

type mercuryTransmitter struct {
services.StateMachine
lggr logger.Logger
cfg TransmitterConfig

servers map[string]*server

Expand Down Expand Up @@ -142,6 +143,8 @@ func getPayloadTypes() abi.Arguments {
type server struct {
lggr logger.Logger

transmitTimeout time.Duration

c wsrpc.Client
pm *PersistenceManager
q *TransmitQueue
Expand Down Expand Up @@ -221,7 +224,7 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, feed
// queue was closed
return
}
ctx, cancel := context.WithTimeout(runloopCtx, utils.WithJitter(transmitTimeout))
ctx, cancel := context.WithTimeout(runloopCtx, utils.WithJitter(s.transmitTimeout))
res, err := s.c.Transmit(ctx, t.Req)
cancel()
if runloopCtx.Err() != nil {
Expand Down Expand Up @@ -272,18 +275,19 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, feed
}
}

func NewTransmitter(lggr logger.Logger, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder) *mercuryTransmitter {
func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder) *mercuryTransmitter {
feedIDHex := fmt.Sprintf("0x%x", feedID[:])
servers := make(map[string]*server, len(clients))
for serverURL, client := range clients {
cLggr := lggr.Named(serverURL).With("serverURL", serverURL)
pm := NewPersistenceManager(cLggr, serverURL, orm, jobID, maxTransmitQueueSize, flushDeletesFrequency, pruneFrequency)
pm := NewPersistenceManager(cLggr, serverURL, orm, jobID, int(cfg.TransmitQueueMaxSize()), flushDeletesFrequency, pruneFrequency)
servers[serverURL] = &server{
cLggr,
cfg.TransmitTimeout().Duration(),
client,
pm,
NewTransmitQueue(cLggr, serverURL, feedIDHex, maxTransmitQueueSize, pm),
make(chan *pb.TransmitRequest, maxDeleteQueueSize),
NewTransmitQueue(cLggr, serverURL, feedIDHex, int(cfg.TransmitQueueMaxSize()), pm),
make(chan *pb.TransmitRequest, int(cfg.TransmitQueueMaxSize())),
transmitSuccessCount.WithLabelValues(feedIDHex, serverURL),
transmitDuplicateCount.WithLabelValues(feedIDHex, serverURL),
transmitConnectionErrorCount.WithLabelValues(feedIDHex, serverURL),
Expand All @@ -295,6 +299,7 @@ func NewTransmitter(lggr logger.Logger, clients map[string]wsrpc.Client, fromAcc
return &mercuryTransmitter{
services.StateMachine{},
lggr.Named("MercuryTransmitter").With("feedID", feedIDHex),
cfg,
servers,
codec,
feedID,
Expand Down
Loading

0 comments on commit f55d8be

Please sign in to comment.