From 04412a7216b110fc12f1d8bdbc426b8c0a429290 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Wed, 17 Jan 2024 12:51:52 +0100 Subject: [PATCH] fix(core): Do not propagate blocks if subscribed to blocks from incorrect chain (#3086) Right now, it is possible for a **bridge** node that was initialised and started on one chain (e.g. `mocha-4`) to be stopped and restarted on a different chain (e.g. `mainnet`) and propagate headers from the old chain (`mocha-4`) into the different network (`mainnet`). This PR fixes this issue by causing the listener to fatal if the listener recognises it is receiving blocks from a different chain to that which it expects. Error will look like this: ``` 2024-01-10T16:06:22.001+0100 ERROR core core/listener.go:175 listener: received block with unexpected chain ID: expected arabica-11, received mocha-4 2024-01-10T16:06:22.001+0100 INFO core core/listener.go:177 listener: listening stopped 2024-01-10T16:06:22.001+0100 FATAL core core/listener.go:126 listener: invalid subscription ``` Resolves #3071 --- core/exchange_test.go | 4 ++- core/listener.go | 20 +++++++++++--- core/listener_test.go | 45 +++++++++++++++++++++++++++++--- core/option.go | 10 +++++++ libs/utils/resetctx.go | 4 ++- nodebuilder/core/module.go | 3 ++- nodebuilder/tests/swamp/swamp.go | 1 + 7 files changed, 77 insertions(+), 10 deletions(-) diff --git a/core/exchange_test.go b/core/exchange_test.go index c43084c57d..95c7f83385 100644 --- a/core/exchange_test.go +++ b/core/exchange_test.go @@ -20,7 +20,9 @@ func TestCoreExchange_RequestHeaders(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - fetcher, _ := createCoreFetcher(t, DefaultTestConfig()) + cfg := DefaultTestConfig() + cfg.ChainID = networkID + fetcher, _ := createCoreFetcher(t, cfg) // generate 10 blocks generateBlocks(t, fetcher) diff --git a/core/listener.go b/core/listener.go index 8447733506..10255fc4cc 100644 --- a/core/listener.go +++ b/core/listener.go @@ -23,6 +23,8 @@ import ( var ( tracer = otel.Tracer("core/listener") retrySubscriptionDelay = 5 * time.Second + + errInvalidSubscription = errors.New("invalid subscription") ) // Listener is responsible for listening to Core for @@ -41,11 +43,12 @@ type Listener struct { headerBroadcaster libhead.Broadcaster[*header.ExtendedHeader] hashBroadcaster shrexsub.BroadcastFn - listenerTimeout time.Duration - metrics *listenerMetrics - cancel context.CancelFunc + chainID string + + listenerTimeout time.Duration + cancel context.CancelFunc } func NewListener( @@ -81,6 +84,7 @@ func NewListener( store: store, listenerTimeout: 5 * blocktime, metrics: metrics, + chainID: p.chainID, }, nil } @@ -117,6 +121,10 @@ func (cl *Listener) runSubscriber(ctx context.Context, sub <-chan types.EventDat // listener stopped because external context was canceled return } + if errors.Is(err, errInvalidSubscription) { + // stop node if there is a critical issue with the block subscription + log.Fatalf("listener: %v", err) + } log.Warnw("listener: subscriber error, resubscribing...", "err", err) sub = cl.resubscribe(ctx) @@ -163,6 +171,12 @@ func (cl *Listener) listen(ctx context.Context, sub <-chan types.EventDataSigned return errors.New("underlying subscription was closed") } + if cl.chainID != "" && b.Header.ChainID != cl.chainID { + log.Errorf("listener: received block with unexpected chain ID: expected %s,"+ + " received %s", cl.chainID, b.Header.ChainID) + return errInvalidSubscription + } + log.Debugw("listener: new block from core", "height", b.Header.Height) err := cl.handleNewSignedBlock(ctx, b) diff --git a/core/listener_test.go b/core/listener_test.go index bf84c07b41..5ddcd5541d 100644 --- a/core/listener_test.go +++ b/core/listener_test.go @@ -47,10 +47,14 @@ func TestListener(t *testing.T) { t.Cleanup(subs.Cancel) // create one block to store as Head in local store and then unsubscribe from block events - fetcher, _ := createCoreFetcher(t, DefaultTestConfig()) + cfg := DefaultTestConfig() + cfg.ChainID = networkID + fetcher, _ := createCoreFetcher(t, cfg) + eds := createEdsPubSub(ctx, t) + // create Listener and start listening - cl := createListener(ctx, t, fetcher, ps0, eds, createStore(t)) + cl := createListener(ctx, t, fetcher, ps0, eds, createStore(t), networkID) err = cl.Start(ctx) require.NoError(t, err) @@ -80,6 +84,7 @@ func TestListenerWithNonEmptyBlocks(t *testing.T) { // create one block to store as Head in local store and then unsubscribe from block events cfg := DefaultTestConfig() + cfg.ChainID = networkID fetcher, cctx := createCoreFetcher(t, cfg) eds := createEdsPubSub(ctx, t) @@ -92,7 +97,7 @@ func TestListenerWithNonEmptyBlocks(t *testing.T) { }) // create Listener and start listening - cl := createListener(ctx, t, fetcher, ps0, eds, store) + cl := createListener(ctx, t, fetcher, ps0, eds, store, networkID) err = cl.Start(ctx) require.NoError(t, err) @@ -124,6 +129,36 @@ func TestListenerWithNonEmptyBlocks(t *testing.T) { require.Nil(t, cl.cancel) } +func TestListenerWithWrongChainRPC(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + t.Cleanup(cancel) + + // create mocknet with two pubsub endpoints + ps0, _ := createMocknetWithTwoPubsubEndpoints(ctx, t) + + // create one block to store as Head in local store and then unsubscribe from block events + cfg := DefaultTestConfig() + cfg.ChainID = networkID + fetcher, _ := createCoreFetcher(t, cfg) + eds := createEdsPubSub(ctx, t) + + store := createStore(t) + err := store.Start(ctx) + require.NoError(t, err) + t.Cleanup(func() { + err = store.Stop(ctx) + require.NoError(t, err) + }) + + // create Listener and start listening + cl := createListener(ctx, t, fetcher, ps0, eds, store, "wrong-chain-rpc") + sub, err := cl.fetcher.SubscribeNewBlockEvent(ctx) + require.NoError(t, err) + + err = cl.listen(ctx, sub) + assert.ErrorIs(t, err, errInvalidSubscription) +} + func createMocknetWithTwoPubsubEndpoints(ctx context.Context, t *testing.T) (*pubsub.PubSub, *pubsub.PubSub) { net, err := mocknet.FullMeshLinked(2) require.NoError(t, err) @@ -166,6 +201,7 @@ func createListener( ps *pubsub.PubSub, edsSub *shrexsub.PubSub, store *eds.Store, + chainID string, ) *Listener { p2pSub, err := p2p.NewSubscriber[*header.ExtendedHeader](ps, header.MsgID, p2p.WithSubscriberNetworkID(networkID)) require.NoError(t, err) @@ -180,7 +216,8 @@ func createListener( require.NoError(t, p2pSub.Stop(ctx)) }) - listener, err := NewListener(p2pSub, fetcher, edsSub.Broadcast, header.MakeExtendedHeader, store, nodep2p.BlockTime) + listener, err := NewListener(p2pSub, fetcher, edsSub.Broadcast, header.MakeExtendedHeader, + store, nodep2p.BlockTime, WithChainID(nodep2p.Network(chainID))) require.NoError(t, err) return listener } diff --git a/core/option.go b/core/option.go index 6e06fade48..6916ced4d8 100644 --- a/core/option.go +++ b/core/option.go @@ -1,9 +1,13 @@ package core +import "github.com/celestiaorg/celestia-node/nodebuilder/p2p" + type Option func(*params) type params struct { metrics bool + + chainID string } // WithMetrics is a functional option that enables metrics @@ -13,3 +17,9 @@ func WithMetrics() Option { p.metrics = true } } + +func WithChainID(id p2p.Network) Option { + return func(p *params) { + p.chainID = id.String() + } +} diff --git a/libs/utils/resetctx.go b/libs/utils/resetctx.go index 3014ba81db..a108cc27b4 100644 --- a/libs/utils/resetctx.go +++ b/libs/utils/resetctx.go @@ -1,6 +1,8 @@ package utils -import "context" +import ( + "context" +) // ResetContextOnError returns a fresh context if the given context has an error. func ResetContextOnError(ctx context.Context) context.Context { diff --git a/nodebuilder/core/module.go b/nodebuilder/core/module.go index fec7c14b1b..7c5c9e6bfd 100644 --- a/nodebuilder/core/module.go +++ b/nodebuilder/core/module.go @@ -56,8 +56,9 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option pubsub *shrexsub.PubSub, construct header.ConstructFn, store *eds.Store, + chainID p2p.Network, ) (*core.Listener, error) { - var opts []core.Option + opts := []core.Option{core.WithChainID(chainID)} if MetricsEnabled { opts = append(opts, core.WithMetrics()) } diff --git a/nodebuilder/tests/swamp/swamp.go b/nodebuilder/tests/swamp/swamp.go index e3ac3ad4f2..617fe76151 100644 --- a/nodebuilder/tests/swamp/swamp.go +++ b/nodebuilder/tests/swamp/swamp.go @@ -78,6 +78,7 @@ func NewSwamp(t *testing.T, options ...Option) *Swamp { // Now, we are making an assumption that consensus mechanism is already tested out // so, we are not creating bridge nodes with each one containing its own core client // instead we are assigning all created BNs to 1 Core from the swamp + ic.WithChainID("private") cctx := core.StartTestNodeWithConfig(t, ic) swp := &Swamp{ t: t,