diff --git a/neutrino.go b/neutrino.go index d772ddae..9e1e1e1a 100644 --- a/neutrino.go +++ b/neutrino.go @@ -643,11 +643,6 @@ type ChainService struct { // nolint:maligned FilterCache *lru.Cache[FilterCacheKey, *CacheableFilter] BlockCache *lru.Cache[wire.InvVect, *CacheableBlock] - // queryPeers will be called to send messages to one or more peers, - // expecting a response. - queryPeers func(wire.Message, func(*ServerPeer, wire.Message, - chan<- struct{}), ...QueryOption) - chainParams chaincfg.Params addrManager *addrmgr.AddrManager connManager *connmgr.ConnManager @@ -747,16 +742,7 @@ func NewChainService(cfg Config) (*ChainService, error) { Ranking: query.NewPeerRanking(), }) - // We set the queryPeers method to point to queryChainServicePeers, - // passing a reference to the newly created ChainService. - s.queryPeers = func(msg wire.Message, f func(*ServerPeer, - wire.Message, chan<- struct{}), qo ...QueryOption) { - - queryChainServicePeers(&s, msg, f, qo...) - } - var err error - s.FilterDB, err = filterdb.New(cfg.Database, cfg.ChainParams) if err != nil { return nil, err diff --git a/query.go b/query.go index 32c56fe1..0dbd731c 100644 --- a/query.go +++ b/query.go @@ -382,168 +382,6 @@ checkResponses: } } -// queryChainServicePeers is a helper function that sends a query to one or -// more peers of the given ChainService, and waits for an answer. The timeout -// for queries is set by the QueryTimeout package-level variable or the Timeout -// functional option. -func queryChainServicePeers( - // s is the ChainService to use. - s *ChainService, - - // queryMsg is the message to send to each peer selected by selectPeer. - queryMsg wire.Message, - - // checkResponse is called for every message within the timeout period. - // The quit channel lets the query know to terminate because the - // required response has been found. This is done by closing the - // channel. - checkResponse func(sp *ServerPeer, resp wire.Message, - quit chan<- struct{}), - - // options takes functional options for executing the query. - options ...QueryOption) { - - // Starting with the set of default options, we'll apply any specified - // functional options to the query. - qo := defaultQueryOptions() - qo.applyQueryOptions(options...) - - // We get an initial view of our peers, to be updated each time a peer - // query times out. - queryPeer := s.blockManager.SyncPeer() - peerTries := make(map[string]uint8) - - // This will be state used by the peer query goroutine. - queryQuit := make(chan struct{}) - subQuit := make(chan struct{}) - - // Increase this number to be able to handle more queries at once as - // each channel gets results for all queries, otherwise messages can - // get mixed and there's a vicious cycle of retries causing a bigger - // message flood, more of which get missed. - msgChan := make(chan spMsg) - subscription := spMsgSubscription{ - msgChan: msgChan, - quitChan: subQuit, - } - - // Loop for any messages sent to us via our subscription channel and - // check them for whether they satisfy the query. Break the loop if - // it's time to quit. - peerTimeout := time.NewTimer(qo.timeout) - connectionTimeout := time.NewTimer(qo.peerConnectTimeout) - connectionTicker := connectionTimeout.C - if queryPeer != nil { - peerTries[queryPeer.Addr()]++ - queryPeer.subscribeRecvMsg(subscription) - queryPeer.QueueMessageWithEncoding(queryMsg, nil, qo.encoding) - } -checkResponses: - for { - select { - case <-connectionTicker: - // When we time out, we're done. - if queryPeer != nil { - queryPeer.unsubscribeRecvMsgs(subscription) - } - break checkResponses - - case <-queryQuit: - // Same when we get a quit signal. - if queryPeer != nil { - queryPeer.unsubscribeRecvMsgs(subscription) - } - break checkResponses - - case <-s.quit: - // Same when chain server's quit is signaled. - if queryPeer != nil { - queryPeer.unsubscribeRecvMsgs(subscription) - } - break checkResponses - - // A message has arrived over the subscription channel, so we - // execute the checkResponses callback to see if this ends our - // query session. - case sm := <-msgChan: - // TODO: This will get stuck if checkResponse gets - // stuck. This is a caveat for callers that should be - // fixed before exposing this function for public use. - checkResponse(sm.sp, sm.msg, queryQuit) - - // Each time we receive a response from the current - // peer, we'll reset the main peer timeout as they're - // being responsive. - if !peerTimeout.Stop() { - select { - case <-peerTimeout.C: - default: - } - } - peerTimeout.Reset(qo.timeout) - - // Also at this point, if the peerConnectTimeout is - // still active, then we can disable it, as we're - // receiving responses from the current peer. - if connectionTicker != nil && !connectionTimeout.Stop() { - select { - case <-connectionTimeout.C: - default: - } - } - connectionTicker = nil - - // The current peer we're querying has failed to answer the - // query. Time to select a new peer and query it. - case <-peerTimeout.C: - if queryPeer != nil { - queryPeer.unsubscribeRecvMsgs(subscription) - } - - queryPeer = nil - for _, peer := range s.Peers() { - // If the peer is no longer connected, we'll - // skip them. - if !peer.Connected() { - continue - } - - // If we've yet to try this peer, we'll make - // sure to do so. If we've exceeded the number - // of tries we should retry this peer, then - // we'll skip them. - numTries, ok := peerTries[peer.Addr()] - if ok && numTries >= qo.numRetries { - continue - } - - queryPeer = peer - - // Found a peer we can query. - peerTries[queryPeer.Addr()]++ - queryPeer.subscribeRecvMsg(subscription) - queryPeer.QueueMessageWithEncoding( - queryMsg, nil, qo.encoding, - ) - break - } - - // If at this point, we don't yet have a query peer, - // then we'll exit now as all the peers are exhausted. - if queryPeer == nil { - break checkResponses - } - } - } - - // Close the subscription quit channel and the done channel, if any. - close(subQuit) - peerTimeout.Stop() - if qo.doneChan != nil { - close(qo.doneChan) - } -} - // getFilterFromCache returns a filter from ChainService's FilterCache if it // exists, returning nil and error if it doesn't. func (s *ChainService) getFilterFromCache(blockHash *chainhash.Hash,