Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EVM-584 Panic -> send on closed channel inside syncer #1364

Merged
merged 2 commits into from
Apr 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions network/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"reflect"
"sync"
"sync/atomic"

"github.com/hashicorp/go-hclog"
Expand All @@ -22,10 +23,11 @@ const (
type Topic struct {
logger hclog.Logger

topic *pubsub.Topic
typ reflect.Type
closeCh chan struct{}
closed *uint64
topic *pubsub.Topic
typ reflect.Type
closeCh chan struct{}
closed *uint64
waitGroup sync.WaitGroup
}

func (t *Topic) createObj() proto.Message {
Expand All @@ -43,12 +45,14 @@ func (t *Topic) Close() {
return
}

close(t.closeCh) // close all subscribers
t.waitGroup.Wait() // wait for all the subscribers to finish

// if all subscribers are finished, close the topic
if t.topic != nil {
t.topic.Close()
t.topic = nil
}

close(t.closeCh)
}

func (t *Topic) Publish(obj proto.Message) error {
Expand All @@ -75,6 +79,9 @@ func (t *Topic) Subscribe(handler func(obj interface{}, from peer.ID)) error {
}

func (t *Topic) readLoop(sub *pubsub.Subscription, handler func(obj interface{}, from peer.ID)) {
t.waitGroup.Add(1)
defer t.waitGroup.Done()

ctx, cancelFn := context.WithCancel(context.Background())

go func() {
Expand Down
26 changes: 17 additions & 9 deletions syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type syncPeerClient struct {
shouldEmitBlocks bool // flag for emitting blocks in the topic
closeCh chan struct{}
closed *uint64 // ACTIVE == 0, CLOSED == non-zero.

peerStatusUpdateChLock sync.Mutex
peerStatusUpdateChClosed bool
}

func NewSyncPeerClient(
Expand All @@ -56,6 +59,9 @@ func NewSyncPeerClient(
shouldEmitBlocks: true,
closeCh: make(chan struct{}),
closed: new(uint64),

peerStatusUpdateChLock: sync.Mutex{},
peerStatusUpdateChClosed: false,
}
}

Expand Down Expand Up @@ -95,7 +101,10 @@ func (m *syncPeerClient) Close() {
close(m.closeCh)
}

m.peerStatusUpdateChLock.Lock()
m.peerStatusUpdateChClosed = true
close(m.peerStatusUpdateCh)
m.peerStatusUpdateChLock.Unlock()
}

// DisablePublishingPeerStatus disables publishing own status via gossip
Expand Down Expand Up @@ -210,16 +219,15 @@ func (m *syncPeerClient) handleStatusUpdate(obj interface{}, from peer.ID) {
return
}

if atomic.LoadUint64(m.closed) > 0 {
m.logger.Debug("received status from peer after client was closed, ignoring", "id", from)
m.peerStatusUpdateChLock.Lock()
defer m.peerStatusUpdateChLock.Unlock()

return
}

m.peerStatusUpdateCh <- &NoForkPeer{
ID: from,
Number: status.Number,
Distance: m.network.GetPeerDistance(from),
if !m.peerStatusUpdateChClosed {
Stefan-Ethernal marked this conversation as resolved.
Show resolved Hide resolved
m.peerStatusUpdateCh <- &NoForkPeer{
ID: from,
Number: status.Number,
Distance: m.network.GetPeerDistance(from),
}
}
}

Expand Down