Skip to content

Commit

Permalink
BCF-2368: ensure unique name per LOOPP (#9762)
Browse files Browse the repository at this point in the history
End to end work uncovered a problem where a plugin would not start
because the prom server couldn't get a port. Ultimately this was caused
by duplicate names in the LoopRegistry.

This change makes names unique and, to ensure better error handling, it
changes the Register function to error if called more than once. The latter
change will make the problem much more clear should it ever happen again.
  • Loading branch information
krehermann authored Jul 11, 2023
1 parent b4b6b51 commit 78a1697
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 44 deletions.
2 changes: 1 addition & 1 deletion core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
MailMon: mailMon,
}

loopRegistry := plugins.NewLoopRegistry()
loopRegistry := plugins.NewLoopRegistry(appLggr.Named("LoopRegistry"))

var chains chainlink.Chains
chains.EVM, err = evm.NewTOMLChainSet(ctx, ccOpts)
Expand Down
10 changes: 6 additions & 4 deletions core/cmd/shell_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,11 @@ func TestNewUserCache(t *testing.T) {
}

func TestSetupSolanaRelayer(t *testing.T) {
reg := plugins.NewLoopRegistry()
lggr := logger.TestLogger(t)
reg := plugins.NewLoopRegistry(lggr)
ks := mocks.NewSolana(t)
rf := cmd.RelayerFactory{
Logger: logger.TestLogger(t),
Logger: lggr,
DB: pgtest.NewSqlxDB(t),
GeneralConfig: configtest.NewGeneralConfig(t, nil),
LoopRegistry: reg,
Expand Down Expand Up @@ -364,10 +365,11 @@ func TestSetupSolanaRelayer(t *testing.T) {
}

func TestSetupStarkNetRelayer(t *testing.T) {
reg := plugins.NewLoopRegistry()
lggr := logger.TestLogger(t)
reg := plugins.NewLoopRegistry(lggr)
ks := mocks.NewStarkNet(t)
rf := cmd.RelayerFactory{
Logger: logger.TestLogger(t),
Logger: lggr,
DB: pgtest.NewSqlxDB(t),
GeneralConfig: configtest.NewGeneralConfig(t, nil),
LoopRegistry: reg,
Expand Down
2 changes: 1 addition & 1 deletion core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
RestrictedHTTPClient: c,
UnrestrictedHTTPClient: c,
SecretGenerator: MockSecretGenerator{},
LoopRegistry: plugins.NewLoopRegistry(),
LoopRegistry: plugins.NewLoopRegistry(lggr),
})
require.NoError(t, err)
app := appInstance.(*chainlink.ChainlinkApplication)
Expand Down
2 changes: 1 addition & 1 deletion core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
// we need to initialize in case we serve OCR2 LOOPs
loopRegistry := opts.LoopRegistry
if loopRegistry == nil {
loopRegistry = plugins.NewLoopRegistry()
loopRegistry = plugins.NewLoopRegistry(globalLogger.Named("LoopRegistry"))
}

// If the audit logger is enabled
Expand Down
5 changes: 3 additions & 2 deletions core/services/ocr2/plugins/median/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,9 @@ func NewMedianServices(ctx context.Context,
}, lggr)

if cmdName := env.MedianPluginCmd.Get(); cmdName != "" {
medianLggr := lggr.Named("Median")
// use logger name to ensure unique naming

// use unique logger names so we can use it to register a loop
medianLggr := lggr.Named("Median").Named(spec.ContractID).Named(spec.GetID())
cmdFn, telem, err2 := cfg.RegisterLOOP(medianLggr.Name(), cmdName)
if err2 != nil {
err = fmt.Errorf("failed to register loop: %w", err2)
Expand Down
9 changes: 5 additions & 4 deletions core/web/loop_registry_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"testing"

"github.com/pkg/errors"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/plugins"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/plugins"
)

type responseWriter struct {
Expand Down Expand Up @@ -37,7 +38,7 @@ func TestLoopRegistryServer_CantWriteToResponse(t *testing.T) {
l, o := logger.TestLoggerObserved(t, zap.ErrorLevel)
s := &LoopRegistryServer{
exposedPromPort: 1,
registry: plugins.NewLoopRegistry(),
registry: plugins.NewLoopRegistry(l),
logger: l.(logger.SugaredLogger),
jsonMarshalFn: json.Marshal,
}
Expand All @@ -52,7 +53,7 @@ func TestLoopRegistryServer_CantMarshal(t *testing.T) {
l, o := logger.TestLoggerObserved(t, zap.ErrorLevel)
s := &LoopRegistryServer{
exposedPromPort: 1,
registry: plugins.NewLoopRegistry(),
registry: plugins.NewLoopRegistry(l),
logger: l.(logger.SugaredLogger),
jsonMarshalFn: func(any) ([]byte, error) {
return []byte(""), errors.New("can't unmarshal")
Expand Down
45 changes: 19 additions & 26 deletions plugins/loop_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"errors"
"sort"
"sync"

"github.com/smartcontractkit/chainlink-relay/pkg/logger"
)

const (
Expand All @@ -22,26 +24,35 @@ type RegisteredLoop struct {
type LoopRegistry struct {
mu sync.Mutex
registry map[string]*RegisteredLoop

lggr logger.Logger
}

func NewLoopRegistry() *LoopRegistry {
func NewLoopRegistry(lggr logger.Logger) *LoopRegistry {
return &LoopRegistry{
registry: map[string]*RegisteredLoop{},
lggr: lggr,
}
}

// Register creates a port of the plugin. It is idempotent. Duplicate calls to Register will return the same port
// Register creates a port of the plugin. It is not idempotent. Duplicate calls to Register will return [ErrExists]
// Safe for concurrent use.
func (m *LoopRegistry) Register(id string) (*RegisteredLoop, error) {
m.mu.Lock()
defer m.mu.Unlock()

p, ok := m.get(id)
if !ok {
return m.create(id)
if _, exists := m.registry[id]; exists {
return nil, ErrExists
}
return p, nil
nextPort := pluginDefaultPort + len(m.registry)
envCfg := NewEnvConfig(nextPort)

m.registry[id] = &RegisteredLoop{Name: id, EnvCfg: envCfg}
m.lggr.Debug("Registered loopp %q with config %v, port %d", id, envCfg, envCfg.PrometheusPort())
return m.registry[id], nil
}

// Return slice sorted by plugin name. Safe for concurrent use.
func (m *LoopRegistry) List() []*RegisteredLoop {
var registeredLoops []*RegisteredLoop
m.mu.Lock()
Expand All @@ -56,29 +67,11 @@ func (m *LoopRegistry) List() []*RegisteredLoop {
return registeredLoops
}

// Get plugin by id. Safe for concurrent use.
func (m *LoopRegistry) Get(id string) (*RegisteredLoop, bool) {
m.mu.Lock()
defer m.mu.Unlock()

return m.get(id)
}

// create returns a port number for the given plugin to use for prometheus handler.
// NOT safe for concurrent use.
func (m *LoopRegistry) create(pluginName string) (*RegisteredLoop, error) {
if _, exists := m.registry[pluginName]; exists {
return nil, ErrExists
}
nextPort := pluginDefaultPort + len(m.registry)
envCfg := NewEnvConfig(nextPort)

m.registry[pluginName] = &RegisteredLoop{Name: pluginName, EnvCfg: envCfg}
return m.registry[pluginName], nil
}

// get is a helper to return the port assigned to the plugin, if any
// NOT safe for concurrent use.
func (m *LoopRegistry) get(pluginName string) (*RegisteredLoop, bool) {
p, exists := m.registry[pluginName]
p, exists := m.registry[id]
return p, exists
}
11 changes: 6 additions & 5 deletions plugins/loop_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,21 @@ import (

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/plugins"
)

func TestPluginPortManager(t *testing.T) {
// register one
m := plugins.NewLoopRegistry()
m := plugins.NewLoopRegistry(logger.TestLogger(t))
pFoo, err := m.Register("foo")
require.NoError(t, err)
require.Equal(t, "foo", pFoo.Name)
require.Greater(t, pFoo.EnvCfg.PrometheusPort(), 0)
// test idempotent
pSame, err := m.Register("foo")
require.NoError(t, err)
require.Equal(t, pFoo, pSame)
// test duplicate
pNil, err := m.Register("foo")
require.ErrorIs(t, err, plugins.ErrExists)
require.Nil(t, pNil)
// ensure increasing port assignment
pBar, err := m.Register("bar")
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions plugins/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func NewPromServer(port int, lggr logger.Logger, opts ...PromServerOpt) *PromSer

// Start start HTTP server on specified port to handle metrics requests
func (p *PromServer) Start() error {
p.lggr.Debugf("Starting prom server on port %d", p.port)
err := p.setupListener()
if err != nil {
return err
Expand Down

0 comments on commit 78a1697

Please sign in to comment.