Skip to content

Commit

Permalink
query+neutrino: remove unused queryPeers function
Browse files Browse the repository at this point in the history
Since both GetBlock and GetCFilter now make use of the work dispatcher
instead of the `queryPeers` function, we can now remove the `queryPeers`
and `queryChainServicePeers` functions.
  • Loading branch information
ellemouton committed May 9, 2023
1 parent c66e33c commit b3ecf6e
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 176 deletions.
14 changes: 0 additions & 14 deletions neutrino.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,11 +643,6 @@ type ChainService struct { // nolint:maligned
FilterCache *lru.Cache[FilterCacheKey, *CacheableFilter]
BlockCache *lru.Cache[wire.InvVect, *CacheableBlock]

// queryPeers will be called to send messages to one or more peers,
// expecting a response.
queryPeers func(wire.Message, func(*ServerPeer, wire.Message,
chan<- struct{}), ...QueryOption)

chainParams chaincfg.Params
addrManager *addrmgr.AddrManager
connManager *connmgr.ConnManager
Expand Down Expand Up @@ -747,16 +742,7 @@ func NewChainService(cfg Config) (*ChainService, error) {
Ranking: query.NewPeerRanking(),
})

// We set the queryPeers method to point to queryChainServicePeers,
// passing a reference to the newly created ChainService.
s.queryPeers = func(msg wire.Message, f func(*ServerPeer,
wire.Message, chan<- struct{}), qo ...QueryOption) {

queryChainServicePeers(&s, msg, f, qo...)
}

var err error

s.FilterDB, err = filterdb.New(cfg.Database, cfg.ChainParams)
if err != nil {
return nil, err
Expand Down
162 changes: 0 additions & 162 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,168 +382,6 @@ checkResponses:
}
}

// queryChainServicePeers is a helper function that sends a query to one or
// more peers of the given ChainService, and waits for an answer. The timeout
// for queries is set by the QueryTimeout package-level variable or the Timeout
// functional option.
func queryChainServicePeers(
// s is the ChainService to use.
s *ChainService,

// queryMsg is the message to send to each peer selected by selectPeer.
queryMsg wire.Message,

// checkResponse is called for every message within the timeout period.
// The quit channel lets the query know to terminate because the
// required response has been found. This is done by closing the
// channel.
checkResponse func(sp *ServerPeer, resp wire.Message,
quit chan<- struct{}),

// options takes functional options for executing the query.
options ...QueryOption) {

// Starting with the set of default options, we'll apply any specified
// functional options to the query.
qo := defaultQueryOptions()
qo.applyQueryOptions(options...)

// We get an initial view of our peers, to be updated each time a peer
// query times out.
queryPeer := s.blockManager.SyncPeer()
peerTries := make(map[string]uint8)

// This will be state used by the peer query goroutine.
queryQuit := make(chan struct{})
subQuit := make(chan struct{})

// Increase this number to be able to handle more queries at once as
// each channel gets results for all queries, otherwise messages can
// get mixed and there's a vicious cycle of retries causing a bigger
// message flood, more of which get missed.
msgChan := make(chan spMsg)
subscription := spMsgSubscription{
msgChan: msgChan,
quitChan: subQuit,
}

// Loop for any messages sent to us via our subscription channel and
// check them for whether they satisfy the query. Break the loop if
// it's time to quit.
peerTimeout := time.NewTimer(qo.timeout)
connectionTimeout := time.NewTimer(qo.peerConnectTimeout)
connectionTicker := connectionTimeout.C
if queryPeer != nil {
peerTries[queryPeer.Addr()]++
queryPeer.subscribeRecvMsg(subscription)
queryPeer.QueueMessageWithEncoding(queryMsg, nil, qo.encoding)
}
checkResponses:
for {
select {
case <-connectionTicker:
// When we time out, we're done.
if queryPeer != nil {
queryPeer.unsubscribeRecvMsgs(subscription)
}
break checkResponses

case <-queryQuit:
// Same when we get a quit signal.
if queryPeer != nil {
queryPeer.unsubscribeRecvMsgs(subscription)
}
break checkResponses

case <-s.quit:
// Same when chain server's quit is signaled.
if queryPeer != nil {
queryPeer.unsubscribeRecvMsgs(subscription)
}
break checkResponses

// A message has arrived over the subscription channel, so we
// execute the checkResponses callback to see if this ends our
// query session.
case sm := <-msgChan:
// TODO: This will get stuck if checkResponse gets
// stuck. This is a caveat for callers that should be
// fixed before exposing this function for public use.
checkResponse(sm.sp, sm.msg, queryQuit)

// Each time we receive a response from the current
// peer, we'll reset the main peer timeout as they're
// being responsive.
if !peerTimeout.Stop() {
select {
case <-peerTimeout.C:
default:
}
}
peerTimeout.Reset(qo.timeout)

// Also at this point, if the peerConnectTimeout is
// still active, then we can disable it, as we're
// receiving responses from the current peer.
if connectionTicker != nil && !connectionTimeout.Stop() {
select {
case <-connectionTimeout.C:
default:
}
}
connectionTicker = nil

// The current peer we're querying has failed to answer the
// query. Time to select a new peer and query it.
case <-peerTimeout.C:
if queryPeer != nil {
queryPeer.unsubscribeRecvMsgs(subscription)
}

queryPeer = nil
for _, peer := range s.Peers() {
// If the peer is no longer connected, we'll
// skip them.
if !peer.Connected() {
continue
}

// If we've yet to try this peer, we'll make
// sure to do so. If we've exceeded the number
// of tries we should retry this peer, then
// we'll skip them.
numTries, ok := peerTries[peer.Addr()]
if ok && numTries >= qo.numRetries {
continue
}

queryPeer = peer

// Found a peer we can query.
peerTries[queryPeer.Addr()]++
queryPeer.subscribeRecvMsg(subscription)
queryPeer.QueueMessageWithEncoding(
queryMsg, nil, qo.encoding,
)
break
}

// If at this point, we don't yet have a query peer,
// then we'll exit now as all the peers are exhausted.
if queryPeer == nil {
break checkResponses
}
}
}

// Close the subscription quit channel and the done channel, if any.
close(subQuit)
peerTimeout.Stop()
if qo.doneChan != nil {
close(qo.doneChan)
}
}

// getFilterFromCache returns a filter from ChainService's FilterCache if it
// exists, returning nil and error if it doesn't.
func (s *ChainService) getFilterFromCache(blockHash *chainhash.Hash,
Expand Down

0 comments on commit b3ecf6e

Please sign in to comment.