Skip to content

Commit

Permalink
Merge branch 'master' into dependabot/github_actions/actions/checkout-4
Browse files Browse the repository at this point in the history
  • Loading branch information
byte-bandit authored Sep 11, 2023
2 parents 77723f6 + a3f6254 commit 3bd32a4
Show file tree
Hide file tree
Showing 20 changed files with 610 additions and 137 deletions.
10 changes: 5 additions & 5 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (

var (
_relayer *relayer.Relayer
_config *config.Root
_config *config.Config
_configPath string

_palomaClient *paloma.Client
Expand Down Expand Up @@ -61,12 +61,12 @@ func Commit() string { return commit }
func Relayer() *relayer.Relayer {
if _relayer == nil {
_relayer = relayer.New(
*Config(),
Config(),
*PalomaClient(),
EvmFactory(),
Time(),
relayer.Config{
KeepAliveLoopTimeout: 30 * gotime.Second,
KeepAliveLoopTimeout: 5 * gotime.Second,
KeepAliveBlockThreshold: 30,
},
)
Expand Down Expand Up @@ -98,7 +98,7 @@ func EvmFactory() *evm.Factory {
return _evmFactory
}

func Config() *config.Root {
func Config() *config.Config {
if len(_configPath) == 0 {
log.Fatal("config file path is not set")
}
Expand All @@ -116,7 +116,7 @@ func Config() *config.Root {
"err": err,
}).Fatal("couldn't read config file")
}
_config = &cnf
_config = cnf
}

return _config
Expand Down
3 changes: 2 additions & 1 deletion cmd/pigeon/cmd_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ var startCmd = &cobra.Command{
go func() {
health.StartHTTPServer(
ctx,
app.Config().HealthCheckPort(),
app.Config().HealthCheckAddress,
app.Config().HealthCheckPort,
pid,
app.Version(),
app.Commit(),
Expand Down
1 change: 1 addition & 0 deletions config.example.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
loop-timeout: 5s
health-check-address: 127.0.0.1
health-check-port: 5757

paloma:
Expand Down
46 changes: 25 additions & 21 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"fmt"
"io"
"os"
"os/user"
Expand All @@ -12,8 +13,9 @@ import (
)

const (
ChainName = "paloma"
Name = "pigeon"
ChainName = "paloma"
Name = "pigeon"
cDefaultHealthServerAddressBinding = "127.0.0.1"
)

type CosmosSpecificClientConfig struct {
Expand Down Expand Up @@ -45,8 +47,9 @@ func (f Filepath) Path() string {
return path.Clean(p)
}

type Root struct {
HealthCheckPortRaw int `yaml:"health-check-port"`
type Config struct {
HealthCheckPort int `yaml:"health-check-port"`
HealthCheckAddress string `yaml:"health-check-address"`

BloxrouteAuthorizationHeader string `yaml:"bloxroute-auth-header"`

Expand All @@ -55,15 +58,20 @@ type Root struct {
EVM map[string]EVM `yaml:"evm"`
}

func (r *Root) HealthCheckPort() int {
if r.HealthCheckPortRaw == 0 {
panic(whoops.String("invalid health check port in pigeon's config file"))
func (c *Config) defaults() *Config {
if len(c.HealthCheckAddress) < 1 {
c.HealthCheckAddress = cDefaultHealthServerAddressBinding
}
return r.HealthCheckPortRaw

return c
}

func (r *Root) init() {
(&r.Paloma).init()
func (c *Config) validate() (*Config, error) {
if c.HealthCheckPort == 0 {
return nil, fmt.Errorf("invalid health server port binding: %d", c.HealthCheckPort)
}

return c, nil
}

type EVM struct {
Expand All @@ -77,9 +85,6 @@ type Paloma struct {
ChainID string `yaml:"chain-id"`
}

func (p *Paloma) init() {
}

func KeyringPassword(envKey string) string {
envVal, ok := os.LookupEnv(envKey)
if !ok {
Expand All @@ -88,20 +93,19 @@ func KeyringPassword(envKey string) string {
return envVal
}

func FromReader(r io.Reader) (Root, error) {
var cnf Root
func FromReader(r io.Reader) (*Config, error) {
rawBody, err := io.ReadAll(r)
if err != nil {
return Root{}, err
return nil, err
}
str := string(rawBody)
str = os.ExpandEnv(str)
err = yaml.Unmarshal([]byte(str), &cnf)

var cfg Config
err = yaml.Unmarshal([]byte(str), &cfg)
if err != nil {
return Root{}, err
return nil, err
}

(&cnf).init()

return cnf, nil
return cfg.defaults().validate()
}
3 changes: 2 additions & 1 deletion health/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type jsonResponse struct {

func StartHTTPServer(
ctx context.Context,
addr string,
port int,
pid int,
appVersion string,
Expand All @@ -38,7 +39,7 @@ func StartHTTPServer(
})

server := &http.Server{
Addr: fmt.Sprintf("127.0.0.1:%d", port),
Addr: fmt.Sprintf("%s:%d", addr, port),
Handler: m,
BaseContext: func(_ net.Listener) context.Context {
return ctx
Expand Down
2 changes: 1 addition & 1 deletion internal/mev/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Client interface {
Relay(context.Context, *big.Int, *types.Transaction) (common.Hash, error)
}

func New(cfg *config.Root) Client {
func New(cfg *config.Config) Client {
if len(cfg.BloxrouteAuthorizationHeader) < 1 {
log.Info("BLXR Auth header not found. No MEV relayer support.")
return nil
Expand Down
2 changes: 1 addition & 1 deletion relayer/build_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (r *Relayer) processorFactory(chainInfo *evmtypes.ChainInfo) (chain.Process
// TODO: add support of other types of chains! Right now, only EVM types are supported!
retErr := whoops.Wrap(ErrMissingChainConfig, whoops.Errorf("reference chain id: %s").Format(chainInfo.GetChainReferenceID()))

cfg, ok := r.config.EVM[chainInfo.GetChainReferenceID()]
cfg, ok := r.cfg.EVM[chainInfo.GetChainReferenceID()]
if !ok {
return nil, retErr
}
Expand Down
10 changes: 5 additions & 5 deletions relayer/build_processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestBuildProcessors(t *testing.T) {
evmFactoryMock.On("Build", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(processorMock, nil)

r := New(
config.Root{
&config.Config{
EVM: map[string]config.EVM{
"chain-1": {},
},
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestBuildProcessors(t *testing.T) {
evmFactoryMock.On("Build", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(processorMock, nil)

r := New(
config.Root{
&config.Config{
EVM: map[string]config.EVM{
"chain-1": {},
},
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestBuildProcessors(t *testing.T) {
evmFactoryMock.On("Build", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(processorMock, nil)

r := New(
config.Root{
&config.Config{
EVM: map[string]config.EVM{
"chain-1": {},
},
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestBuildProcessors(t *testing.T) {
evmFactoryMock.On("Build", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(processorMock, nil)

r := New(
config.Root{
&config.Config{
EVM: map[string]config.EVM{
"chain-1": {},
},
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestBuildProcessors(t *testing.T) {
)

r := New(
config.Root{
&config.Config{
EVM: map[string]config.EVM{
"chain-1": {},
},
Expand Down
2 changes: 1 addition & 1 deletion relayer/health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (r *Relayer) HealthCheck(ctx context.Context) error {

func (r *Relayer) BootHealthCheck(ctx context.Context) error {
var g whoops.Group
for _, cfg := range r.config.EVM {
for _, cfg := range r.cfg.EVM {
g.Add(evm.TestAndVerifyConfig(ctx, cfg))
}
return g.Return()
Expand Down
4 changes: 2 additions & 2 deletions relayer/health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var _ = Describe("health check", func() {

retErr := whoops.String("oh no")

var cfg config.Root
cfg := &config.Config{}

var val *stakingtypes.Validator
BeforeEach(func() {
Expand All @@ -50,7 +50,7 @@ var _ = Describe("health check", func() {
JustBeforeEach(func() {
r = &Relayer{
palomaClient: m,
config: cfg,
cfg: cfg,
evmFactory: fm,
}
})
Expand Down
119 changes: 119 additions & 0 deletions relayer/heartbeat/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package heartbeat

import (
"context"
"sync"
"time"

"github.com/palomachain/pigeon/internal/liblog"
"github.com/sirupsen/logrus"
)

const (
cMaxCacheRefreshAttempts int = 3
cCacheRefreshIntervalInBlocks int64 = 20
cDefaultBlockSpeed time.Duration = time.Millisecond * 1620
)

type keepAliveCache struct {
retryFalloff time.Duration
locker sync.Locker
estimatedBlockSpeed time.Duration
lastBlockHeight int64
lastRefresh time.Time
lastAliveUntil int64
queryBTL AliveUntilHeightQuery
queryBH CurrentHeightQuery
}

func (c *keepAliveCache) get(ctx context.Context) (int64, error) {
logger := liblog.WithContext(ctx).WithField("component", "cache")
if c.isStale() {
logger.WithFields(logrus.Fields{
"estimatedBlockSpeed": c.estimatedBlockSpeed,
"lastBlockHeight": c.lastBlockHeight,
"lastRefresh": c.lastRefresh,
"lastAliveUntil": c.lastAliveUntil,
}).Debug("cache is stale")
err := linearFalloffRetry(ctx, c.locker, "cache refresh", cMaxCacheRefreshAttempts, c.retryFalloff, c.refresh)
logger.WithFields(logrus.Fields{
"estimatedBlockSpeed": c.estimatedBlockSpeed,
"lastBlockHeight": c.lastBlockHeight,
"lastRefresh": c.lastRefresh,
"lastAliveUntil": c.lastAliveUntil,
}).WithError(err).Debug("cache refreshed")
if err != nil {
return 0, err
}
}

return c.lastAliveUntil, nil
}

func (c *keepAliveCache) refresh(ctx context.Context, locker sync.Locker) error {
defer locker.Unlock()
logger := liblog.WithContext(ctx).WithField("component", "cache")
logger.Debug("refreshing cache")

locker.Lock()
abh, err := c.queryBTL(ctx)
if err != nil {
logger.WithError(err).Error("failed to query alive until height")
return err
}

bh, err := c.queryBH(ctx)
if err != nil {
logger.WithError(err).Error("failed to query current height")
return err
}

c.estimatedBlockSpeed = c.estimateBlockSpeed(bh, time.Now().UTC())
c.lastAliveUntil = abh
c.lastBlockHeight = bh
c.lastRefresh = time.Now().UTC()

return nil
}

func (c *keepAliveCache) isStale() bool {
if c.estimatedBlockSpeed == 0 || c.lastBlockHeight == 0 || c.lastRefresh.IsZero() {
return true
}

elapsedMs := time.Now().UTC().Sub(c.lastRefresh).Milliseconds()
estimatedElapsedBlocks := elapsedMs / c.estimatedBlockSpeed.Milliseconds()

return estimatedElapsedBlocks >= cCacheRefreshIntervalInBlocks
}

func (c *keepAliveCache) estimateBlockSpeed(bh int64, t time.Time) time.Duration {
if c.lastBlockHeight == 0 || bh == 0 || t.IsZero() {
// During the first run, we have no historic data to
// compare to, so we set a rough estimate.
return cDefaultBlockSpeed
}

if t.Before(c.lastRefresh) {
return cDefaultBlockSpeed
}

blockDiff := bh - c.lastBlockHeight
timeDiff := t.Sub(c.lastRefresh)
bpms := timeDiff.Milliseconds() / int64(blockDiff)
return time.Duration(bpms) * time.Millisecond
}

func (c *keepAliveCache) estimateBlockHeight(t time.Time) int64 {
if c.estimatedBlockSpeed == 0 || c.lastRefresh.IsZero() || t.IsZero() {
return c.lastBlockHeight
}

if t.Before(c.lastRefresh) {
return c.lastBlockHeight
}

timeDiff := t.Sub(c.lastRefresh)
blockDiff := timeDiff.Milliseconds() / c.estimatedBlockSpeed.Milliseconds()
return c.lastBlockHeight + blockDiff
}
Loading

0 comments on commit 3bd32a4

Please sign in to comment.