Skip to content

Commit

Permalink
feat(net): better timeout
Browse files Browse the repository at this point in the history
Converts the fetcher timeout to an unresponsiveness check: if no data is received for 10 seconds,
the request is considered failed. Also provides variadic options to the fetcher to override the
unresponsiveness timeout.
  • Loading branch information
hannahhoward committed Sep 18, 2019
1 parent 2879caa commit d91a6e6
Show file tree
Hide file tree
Showing 2 changed files with 309 additions and 40 deletions.
97 changes: 71 additions & 26 deletions net/graphsync_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
var logGraphsyncFetcher = logging.Logger("net.graphsync_fetcher")

const (
	// requestTimeout bounds a single graphsync request (which may be for many blocks).
	// NOTE(review): this appears superseded by unresponsiveTimeout in this change --
	// confirm no remaining references before removing it.
	requestTimeout = 60 * time.Second
	// unresponsiveTimeout detects a single graphsync request getting "stuck":
	// if no more responses are received for a period greater than this,
	// we will assume the request has hung up and cancel it.
	unresponsiveTimeout = 10 * time.Second
)

// Fetcher defines an interface that may be used to fetch data from the network.
Expand Down Expand Up @@ -55,23 +55,41 @@ type graphsyncFallbackPeerTracker interface {
// GraphSyncFetcher is used to fetch data over the network. It is implemented
// using a Graphsync exchange to fetch tipsets recursively.
type GraphSyncFetcher struct {
	exchange    GraphExchange                       // network exchange used to issue graphsync requests
	validator   consensus.SyntaxValidator           // validates fetched data
	store       bstore.Blockstore                   // local store for fetched blocks
	ssb         selectorbuilder.SelectorSpecBuilder // builds IPLD selectors for requests
	peerTracker graphsyncFallbackPeerTracker        // supplies fallback peers to retry against
	// unresponsiveTimeout is how long a request may go without receiving any
	// response before it is considered hung and cancelled.
	unresponsiveTimeout time.Duration
}

// GraphsyncFetcherOption is a function that configures a GraphSyncFetcher. It
// should not be created directly but should instead be generated through an
// option constructor such as UseUnresponsiveTimeout.
type GraphsyncFetcherOption func(*GraphSyncFetcher)

// UseUnresponsiveTimeout returns an option that overrides the default
// unresponsiveness timeout of a GraphSyncFetcher.
func UseUnresponsiveTimeout(timeout time.Duration) GraphsyncFetcherOption {
	return func(fetcher *GraphSyncFetcher) {
		fetcher.unresponsiveTimeout = timeout
	}
}

// NewGraphSyncFetcher returns a GraphsyncFetcher wired up to the input Graphsync exchange and
// attached local blockservice for reloading blocks in memory once they are returned
func NewGraphSyncFetcher(ctx context.Context, exchange GraphExchange, blockstore bstore.Blockstore,
bv consensus.SyntaxValidator, pt graphsyncFallbackPeerTracker) *GraphSyncFetcher {
bv consensus.SyntaxValidator, pt graphsyncFallbackPeerTracker, options ...GraphsyncFetcherOption) *GraphSyncFetcher {
gsf := &GraphSyncFetcher{
store: blockstore,
validator: bv,
exchange: exchange,
ssb: selectorbuilder.NewSelectorSpecBuilder(ipldfree.NodeBuilder()),
peerTracker: pt,
store: blockstore,
validator: bv,
exchange: exchange,
ssb: selectorbuilder.NewSelectorSpecBuilder(ipldfree.NodeBuilder()),
peerTracker: pt,
unresponsiveTimeout: unresponsiveTimeout,
}
for _, option := range options {
option(gsf)
}
return gsf
}
Expand Down Expand Up @@ -224,27 +242,57 @@ func (gsf *GraphSyncFetcher) fetchBlocks(ctx context.Context, cids []cid.Cid, ta
efsb.Insert("messageReceipts", gsf.ssb.Matcher())
}).Node()
errChans := make([]<-chan error, 0, len(cids))
requestCtx, requestCancel := context.WithTimeout(ctx, requestTimeout)
defer requestCancel()
requestChans := make([]<-chan graphsync.ResponseProgress, 0, len(cids))
cancelFuncs := make([]func(), 0, len(cids))
for _, c := range cids {
_, errChan := gsf.exchange.Request(requestCtx, targetPeer, cidlink.Link{Cid: c}, selector)
requestCtx, requestCancel := context.WithCancel(ctx)
defer requestCancel()
requestChan, errChan := gsf.exchange.Request(requestCtx, targetPeer, cidlink.Link{Cid: c}, selector)
errChans = append(errChans, errChan)
requestChans = append(requestChans, requestChan)
cancelFuncs = append(cancelFuncs, requestCancel)
}
// Any of the multiple parallel requests might fail. Wait for all of them to complete, then
// return any error (in this case, the last one to be received).
var anyError error
for _, errChan := range errChans {
for err := range errChan {
for i, errChan := range errChans {
requestChan := requestChans[i]
cancelFunc := cancelFuncs[i]
err := gsf.consumeResponse(requestChan, errChan, cancelFunc)
if err != nil {
anyError = err
}
}
return anyError
}

// consumeResponse drains a single graphsync request's response and error
// channels until both are closed, enforcing the fetcher's unresponsiveness
// timeout: each received message resets the timer, and if no message arrives
// within gsf.unresponsiveTimeout the request is cancelled via cancelFunc.
// It returns the last error received on errChan, or nil if none was received.
// NOTE(review): a timeout itself does not set an error here -- presumably
// cancellation surfaces a context error on errChan and closes both channels;
// confirm against the graphsync exchange's contract.
func (gsf *GraphSyncFetcher) consumeResponse(requestChan <-chan graphsync.ResponseProgress, errChan <-chan error, cancelFunc func()) error {
	timer := time.NewTimer(gsf.unresponsiveTimeout)
	defer timer.Stop() // release the timer once both channels are drained
	var anyError error
	for errChan != nil || requestChan != nil {
		select {
		case err, ok := <-errChan:
			if !ok {
				// Channel closed: stop selecting on it. Do NOT assign the
				// zero-value err, which would clobber a previously received
				// error with nil.
				errChan = nil
			} else {
				anyError = err
			}
			timer.Reset(gsf.unresponsiveTimeout)
		case _, ok := <-requestChan:
			if !ok {
				requestChan = nil
			}
			timer.Reset(gsf.unresponsiveTimeout)
		case <-timer.C:
			// No progress within the timeout window: cancel the request.
			// Cancellation should cause both channels to close, ending the loop.
			cancelFunc()
		}
	}
	return anyError
}

// fetchBlocksRecursively gets the blocks from recursionDepth ancestor tipsets
// starting from baseCid.
func (gsf *GraphSyncFetcher) fetchBlocksRecursively(ctx context.Context, baseCid cid.Cid, targetPeer peer.ID, recursionDepth int) error {
requestCtx, requestCancel := context.WithTimeout(ctx, requestTimeout)
requestCtx, requestCancel := context.WithCancel(ctx)
defer requestCancel()

// recursive selector to fetch n sets of parent blocks
Expand All @@ -264,11 +312,8 @@ func (gsf *GraphSyncFetcher) fetchBlocksRecursively(ctx context.Context, baseCid
))
})).Node()

_, errChan := gsf.exchange.Request(requestCtx, targetPeer, cidlink.Link{Cid: baseCid}, selector)
for err := range errChan {
return err
}
return nil
requestChan, errChan := gsf.exchange.Request(requestCtx, targetPeer, cidlink.Link{Cid: baseCid}, selector)
return gsf.consumeResponse(requestChan, errChan, requestCancel)
}

// Loads the IPLD blocks for all blocks in a tipset, and checks for the presence of the
Expand Down
Loading

0 comments on commit d91a6e6

Please sign in to comment.