Skip to content

Commit

Permalink
Some SyncPeerStatus messages were not published (#1320)
Browse files Browse the repository at this point in the history
* syncPeerClient subscription channel fixed
  • Loading branch information
stana-miric committed Mar 23, 2023
1 parent 90d8020 commit 15deea1
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 26 deletions.
2 changes: 2 additions & 0 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,7 @@ func (b *Blockchain) WriteFullBlock(fblock *types.FullBlock, source string) erro
"txs", len(block.Transactions),
"hash", header.Hash,
"parent", header.ParentHash,
"source", source,
}

if prevHeader, ok := b.GetHeaderByNumber(header.Number - 1); ok {
Expand Down Expand Up @@ -970,6 +971,7 @@ func (b *Blockchain) WriteBlock(block *types.Block, source string) error {
"txs", len(block.Transactions),
"hash", header.Hash,
"parent", header.ParentHash,
"source", source,
}

if prevHeader, ok := b.GetHeaderByNumber(header.Number - 1); ok {
Expand Down
39 changes: 14 additions & 25 deletions blockchain/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,19 @@ type Subscription interface {
// FOR TESTING PURPOSES //

type MockSubscription struct {
eventCh chan *Event
*subscription
}

func NewMockSubscription() *MockSubscription {
return &MockSubscription{eventCh: make(chan *Event)}
return &MockSubscription{
subscription: &subscription{
updateCh: make(chan *Event),
closeCh: make(chan void),
},
}
}

func (m *MockSubscription) Push(e *Event) {
m.eventCh <- e
}

func (m *MockSubscription) GetEventCh() chan *Event {
return m.eventCh
}

func (m *MockSubscription) GetEvent() *Event {
evnt := <-m.eventCh

return evnt
}

func (m *MockSubscription) Close() {
m.updateCh <- e
}

// subscription is the Blockchain event subscription object
Expand Down Expand Up @@ -68,14 +59,12 @@ func (s *subscription) GetEventCh() chan *Event {

// GetEvent returns the event from the subscription (BLOCKING)
func (s *subscription) GetEvent() *Event {
for {
// Wait for an update
select {
case ev := <-s.updateCh:
return ev
case <-s.closeCh:
return nil
}
// Wait for an update
select {
case ev := <-s.updateCh:
return ev
case <-s.closeCh:
return nil
}
}

Expand Down
3 changes: 2 additions & 1 deletion syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,14 +226,15 @@ func (m *syncPeerClient) handleStatusUpdate(obj interface{}, from peer.ID) {
// startNewBlockProcess starts blockchain event subscription
func (m *syncPeerClient) startNewBlockProcess() {
m.subscription = m.blockchain.SubscribeEvents()
eventCh := m.subscription.GetEventCh()

for {
var event *blockchain.Event

select {
case <-m.closeCh:
return
case event = <-m.subscription.GetEventCh():
case event = <-eventCh:
}

if !m.shouldEmitBlocks {
Expand Down
100 changes: 100 additions & 0 deletions syncer/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/0xPolygon/polygon-edge/blockchain"
"github.com/0xPolygon/polygon-edge/network"
Expand Down Expand Up @@ -524,3 +525,102 @@ func Test_syncPeerClient_GetBlocks(t *testing.T) {

assert.Equal(t, expected, blocks)
}

func Test_EmitMultipleBlocks(t *testing.T) {
t.Parallel()

var (
// network layer
clientSrv = newTestNetwork(t)
peerSrv = newTestNetwork(t)

clientLatest = uint64(10)

subscription = blockchain.NewMockSubscription()

client = newTestSyncPeerClient(clientSrv, &mockBlockchain{
subscription: subscription,
headerHandler: newSimpleHeaderHandler(clientLatest),
})
)

t.Cleanup(func() {
clientSrv.Close()
peerSrv.Close()
client.Close()
})

err := network.JoinAndWaitMultiple(
network.DefaultJoinTimeout,
clientSrv,
peerSrv,
)

require.NoError(t, err)

// start gossip
require.NoError(t, client.startGossip())

// start to subscribe blockchain events
go client.startNewBlockProcess()

// push latest block number to blockchain subscription
pushSubscription := func(sub *blockchain.MockSubscription, latest uint64) {
sub.Push(&blockchain.Event{
NewChain: []*types.Header{
{
Number: latest,
},
},
})
}

waitForGossip := func(wg *sync.WaitGroup) bool {
c := make(chan struct{})
go func() {
defer close(c)
wg.Wait()
}()
select {
case <-c:
return true
case <-time.After(5 * time.Second):
return false
}
}

// create topic & subscribe in peer
topic, err := peerSrv.NewTopic(statusTopicName, &proto.SyncPeerStatus{})
assert.NoError(t, err)

testGossip := func(t *testing.T, blocksNum int) {
t.Helper()

var wgForGossip sync.WaitGroup

wgForGossip.Add(blocksNum)

require.NoError(t, topic.Subscribe(func(_ interface{}, _ peer.ID) {
wgForGossip.Done()
}))

// need to wait for a few seconds to propagate subscribing
time.Sleep(2 * time.Second)
client.EnablePublishingPeerStatus()

go func() {
for i := 0; i < blocksNum; i++ {
pushSubscription(subscription, clientLatest+uint64(i))
}
}()

gossiped := waitForGossip(&wgForGossip)

require.Equal(t, true, gossiped)
}

t.Run("should receive all blocks", func(t *testing.T) {
t.Parallel()
testGossip(t, 4)
})
}

0 comments on commit 15deea1

Please sign in to comment.