From b83a9e59e146e75fd649d547ed2a2197b81b51c2 Mon Sep 17 00:00:00 2001 From: YoshihitoAso Date: Tue, 12 Sep 2023 16:25:50 +0900 Subject: [PATCH] eth: better active protocol handler tracking --- eth/handler.go | 86 +++++++++++++++++++++++++++--------- eth/handler_eth.go | 2 +- eth/handler_qlight_client.go | 21 +++++---- eth/handler_qlight_server.go | 14 ++++-- eth/sync.go | 2 +- 5 files changed, 89 insertions(+), 36 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index 093319389..96124afb0 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -151,7 +151,9 @@ type handler struct { chainSync *chainSyncer wg sync.WaitGroup - peerWG sync.WaitGroup + + handlerStartCh chan struct{} + handlerDoneCh chan struct{} // Quorum raftMode bool @@ -173,15 +175,17 @@ func newHandler(config *handlerConfig) (*handler, error) { config.EventMux = new(event.TypeMux) // Nicety initialization for tests } h := &handler{ - networkID: config.Network, - forkFilter: forkid.NewFilter(config.Chain), - eventMux: config.EventMux, - database: config.Database, - txpool: config.TxPool, - chain: config.Chain, - peers: newPeerSet(), - txsyncCh: make(chan *txsync), - quitSync: make(chan struct{}), + networkID: config.Network, + forkFilter: forkid.NewFilter(config.Chain), + eventMux: config.EventMux, + database: config.Database, + txpool: config.TxPool, + chain: config.Chain, + peers: newPeerSet(), + txsyncCh: make(chan *txsync), + quitSync: make(chan struct{}), + handlerDoneCh: make(chan struct{}), + handlerStartCh: make(chan struct{}), // Quorum authorizationList: config.AuthorizationList, raftMode: config.RaftMode, @@ -285,9 +289,50 @@ func newHandler(config *handlerConfig) (*handler, error) { return h, nil } +// protoTracker tracks the number of active protocol handlers. +func (h *handler) protoTracker() { + defer h.wg.Done() + var active int + for { + select { + case <-h.handlerStartCh: + active++ + case <-h.handlerDoneCh: + active-- + case <-h.quitSync: + // Wait for all active handlers to finish. + for ; active > 0; active-- { + <-h.handlerDoneCh + } + return + } + } +} + +// incHandlers signals to increment the number of active handlers if not +// quitting. +func (h *handler) incHandlers() bool { + select { + case h.handlerStartCh <- struct{}{}: + return true + case <-h.quitSync: + return false + } +} + +// decHandlers signals to decrement the number of active handlers. +func (h *handler) decHandlers() { + h.handlerDoneCh <- struct{}{} +} + // runEthPeer registers an eth peer into the joint eth/snap peerset, adds it to // various subsistems and starts handling messages. func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { + if !h.incHandlers() { + return p2p.DiscQuitting + } + defer h.decHandlers() + // If the peer has a `snap` extension, wait for it to connect so we can have // a uniform initialization/teardown mechanism snap, err := h.peers.waitSnapExtension(peer) @@ -295,12 +340,6 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { peer.Log().Error("Snapshot extension barrier failed", "err", err) return err } - // TODO(karalabe): Not sure why this is needed - if !h.chainSync.handlePeerEvent(peer) { - return p2p.DiscQuitting - } - h.peerWG.Add(1) - defer h.peerWG.Done() // Execute the Ethereum handshake var ( @@ -366,7 +405,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { return err } } - h.chainSync.handlePeerEvent(peer) + h.chainSync.handlePeerEvent() // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. @@ -411,8 +450,10 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { // `eth`, all subsystem registrations and lifecycle management will be done by // the main `eth` handler to prevent strange races. func (h *handler) runSnapExtension(peer *snap.Peer, handler snap.Handler) error { - h.peerWG.Add(1) - defer h.peerWG.Done() + if !h.incHandlers() { + return p2p.DiscQuitting + } + defer h.decHandlers() if err := h.peers.registerSnapExtension(peer); err != nil { peer.Log().Error("Snapshot extension registration failed", "err", err) @@ -492,6 +533,10 @@ func (h *handler) Start(maxPeers int) { h.wg.Add(2) go h.chainSync.loop() go h.txsyncLoop64() // TODO(karalabe): Legacy initial tx echange, drop with eth/64. + + // start peer handler tracker + h.wg.Add(1) + go h.protoTracker() } func (h *handler) Stop() { @@ -504,14 +549,13 @@ func (h *handler) Stop() { // Quit chainSync and txsync64. // After this is done, no new peers will be accepted. close(h.quitSync) - h.wg.Wait() // Disconnect existing sessions. // This also closes the gate for any new registrations on the peer set. // sessions which are already established but not added to h.peers yet // will exit when they try to register. h.peers.close() - h.peerWG.Wait() + h.wg.Wait() log.Info("Ethereum protocol stopped") } diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 5b37968bf..073f58ef8 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -212,7 +212,7 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td // Update the peer's total difficulty if better than the previous if _, td := peer.Head(); trueTD.Cmp(td) > 0 { peer.SetHead(trueHead, trueTD) - h.chainSync.handlePeerEvent(peer) + h.chainSync.handlePeerEvent() } return nil } diff --git a/eth/handler_qlight_client.go b/eth/handler_qlight_client.go index 1699dff33..bfedc3d23 100644 --- a/eth/handler_qlight_client.go +++ b/eth/handler_qlight_client.go @@ -166,6 +166,11 @@ func newQLightClientHandler(config *handlerConfig) (*handler, error) { // runEthPeer registers an eth peer into the joint eth/snap peerset, adds it to // various subsistems and starts handling messages. func (h *handler) runQLightClientPeer(peer *qlightproto.Peer, handler qlightproto.Handler) error { + if !h.incHandlers() { + return p2p.DiscQuitting + } + defer h.decHandlers() + // If the peer has a `snap` extension, wait for it to connect so we can have // a uniform initialization/teardown mechanism snap, err := h.peers.waitSnapExtension(peer.EthPeer) @@ -173,12 +178,6 @@ func (h *handler) runQLightClientPeer(peer *qlightproto.Peer, handler qlightprot peer.Log().Error("Snapshot extension barrier failed", "err", err) return err } - // TODO(karalabe): Not sure why this is needed - if !h.chainSync.handlePeerEvent(peer.EthPeer) { - return p2p.DiscQuitting - } - h.peerWG.Add(1) - defer h.peerWG.Done() // Execute the Ethereum handshake var ( @@ -271,7 +270,7 @@ func (h *handler) runQLightClientPeer(peer *qlightproto.Peer, handler qlightprot return err } } - h.chainSync.handlePeerEvent(peer.EthPeer) + h.chainSync.handlePeerEvent() // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. @@ -325,6 +324,10 @@ func (h *handler) StartQLightClient() { // start sync handlers h.wg.Add(1) go h.chainSync.loop() + + // start peer handler tracker + h.wg.Add(1) + go h.protoTracker() } func (h *handler) StopQLightClient() { @@ -334,14 +337,14 @@ func (h *handler) StopQLightClient() { // Quit chainSync and txsync64. // After this is done, no new peers will be accepted. close(h.quitSync) - h.wg.Wait() // Disconnect existing sessions. // This also closes the gate for any new registrations on the peer set. // sessions which are already established but not added to h.peers yet // will exit when they try to register. h.peers.close() - h.peerWG.Wait() + h.wg.Wait() + log.Info("QLight client protocol stopped") } diff --git a/eth/handler_qlight_server.go b/eth/handler_qlight_server.go index efd2de802..cc54f07b6 100644 --- a/eth/handler_qlight_server.go +++ b/eth/handler_qlight_server.go @@ -78,8 +78,10 @@ func newQLightServerHandler(config *handlerConfig) (*handler, error) { // runEthPeer registers an eth peer into the joint eth/snap peerset, adds it to // various subsistems and starts handling messages. func (h *handler) runQLightServerPeer(peer *qlightproto.Peer, handler qlightproto.Handler) error { - h.peerWG.Add(1) - defer h.peerWG.Done() + if !h.incHandlers() { + return p2p.DiscQuitting + } + defer h.decHandlers() // Execute the Ethereum handshake var ( @@ -184,20 +186,24 @@ func (h *handler) StartQLightServer(maxPeers int) { h.wg.Add(1) go h.newBlockBroadcastLoop() + // start peer handler tracker + h.wg.Add(1) + go h.protoTracker() + h.authProvider.Initialize() } func (h *handler) StopQLightServer() { h.txsSub.Unsubscribe() close(h.quitSync) - h.wg.Wait() // Disconnect existing sessions. // This also closes the gate for any new registrations on the peer set. // sessions which are already established but not added to h.peers yet // will exit when they try to register. h.peers.close() - h.peerWG.Wait() + h.wg.Wait() + log.Info("QLight server protocol stopped") } diff --git a/eth/sync.go b/eth/sync.go index fe7dcb068..62e9f96c5 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -183,7 +183,7 @@ func newChainSyncer(handler *handler) *chainSyncer { // handlePeerEvent notifies the syncer about a change in the peer set. // This is called for new peers and every time a peer announces a new // chain head. -func (cs *chainSyncer) handlePeerEvent(peer *eth.Peer) bool { +func (cs *chainSyncer) handlePeerEvent() bool { select { case cs.peerEventCh <- struct{}{}: return true