diff --git a/README.md b/README.md index 8b215b49..eb7594ca 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ If your existing library (i.e. `go-ipfs` or `go-filecoin`) uses these other olde ## Install -`go-graphsync` requires Go >= 1.11 and can be installed using Go modules +`go-graphsync` requires Go >= 1.13 and can be installed using Go modules ## Usage @@ -58,11 +58,10 @@ import ( var ctx context.Context var host libp2p.Host -var loader ipld.Loader -var storer ipld.Storer +var lsys ipld.LinkSystem network := gsnet.NewFromLibp2pHost(host) -exchange := graphsync.New(ctx, network, loader, storer) +exchange := graphsync.New(ctx, network, lsys) ``` Parameter Notes: @@ -70,12 +69,11 @@ Parameter Notes: 1. `context` is just the parent context for all of GraphSync 2. `network` is a network abstraction provided to Graphsync on top of libp2p. This allows graphsync to be tested without the actual network -3. `loader` is used to load blocks from content ids from the local block store. It's used when RESPONDING to requests from other clients. It should conform to the IPLD loader interface: https://github.com/ipld/go-ipld-prime/blob/master/linking.go -4. `storer` is used to store incoming blocks to the local block store. It's used when REQUESTING a graphsync query, to store blocks locally once they are validated as part of the correct response. It should conform to the IPLD storer interface: https://github.com/ipld/go-ipld-prime/blob/master/linking.go +3. `lsys` is an go-ipld-prime LinkSystem, which provides mechanisms loading and constructing go-ipld-prime nodes from a link, and saving ipld prime nodes to serialized data ### Using GraphSync With An IPFS BlockStore -GraphSync provides two convenience functions in the `storeutil` package for +GraphSync provides a convenience function in the `storeutil` package for integrating with BlockStore's from IPFS. 
```golang @@ -92,103 +90,9 @@ var host libp2p.Host var bs blockstore.Blockstore network := gsnet.NewFromLibp2pHost(host) -loader := storeutil.LoaderForBlockstore(bs) -storer := storeutil.StorerForBlockstore(bs) +lsys := storeutil.LinkSystemForBlockstore(bs) -exchange := graphsync.New(ctx, network, loader, storer) -``` - -### Write A Loader For An IPFS BlockStore - -If you are using a traditional go-ipfs-blockstore, your link loading function looks like this: - -```golang -type BlockStore interface { - Get(lnk cid.Cid) (blocks.Block, error) -} -``` - -or, more generally: - -```golang -type Cid2BlockFn func (lnk cid.Cid) (blocks.Block, error) -``` - -in `go-ipld-prime`, the signature for a link loader is as follows: - -```golang -type Loader func(lnk Link, lnkCtx LinkContext) (io.Reader, error) -``` - -`go-ipld-prime` intentionally keeps its interfaces as abstract as possible to limit dependencies on other ipfs/filecoin specific packages. An IPLD Link is an abstraction for a CID, and IPLD expects io.Reader's rather than an actual block. IPLD provides a `cidLink` package for working with Links that use CIDs as the underlying data, and it's safe to assume that's the type in use if your code deals only with CIDs. 
A conversion would look something like this: - -```golang -import ( - ipld "github.com/ipld/go-ipld-prime" - cidLink "github.com/ipld/go-ipld-prime/linking/cid" -) - -func LoaderFromCid2BlockFn(cid2BlockFn Cid2BlockFn) ipld.Loader { - return func(lnk ipld.Link, lnkCtx ipld.LinkContext) (io.Reader, error) { - asCidLink, ok := lnk.(cidlink.Link) - if !ok { - return nil, fmt.Errorf("Unsupported Link Type") - } - block, err := cid2BlockFn(asCidLink.Cid) - if err != nil { - return nil, err - } - return bytes.NewReader(block.RawData()), nil - } -} -``` - -### Write A Storer From An IPFS BlockStore - -If you are using a traditional go-ipfs-blockstore, your storage function looks like this: - -```golang -type BlockStore interface { - Put(blocks.Block) error -} -``` - -or, more generally: - -```golang -type BlockStoreFn func (blocks.Block) (error) -``` - -in `go-ipld-prime`, the signature for a link storer is a bit different: - -```golang -type StoreCommitter func(Link) error -type Storer func(lnkCtx LinkContext) (io.Writer, StoreCommitter, error) -``` - -`go-ipld-prime` stores in two parts to support streaming -- the storer is called and returns an IO.Writer and a function to commit changes when finished. Here's how you can write a storer from a traditional block storing signature. 
- -```golang -import ( - blocks "github.com/ipfs/go-block-format" - ipld "github.com/ipld/go-ipld-prime" - cidLink "github.com/ipld/go-ipld-prime/linking/cid" -) - -func StorerFromBlockStoreFn(blockStoreFn BlockStoreFn) ipld.Storer { - return func(lnkCtx ipld.LinkContext) (io.Writer, ipld.StoreCommitter, error) { - var buffer bytes.Buffer - committer := func(lnk ipld.Link) error { - asCidLink, ok := lnk.(cidlink.Link) - if !ok { - return fmt.Errorf("Unsupported Link Type") - } - block := blocks.NewBlockWithCid(buffer.Bytes(), asCidLink.Cid) - return blockStoreFn(block) - } - return &buffer, committer, nil - } -} +exchange := graphsync.New(ctx, network, lsys) ``` ### Calling Graphsync diff --git a/docs/architecture.md b/docs/architecture.md index c83d3188..9bd07dd8 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -11,6 +11,7 @@ This document explains the basic architecture for the go implementation of the G - [Requestor Implementation](#requestor-implementation) - [Responder Implementation](#responder-implementation) - [Message Sending Layer](#message-sending-layer) +- [Miscellaneous](#miscellaneous) ## Overview @@ -28,7 +29,7 @@ go-graphsync also depends on the following external dependencies: 1. A network implementation, which provides basic functions for sending and receiving messages on a network. -2. A local blockstore implementation, expressed by a `loader` function and a `storer` function. +2. A local blockstore implementation, expressed by an IPLD `LinkSystem`. ## Request Lifecycle @@ -47,13 +48,13 @@ This order of these requirements corresponds roughly with the sequence they're e However, if you reverse the order of these requirements, it becomes clear that a GraphSync request is really an IPLD Selector Query performed locally that happens to be backed by another remote peer performing the same query on its machine and feeding the results to the requestor. 
-Selector queries, as implemented in the `go-ipld-prime` library, rely on a loader function to load data any time a link boundary is crossed during a query. The loader can be configured each time a selector query is performed. We use this to support network communication on both sides of a GraphSync query. +Selector queries, as implemented in the `go-ipld-prime` library, rely on a function to load data any time a link boundary is crossed during a query. The loader can be configured each time a selector query is performed. We use this to support network communication on both sides of a GraphSync query. -On the requestor side, instead of supplying the local storage loader, we supply it with a different loader that waits for responses from the network -- and also simultaneously stores them in local storage as they are loaded. Blocks that come back on the network that are never loaded as part of the local Selector traversal are simply dropped. Moreover, we can take advantage of the fact that blocks get stored locally as they are traversed to limit network traffic -- there's no need to send back a block twice because we can safely assume in a single query, once a block is traversed once, it's in the requestors local storage. +On the requestor side, instead of supplying a function to read from local storage, we supply a function that waits for responses from the network -- and also simultaneously stores them in local storage as they are loaded. Blocks that come back on the network that are never loaded as part of the local Selector traversal are simply dropped. Moreover, we can take advantage of the fact that blocks get stored locally as they are traversed to limit network traffic -- there's no need to send back a block twice because we can safely assume in a single query, once a block is traversed once, it's in the requestors local storage. 
-On the responder side, we employ a similar method -- while an IPLD Selector query operates at the finer grain of traversing IPLD Nodes, what we really care about is when it crosses a link boundary. At this point, IPLD asks the Loader to load the link, and here, we provide IPLD with a loader that wraps the local storage loader but also transmits every block loaded across the network. +On the responder side, we employ a similar method -- while an IPLD Selector query operates at the finer grain of traversing IPLD Nodes, what we really care about is when it crosses a link boundary. At this point, IPLD calls out to a function to load the link, and here, we provide IPLD with a function that loads from local storage but also transmits every block loaded across the network. -So, effectively what we are doing is using intercepted loaders on both sides to handle the transmitting and receiving of data across the network. +So, effectively what we are doing is using intercepted block loaders on both sides to handle the transmitting and receiving of data across the network. While the actual code operates in a way that is slightly more complicated, the basic sequence of a single GraphSync request is as follows: @@ -71,9 +72,8 @@ Having outlined all the steps to execute a single roundtrip Graphsync request, t To do this, GraphSync maintains several independent threads of execution (i.e. goroutines). Specifically: - On the requestor side: 1. We maintain an independent thread to make and track requests (RequestManager) -2. We maintain an independent thread to feed incoming blocks to selector verifications (AsyncLoader) -3. Each outgoing request has an independent thread performing selector verification -4. Each outgoing request has an independent thread collecting and buffering final responses before they are returned to the caller. Graphsync returns responses to the caller through a channel. 
If the caller fails to immediately read the response channel, this should not block other requests from being processed. +2. Each outgoing request has an independent thread performing selector verification +3. Each outgoing request has an independent thread collecting and buffering final responses before they are returned to the caller. Graphsync returns responses to the caller through a channel. If the caller fails to immediately read the response channel, this should not block other requests from being processed. - On the responder side: 1. We maintain an independent thread to receive incoming requests and track outgoing responses. As each incoming request is received, it's put into a prioritized queue. 2. We maintain fixed number of threads that continuously pull the highest priority request from the queue and perform the selector query for that request. We marshal and deduplicate outgoing responses and blocks before they are sent back. This minimizes data sent on the wire and allows queries to proceed without getting blocked by the network. @@ -93,7 +93,7 @@ The network implementation needs to provide basic lower level utilities for send ### Local Blockstore Implementation -Interacting with a local blockstore is expressed by a `loader` function and a `storer` function. The `loader` function takes an IPLD Link and returns an `io.Reader` for corresponding block data, while the `storer` takes a Link and returns a `io.Writer` to write corresponding block data, plus a commit function to call when the data is ready to transfer to permanent storage. +Interacting with a local blockstore is expressed via an IPLD `LinkSystem`. The block loading function in an IPLD `LinkSystem` takes an IPLD Link and returns an `io.Reader` for corresponding block data, while the block storing function takes a Link and returns a `io.Writer` to write corresponding block data, plus a commit function to call when the data is ready to transfer to permanent storage. 
## Requestor Implementation @@ -172,9 +172,25 @@ The message consists of a PeerManager which tracks peers, and a message queue fo The message queue system contains a mechanism for applying backpressure to a query execution to make sure that a slow network connection doesn't cause us to load all the blocks for the query into memory while we wait for messages to go over the network. Whenever you attempt to queue data into the message queue, you provide an estimated size for the data that will be held in memory till the message goes out. Internally, the message queue uses the Allocator to track memory usage, and the call to queue data will block if there is too much data buffered in memory. When messages are sent out, memory is released, which will unblock requests to queue data for the message queue. -## Hooks And Listeners +## Miscellaneous + +### Hooks And Listeners go-graphsync provides a variety of points in the request/response lifecycle where one can provide a hook to inspect the current state of the request/response and potentially take action. These hooks provide the core mechanisms for authenticating requests, processing graphsync extensions, pausing and resuming, and generally enabling a higher level consumer of the graphsync to precisely control the request/response lifecycle. Graphsync also provides listeners that enable a caller to be notified when various asynchronous events happen in the request response lifecycle. Currently graphsync contains an internal pubsub notification system (see [notifications](../notifications)) to escalate low level asynchonous events back to high level modules that pass them to external listeners. A future refactor might look for a way to remove this notification system as it adds additional complexity. 
+### Actor Pattern In RequestManager And ResponseManager + +To manage concurrency in a predictable way, the RequestManager and the ResponseManager are informally implemented using the [Actor model](https://en.wikipedia.org/wiki/Actor_model) employed in distributed systems languages like Erlang. + +Each has isolated, internal state and a semi-asynchronous message queue (just a go channel with a 16 message buffer). The internal thread takes messages off the queue and dispatches them to call methods that modify internal state. + +Each implementation is spread out across three files: +- client.go - the public interface whose methods dispatch messages to the internal thread +- server.go - the methods run inside the thread that actually process messages and modify internal state +- messages.go - the different messages that are sent through the main message box + +To achieve the kind of dynamic dispatch one expects from the actor pattern based on message type, we use the visitor pattern to simulate sum types. (https://making.pusher.com/alternatives-to-sum-types-in-go/) This does mean the implementation is a bit verbose to say the least. + +However, implementing actors provides a more predictable way to handle concurrency issues than traditional select statements and helps make the logic of complex classes like the RequestManager and ResponseManager easier to follow.
\ No newline at end of file diff --git a/docs/processes.png b/docs/processes.png index 3bfaa2f6..964dffd3 100644 Binary files a/docs/processes.png and b/docs/processes.png differ diff --git a/docs/processes.puml b/docs/processes.puml index 19f01fec..953a80cd 100644 --- a/docs/processes.puml +++ b/docs/processes.puml @@ -13,7 +13,6 @@ if (operation type) then (outgoing request or incoming response) partition "Graphsync Requestor Implementation" { :RequestManager; if (operation type) then (incoming response) -:AsyncLoader; partition "Verifying Queries" { fork :ipld.Traverse; diff --git a/go.mod b/go.mod index 060031c5..c36a8900 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ipfs/go-graphsync -go 1.12 +go 1.13 require ( github.com/gogo/protobuf v1.3.2 diff --git a/impl/graphsync.go b/impl/graphsync.go index cb9dae69..43cdb12b 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -196,7 +196,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, requestAllocator: requestAllocator, } - asyncLoader.Startup() requestManager.SetDelegate(peerManager) requestManager.Startup() responseManager.Startup() @@ -206,7 +205,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, // Request initiates a new GraphSync request to the given peer using the given selector spec. func (gs *GraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) { - return gs.requestManager.SendRequest(ctx, p, root, selector, extensions...) + return gs.requestManager.NewRequest(ctx, p, root, selector, extensions...) 
} // RegisterIncomingRequestHook adds a hook that runs when a request is received diff --git a/message/message.go b/message/message.go index 8d269ef1..564be890 100644 --- a/message/message.go +++ b/message/message.go @@ -21,26 +21,23 @@ import ( // IsTerminalSuccessCode returns true if the response code indicates the // request terminated successfully. +// DEPRECATED: use status.IsSuccess() func IsTerminalSuccessCode(status graphsync.ResponseStatusCode) bool { - return status == graphsync.RequestCompletedFull || - status == graphsync.RequestCompletedPartial + return status.IsSuccess() } // IsTerminalFailureCode returns true if the response code indicates the // request terminated in failure. +// DEPRECATED: use status.IsFailure() func IsTerminalFailureCode(status graphsync.ResponseStatusCode) bool { - return status == graphsync.RequestFailedBusy || - status == graphsync.RequestFailedContentNotFound || - status == graphsync.RequestFailedLegal || - status == graphsync.RequestFailedUnknown || - status == graphsync.RequestCancelled || - status == graphsync.RequestRejected + return status.IsFailure() } // IsTerminalResponseCode returns true if the response code signals // the end of the request +// DEPRECATED: use status.IsTerminal() func IsTerminalResponseCode(status graphsync.ResponseStatusCode) bool { - return IsTerminalSuccessCode(status) || IsTerminalFailureCode(status) + return status.IsTerminal() } // Exportable is an interface that can serialize to a protobuf diff --git a/requestmanager/asyncloader/asyncloader.go b/requestmanager/asyncloader/asyncloader.go index bcbccba9..0bf02a8e 100644 --- a/requestmanager/asyncloader/asyncloader.go +++ b/requestmanager/asyncloader/asyncloader.go @@ -3,7 +3,9 @@ package asyncloader import ( "context" "errors" + "fmt" "io/ioutil" + "sync" blocks "github.com/ipfs/go-block-format" logging "github.com/ipfs/go-log/v2" @@ -20,10 +22,6 @@ import ( var log = logging.Logger("gs-asyncloader") -type loaderMessage interface { - handle(al 
*AsyncLoader) -} - type alternateQueue struct { responseCache *responsecache.ResponseCache loadAttemptQueue *loadattemptqueue.LoadAttemptQueue @@ -37,18 +35,17 @@ type Allocator interface { // AsyncLoader manages loading links asynchronously in as new responses // come in from the network type AsyncLoader struct { - ctx context.Context - cancel context.CancelFunc - incomingMessages chan loaderMessage - outgoingMessages chan loaderMessage - - defaultLinkSystem ipld.LinkSystem - activeRequests map[graphsync.RequestID]struct{} - requestQueues map[graphsync.RequestID]string - alternateQueues map[string]alternateQueue - responseCache *responsecache.ResponseCache - loadAttemptQueue *loadattemptqueue.LoadAttemptQueue - allocator Allocator + ctx context.Context + cancel context.CancelFunc + allocator Allocator + + // this mutex protects access to the state of the async loader, which covers all data fields below below + stateLk sync.Mutex + activeRequests map[graphsync.RequestID]struct{} + requestQueues map[graphsync.RequestID]string + alternateQueues map[string]alternateQueue + responseCache *responsecache.ResponseCache + loadAttemptQueue *loadattemptqueue.LoadAttemptQueue } // New initializes a new link loading manager for asynchronous loads from the given context @@ -57,66 +54,83 @@ func New(ctx context.Context, linkSystem ipld.LinkSystem, allocator Allocator) * responseCache, loadAttemptQueue := setupAttemptQueue(linkSystem, allocator) ctx, cancel := context.WithCancel(ctx) return &AsyncLoader{ - ctx: ctx, - cancel: cancel, - incomingMessages: make(chan loaderMessage), - outgoingMessages: make(chan loaderMessage), - defaultLinkSystem: linkSystem, - activeRequests: make(map[graphsync.RequestID]struct{}), - requestQueues: make(map[graphsync.RequestID]string), - alternateQueues: make(map[string]alternateQueue), - responseCache: responseCache, - loadAttemptQueue: loadAttemptQueue, - allocator: allocator, + ctx: ctx, + cancel: cancel, + activeRequests: 
make(map[graphsync.RequestID]struct{}), + requestQueues: make(map[graphsync.RequestID]string), + alternateQueues: make(map[string]alternateQueue), + responseCache: responseCache, + loadAttemptQueue: loadAttemptQueue, + allocator: allocator, } } -// Startup starts processing of messages -func (al *AsyncLoader) Startup() { - go al.messageQueueWorker() - go al.run() -} - -// Shutdown finishes processing of messages -func (al *AsyncLoader) Shutdown() { - al.cancel() -} - // RegisterPersistenceOption registers a new loader/storer option for processing requests func (al *AsyncLoader) RegisterPersistenceOption(name string, lsys ipld.LinkSystem) error { - if name == "" { - return errors.New("persistence option must have a name") + al.stateLk.Lock() + defer al.stateLk.Unlock() + _, existing := al.alternateQueues[name] + if existing { + return errors.New("already registerd a persistence option with this name") } - response := make(chan error, 1) - err := al.sendSyncMessage(®isterPersistenceOptionMessage{name, lsys, response}, response) - return err + responseCache, loadAttemptQueue := setupAttemptQueue(lsys, al.allocator) + al.alternateQueues[name] = alternateQueue{responseCache, loadAttemptQueue} + return nil } // UnregisterPersistenceOption unregisters an existing loader/storer option for processing requests func (al *AsyncLoader) UnregisterPersistenceOption(name string) error { - if name == "" { - return errors.New("persistence option must have a name") + al.stateLk.Lock() + defer al.stateLk.Unlock() + _, ok := al.alternateQueues[name] + if !ok { + return fmt.Errorf("unknown persistence option: %s", name) + } + for _, requestQueue := range al.requestQueues { + if name == requestQueue { + return errors.New("cannot unregister while requests are in progress") + } } - response := make(chan error, 1) - err := al.sendSyncMessage(&unregisterPersistenceOptionMessage{name, response}, response) - return err + delete(al.alternateQueues, name) + return nil } // StartRequest indicates 
the given request has started and the manager should // continually attempt to load links for this request as new responses come in func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID, persistenceOption string) error { - response := make(chan error, 1) - err := al.sendSyncMessage(&startRequestMessage{requestID, persistenceOption, response}, response) - return err + al.stateLk.Lock() + defer al.stateLk.Unlock() + if persistenceOption != "" { + _, ok := al.alternateQueues[persistenceOption] + if !ok { + return errors.New("unknown persistence option") + } + al.requestQueues[requestID] = persistenceOption + } + al.activeRequests[requestID] = struct{}{} + return nil } // ProcessResponse injests new responses and completes asynchronous loads as // neccesary func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata, blks []blocks.Block) { - select { - case <-al.ctx.Done(): - case al.incomingMessages <- &newResponsesAvailableMessage{responses, blks}: + al.stateLk.Lock() + defer al.stateLk.Unlock() + byQueue := make(map[string][]graphsync.RequestID) + for requestID := range responses { + queue := al.requestQueues[requestID] + byQueue[queue] = append(byQueue[queue], requestID) + } + for queue, requestIDs := range byQueue { + loadAttemptQueue := al.getLoadAttemptQueue(queue) + responseCache := al.getResponseCache(queue) + queueResponses := make(map[graphsync.RequestID]metadata.Metadata, len(requestIDs)) + for _, requestID := range requestIDs { + queueResponses[requestID] = responses[requestID] + } + responseCache.ProcessResponse(queueResponses, blks) + loadAttemptQueue.RetryLoads() } } @@ -124,9 +138,12 @@ func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadat // for errors -- only one message will be sent over either. 
func (al *AsyncLoader) AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) <-chan types.AsyncLoadResult { resultChan := make(chan types.AsyncLoadResult, 1) - response := make(chan error, 1) lr := loadattemptqueue.NewLoadRequest(p, requestID, link, linkContext, resultChan) - _ = al.sendSyncMessage(&loadRequestMessage{response, requestID, lr}, response) + al.stateLk.Lock() + defer al.stateLk.Unlock() + _, retry := al.activeRequests[requestID] + loadAttemptQueue := al.getLoadAttemptQueue(al.requestQueues[requestID]) + loadAttemptQueue.AttemptLoad(lr, retry) return resultChan } @@ -134,107 +151,26 @@ func (al *AsyncLoader) AsyncLoad(p peer.ID, requestID graphsync.RequestID, link // requestID, so if no responses are in the cache or local store, a link load // should not retry func (al *AsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID) { - select { - case <-al.ctx.Done(): - case al.incomingMessages <- &finishRequestMessage{requestID}: - } + al.stateLk.Lock() + defer al.stateLk.Unlock() + delete(al.activeRequests, requestID) + loadAttemptQueue := al.getLoadAttemptQueue(al.requestQueues[requestID]) + loadAttemptQueue.ClearRequest(requestID) } // CleanupRequest indicates the given request is complete on the client side, // and no further attempts will be made to load links for this request, // so any cached response data is invalid can be cleaned func (al *AsyncLoader) CleanupRequest(requestID graphsync.RequestID) { - select { - case <-al.ctx.Done(): - case al.incomingMessages <- &cleanupRequestMessage{requestID}: - } -} - -func (al *AsyncLoader) sendSyncMessage(message loaderMessage, response chan error) error { - select { - case <-al.ctx.Done(): - return errors.New("context closed") - case al.incomingMessages <- message: - } - select { - case <-al.ctx.Done(): - return errors.New("context closed") - case err := <-response: - return err - } -} - -type loadRequestMessage struct { - response chan error - requestID 
graphsync.RequestID - loadRequest loadattemptqueue.LoadRequest -} - -type newResponsesAvailableMessage struct { - responses map[graphsync.RequestID]metadata.Metadata - blks []blocks.Block -} - -type registerPersistenceOptionMessage struct { - name string - linkSystem ipld.LinkSystem - response chan error -} - -type unregisterPersistenceOptionMessage struct { - name string - response chan error -} - -type startRequestMessage struct { - requestID graphsync.RequestID - persistenceOption string - response chan error -} - -type finishRequestMessage struct { - requestID graphsync.RequestID -} - -type cleanupRequestMessage struct { - requestID graphsync.RequestID -} - -func (al *AsyncLoader) run() { - for { - select { - case <-al.ctx.Done(): - return - case message := <-al.outgoingMessages: - message.handle(al) - } - } -} - -func (al *AsyncLoader) messageQueueWorker() { - var messageBuffer []loaderMessage - nextMessage := func() loaderMessage { - if len(messageBuffer) == 0 { - return nil - } - return messageBuffer[0] - } - outgoingMessages := func() chan<- loaderMessage { - if len(messageBuffer) == 0 { - return nil - } - return al.outgoingMessages - } - for { - select { - case incomingMessage := <-al.incomingMessages: - messageBuffer = append(messageBuffer, incomingMessage) - case outgoingMessages() <- nextMessage(): - messageBuffer = messageBuffer[1:] - case <-al.ctx.Done(): - return - } + al.stateLk.Lock() + defer al.stateLk.Unlock() + aq, ok := al.requestQueues[requestID] + if ok { + al.alternateQueues[aq].responseCache.FinishRequest(requestID) + delete(al.requestQueues, requestID) + return } + al.responseCache.FinishRequest(requestID) } func (al *AsyncLoader) getLoadAttemptQueue(queue string) *loadattemptqueue.LoadAttemptQueue { @@ -251,110 +187,6 @@ func (al *AsyncLoader) getResponseCache(queue string) *responsecache.ResponseCac return al.alternateQueues[queue].responseCache } -func (lrm *loadRequestMessage) handle(al *AsyncLoader) { - _, retry := 
al.activeRequests[lrm.requestID] - loadAttemptQueue := al.getLoadAttemptQueue(al.requestQueues[lrm.requestID]) - loadAttemptQueue.AttemptLoad(lrm.loadRequest, retry) - select { - case <-al.ctx.Done(): - case lrm.response <- nil: - } -} - -func (rpom *registerPersistenceOptionMessage) register(al *AsyncLoader) error { - _, existing := al.alternateQueues[rpom.name] - if existing { - return errors.New("already registerd a persistence option with this name") - } - responseCache, loadAttemptQueue := setupAttemptQueue(rpom.linkSystem, al.allocator) - al.alternateQueues[rpom.name] = alternateQueue{responseCache, loadAttemptQueue} - return nil -} - -func (rpom *registerPersistenceOptionMessage) handle(al *AsyncLoader) { - err := rpom.register(al) - select { - case <-al.ctx.Done(): - case rpom.response <- err: - } -} - -func (upom *unregisterPersistenceOptionMessage) unregister(al *AsyncLoader) error { - _, ok := al.alternateQueues[upom.name] - if !ok { - return errors.New("unknown persistence option") - } - for _, requestQueue := range al.requestQueues { - if upom.name == requestQueue { - return errors.New("cannot unregister while requests are in progress") - } - } - delete(al.alternateQueues, upom.name) - return nil -} - -func (upom *unregisterPersistenceOptionMessage) handle(al *AsyncLoader) { - err := upom.unregister(al) - select { - case <-al.ctx.Done(): - case upom.response <- err: - } -} - -func (srm *startRequestMessage) startRequest(al *AsyncLoader) error { - if srm.persistenceOption != "" { - _, ok := al.alternateQueues[srm.persistenceOption] - if !ok { - return errors.New("unknown persistence option") - } - al.requestQueues[srm.requestID] = srm.persistenceOption - } - al.activeRequests[srm.requestID] = struct{}{} - return nil -} - -func (srm *startRequestMessage) handle(al *AsyncLoader) { - err := srm.startRequest(al) - select { - case <-al.ctx.Done(): - case srm.response <- err: - } -} - -func (frm *finishRequestMessage) handle(al *AsyncLoader) { - 
delete(al.activeRequests, frm.requestID) - loadAttemptQueue := al.getLoadAttemptQueue(al.requestQueues[frm.requestID]) - loadAttemptQueue.ClearRequest(frm.requestID) -} - -func (nram *newResponsesAvailableMessage) handle(al *AsyncLoader) { - byQueue := make(map[string][]graphsync.RequestID) - for requestID := range nram.responses { - queue := al.requestQueues[requestID] - byQueue[queue] = append(byQueue[queue], requestID) - } - for queue, requestIDs := range byQueue { - loadAttemptQueue := al.getLoadAttemptQueue(queue) - responseCache := al.getResponseCache(queue) - responses := make(map[graphsync.RequestID]metadata.Metadata, len(requestIDs)) - for _, requestID := range requestIDs { - responses[requestID] = nram.responses[requestID] - } - responseCache.ProcessResponse(responses, nram.blks) - loadAttemptQueue.RetryLoads() - } -} - -func (crm *cleanupRequestMessage) handle(al *AsyncLoader) { - aq, ok := al.requestQueues[crm.requestID] - if ok { - al.alternateQueues[aq].responseCache.FinishRequest(crm.requestID) - delete(al.requestQueues, crm.requestID) - return - } - al.responseCache.FinishRequest(crm.requestID) -} - func setupAttemptQueue(lsys ipld.LinkSystem, allocator Allocator) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) { unverifiedBlockStore := unverifiedblockstore.New(lsys.StorageWriteOpener) diff --git a/requestmanager/asyncloader/asyncloader_test.go b/requestmanager/asyncloader/asyncloader_test.go index 54a76903..5349a805 100644 --- a/requestmanager/asyncloader/asyncloader_test.go +++ b/requestmanager/asyncloader/asyncloader_test.go @@ -387,7 +387,6 @@ func withLoader(st *store, exec func(ctx context.Context, asyncLoader *AsyncLoad defer cancel() allocator := allocator.NewAllocator(256*(1<<20), 16*(1<<20)) asyncLoader := New(ctx, st.lsys, allocator) - asyncLoader.Startup() exec(ctx, asyncLoader) } diff --git a/requestmanager/client.go b/requestmanager/client.go new file mode 100644 index 00000000..43181f36 --- /dev/null +++ 
b/requestmanager/client.go @@ -0,0 +1,359 @@ +package requestmanager + +import ( + "context" + "errors" + "fmt" + "sync/atomic" + "time" + + "github.com/hannahhoward/go-pubsub" + blocks "github.com/ipfs/go-block-format" + logging "github.com/ipfs/go-log/v2" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/traversal/selector" + "github.com/libp2p/go-libp2p-core/peer" + + "github.com/ipfs/go-graphsync" + "github.com/ipfs/go-graphsync/listeners" + gsmsg "github.com/ipfs/go-graphsync/message" + "github.com/ipfs/go-graphsync/messagequeue" + "github.com/ipfs/go-graphsync/metadata" + "github.com/ipfs/go-graphsync/notifications" + "github.com/ipfs/go-graphsync/requestmanager/hooks" + "github.com/ipfs/go-graphsync/requestmanager/types" +) + +// The code in this file implements the public interface of the request manager. +// Functions in this file operate outside the internal thread and should +// NOT modify the internal state of the RequestManager. + +var log = logging.Logger("graphsync") + +const ( + // defaultPriority is the default priority for requests sent by graphsync + defaultPriority = graphsync.Priority(0) +) + +type inProgressRequestStatus struct { + ctx context.Context + startTime time.Time + cancelFn func() + p peer.ID + terminalError chan error + resumeMessages chan []graphsync.ExtensionData + pauseMessages chan struct{} + paused bool + lastResponse atomic.Value + onTerminated []chan<- error +} + +// PeerHandler is an interface that can send requests to peers +type PeerHandler interface { + AllocateAndBuildMessage(p peer.ID, blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee) +} + +// AsyncLoader is an interface for loading links asynchronously, returning +// results as new responses are processed +type AsyncLoader interface { + StartRequest(graphsync.RequestID, string) error + ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata, + blks []blocks.Block) + AsyncLoad(p peer.ID, requestID 
graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) <-chan types.AsyncLoadResult + CompleteResponsesFor(requestID graphsync.RequestID) + CleanupRequest(requestID graphsync.RequestID) +} + +// RequestManager tracks outgoing requests and processes incoming reponses +// to them. +type RequestManager struct { + ctx context.Context + cancel func() + messages chan requestManagerMessage + peerHandler PeerHandler + rc *responseCollector + asyncLoader AsyncLoader + disconnectNotif *pubsub.PubSub + linkSystem ipld.LinkSystem + + // dont touch out side of run loop + nextRequestID graphsync.RequestID + inProgressRequestStatuses map[graphsync.RequestID]*inProgressRequestStatus + requestHooks RequestHooks + responseHooks ResponseHooks + blockHooks BlockHooks + networkErrorListeners *listeners.NetworkErrorListeners +} + +type requestManagerMessage interface { + handle(rm *RequestManager) +} + +// RequestHooks run for new requests +type RequestHooks interface { + ProcessRequestHooks(p peer.ID, request graphsync.RequestData) hooks.RequestResult +} + +// ResponseHooks run for new responses +type ResponseHooks interface { + ProcessResponseHooks(p peer.ID, response graphsync.ResponseData) hooks.UpdateResult +} + +// BlockHooks run for each block loaded +type BlockHooks interface { + ProcessBlockHooks(p peer.ID, response graphsync.ResponseData, block graphsync.BlockData) hooks.UpdateResult +} + +// New generates a new request manager from a context, network, and selectorQuerier +func New(ctx context.Context, + asyncLoader AsyncLoader, + linkSystem ipld.LinkSystem, + requestHooks RequestHooks, + responseHooks ResponseHooks, + blockHooks BlockHooks, + networkErrorListeners *listeners.NetworkErrorListeners, +) *RequestManager { + ctx, cancel := context.WithCancel(ctx) + return &RequestManager{ + ctx: ctx, + cancel: cancel, + asyncLoader: asyncLoader, + disconnectNotif: pubsub.New(disconnectDispatcher), + linkSystem: linkSystem, + rc: newResponseCollector(ctx), + messages: 
make(chan requestManagerMessage, 16), + inProgressRequestStatuses: make(map[graphsync.RequestID]*inProgressRequestStatus), + requestHooks: requestHooks, + responseHooks: responseHooks, + blockHooks: blockHooks, + networkErrorListeners: networkErrorListeners, + } +} + +// SetDelegate specifies who will send messages out to the internet. +func (rm *RequestManager) SetDelegate(peerHandler PeerHandler) { + rm.peerHandler = peerHandler +} + +type inProgressRequest struct { + requestID graphsync.RequestID + request gsmsg.GraphSyncRequest + incoming chan graphsync.ResponseProgress + incomingError chan error +} + +// NewRequest initiates a new GraphSync request to the given peer. +func (rm *RequestManager) NewRequest(ctx context.Context, + p peer.ID, + root ipld.Link, + selectorNode ipld.Node, + extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) { + if _, err := selector.ParseSelector(selectorNode); err != nil { + return rm.singleErrorResponse(fmt.Errorf("invalid selector spec")) + } + + inProgressRequestChan := make(chan inProgressRequest) + + rm.send(&newRequestMessage{p, root, selectorNode, extensions, inProgressRequestChan}, ctx.Done()) + var receivedInProgressRequest inProgressRequest + select { + case <-rm.ctx.Done(): + return rm.emptyResponse() + case receivedInProgressRequest = <-inProgressRequestChan: + } + + // If the connection to the peer is disconnected, fire an error + unsub := rm.listenForDisconnect(p, func(neterr error) { + rm.networkErrorListeners.NotifyNetworkErrorListeners(p, receivedInProgressRequest.request, neterr) + }) + + return rm.rc.collectResponses(ctx, + receivedInProgressRequest.incoming, + receivedInProgressRequest.incomingError, + func() { + rm.cancelRequestAndClose(receivedInProgressRequest.requestID, + receivedInProgressRequest.incoming, + receivedInProgressRequest.incomingError) + }, + // Once the request has completed, stop listening for disconnect events + unsub, + ) +} + +// Dispatch the Disconnect 
event to subscribers +func disconnectDispatcher(p pubsub.Event, subscriberFn pubsub.SubscriberFn) error { + listener := subscriberFn.(func(peer.ID)) + listener(p.(peer.ID)) + return nil +} + +// Listen for the Disconnect event for the given peer +func (rm *RequestManager) listenForDisconnect(p peer.ID, onDisconnect func(neterr error)) func() { + // Subscribe to Disconnect notifications + return rm.disconnectNotif.Subscribe(func(evtPeer peer.ID) { + // If the peer is the one we're interested in, call the listener + if evtPeer == p { + onDisconnect(fmt.Errorf("disconnected from peer %s", p)) + } + }) +} + +// Disconnected is called when a peer disconnects +func (rm *RequestManager) Disconnected(p peer.ID) { + // Notify any listeners that a peer has disconnected + _ = rm.disconnectNotif.Publish(p) +} + +func (rm *RequestManager) emptyResponse() (chan graphsync.ResponseProgress, chan error) { + ch := make(chan graphsync.ResponseProgress) + close(ch) + errCh := make(chan error) + close(errCh) + return ch, errCh +} + +func (rm *RequestManager) singleErrorResponse(err error) (chan graphsync.ResponseProgress, chan error) { + ch := make(chan graphsync.ResponseProgress) + close(ch) + errCh := make(chan error, 1) + errCh <- err + close(errCh) + return ch, errCh +} + +func (rm *RequestManager) cancelRequestAndClose(requestID graphsync.RequestID, + incomingResponses chan graphsync.ResponseProgress, + incomingErrors chan error) { + cancelMessageChannel := rm.messages + for cancelMessageChannel != nil || incomingResponses != nil || incomingErrors != nil { + select { + case cancelMessageChannel <- &cancelRequestMessage{requestID, false, nil, nil}: + cancelMessageChannel = nil + // clear out any remaining responses, in case and "incoming reponse" + // messages get processed before our cancel message + case _, ok := <-incomingResponses: + if !ok { + incomingResponses = nil + } + case _, ok := <-incomingErrors: + if !ok { + incomingErrors = nil + } + case <-rm.ctx.Done(): + return + 
} + } +} + +// CancelRequest cancels the given request ID and waits for the request to terminate +func (rm *RequestManager) CancelRequest(ctx context.Context, requestID graphsync.RequestID) error { + terminated := make(chan error, 1) + rm.send(&cancelRequestMessage{requestID, false, terminated, graphsync.RequestClientCancelledErr{}}, ctx.Done()) + select { + case <-rm.ctx.Done(): + return errors.New("context cancelled") + case err := <-terminated: + return err + } +} + +// ProcessResponses ingests the given responses from the network and +// and updates the in progress requests based on those responses. +func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse, + blks []blocks.Block) { + rm.send(&processResponseMessage{p, responses, blks}, nil) +} + +// UnpauseRequest unpauses a request that was paused in a block hook based request ID +// Can also send extensions with unpause +func (rm *RequestManager) UnpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error { + response := make(chan error, 1) + rm.send(&unpauseRequestMessage{requestID, extensions, response}, nil) + select { + case <-rm.ctx.Done(): + return errors.New("context cancelled") + case err := <-response: + return err + } +} + +// PauseRequest pauses an in progress request (may take 1 or more blocks to process) +func (rm *RequestManager) PauseRequest(requestID graphsync.RequestID) error { + response := make(chan error, 1) + rm.send(&pauseRequestMessage{requestID, response}, nil) + select { + case <-rm.ctx.Done(): + return errors.New("context cancelled") + case err := <-response: + return err + } +} + +// ProcessBlockHooks processes block hooks for the given response & block and cancels +// the request as needed +func (rm *RequestManager) ProcessBlockHooks(p peer.ID, response graphsync.ResponseData, block graphsync.BlockData) error { + result := rm.blockHooks.ProcessBlockHooks(p, response, block) + if len(result.Extensions) > 0 { + 
updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...) + rm.SendRequest(p, updateRequest) + } + if result.Err != nil { + _, isPause := result.Err.(hooks.ErrPaused) + rm.send(&cancelRequestMessage{response.RequestID(), isPause, nil, nil}, nil) + } + return result.Err +} + +// TerminateRequest marks a request done +func (rm *RequestManager) TerminateRequest(requestID graphsync.RequestID) { + rm.send(&terminateRequestMessage{requestID}, nil) +} + +// SendRequest sends a request to the message queue +func (rm *RequestManager) SendRequest(p peer.ID, request gsmsg.GraphSyncRequest) { + sub := notifications.NewTopicDataSubscriber(&reqSubscriber{p, request, rm.networkErrorListeners}) + failNotifee := notifications.Notifee{Data: requestNetworkError, Subscriber: sub} + rm.peerHandler.AllocateAndBuildMessage(p, 0, func(builder *gsmsg.Builder) { + builder.AddRequest(request) + }, []notifications.Notifee{failNotifee}) +} + +// Startup starts processing for the WantManager. +func (rm *RequestManager) Startup() { + go rm.run() +} + +// Shutdown ends processing for the want manager. 
+func (rm *RequestManager) Shutdown() { + rm.cancel() +} + +func (rm *RequestManager) send(message requestManagerMessage, done <-chan struct{}) { + select { + case <-rm.ctx.Done(): + case <-done: + case rm.messages <- message: + } +} + +type reqSubscriber struct { + p peer.ID + request gsmsg.GraphSyncRequest + networkErrorListeners *listeners.NetworkErrorListeners +} + +func (r *reqSubscriber) OnNext(topic notifications.Topic, event notifications.Event) { + mqEvt, isMQEvt := event.(messagequeue.Event) + if !isMQEvt || mqEvt.Name != messagequeue.Error { + return + } + + r.networkErrorListeners.NotifyNetworkErrorListeners(r.p, r.request, mqEvt.Err) + //r.re.networkError <- mqEvt.Err + //r.re.terminateRequest() +} + +func (r reqSubscriber) OnClose(topic notifications.Topic) { +} + +const requestNetworkError = "request_network_error" diff --git a/requestmanager/messages.go b/requestmanager/messages.go new file mode 100644 index 00000000..b18efe24 --- /dev/null +++ b/requestmanager/messages.go @@ -0,0 +1,85 @@ +package requestmanager + +import ( + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-graphsync" + gsmsg "github.com/ipfs/go-graphsync/message" + "github.com/ipld/go-ipld-prime" + "github.com/libp2p/go-libp2p-core/peer" +) + +type pauseRequestMessage struct { + id graphsync.RequestID + response chan<- error +} + +func (prm *pauseRequestMessage) handle(rm *RequestManager) { + err := rm.pause(prm.id) + select { + case <-rm.ctx.Done(): + case prm.response <- err: + } +} + +type unpauseRequestMessage struct { + id graphsync.RequestID + extensions []graphsync.ExtensionData + response chan<- error +} + +func (urm *unpauseRequestMessage) handle(rm *RequestManager) { + err := rm.unpause(urm.id, urm.extensions) + select { + case <-rm.ctx.Done(): + case urm.response <- err: + } +} + +type processResponseMessage struct { + p peer.ID + responses []gsmsg.GraphSyncResponse + blks []blocks.Block +} + +func (prm *processResponseMessage) handle(rm *RequestManager) { 
+ rm.processResponseMessage(prm.p, prm.responses, prm.blks) +} + +type cancelRequestMessage struct { + requestID graphsync.RequestID + isPause bool + onTerminated chan error + terminalError error +} + +func (crm *cancelRequestMessage) handle(rm *RequestManager) { + rm.cancelRequest(crm.requestID, crm.isPause, crm.onTerminated, crm.terminalError) +} + +type terminateRequestMessage struct { + requestID graphsync.RequestID +} + +func (trm *terminateRequestMessage) handle(rm *RequestManager) { + rm.terminateRequest(trm.requestID) +} + +type newRequestMessage struct { + p peer.ID + root ipld.Link + selector ipld.Node + extensions []graphsync.ExtensionData + inProgressRequestChan chan<- inProgressRequest +} + +func (nrm *newRequestMessage) handle(rm *RequestManager) { + var ipr inProgressRequest + + ipr.request, ipr.incoming, ipr.incomingError = rm.setupRequest(nrm.p, nrm.root, nrm.selector, nrm.extensions) + ipr.requestID = ipr.request.ID() + + select { + case nrm.inProgressRequestChan <- ipr: + case <-rm.ctx.Done(): + } +} diff --git a/requestmanager/requestmanager.go b/requestmanager/requestmanager.go deleted file mode 100644 index 8153e98e..00000000 --- a/requestmanager/requestmanager.go +++ /dev/null @@ -1,711 +0,0 @@ -package requestmanager - -import ( - "context" - "errors" - "fmt" - "sync/atomic" - "time" - - "github.com/hannahhoward/go-pubsub" - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - "github.com/ipld/go-ipld-prime" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" - "github.com/ipld/go-ipld-prime/traversal/selector" - "github.com/libp2p/go-libp2p-core/peer" - - "github.com/ipfs/go-graphsync" - "github.com/ipfs/go-graphsync/cidset" - "github.com/ipfs/go-graphsync/dedupkey" - ipldutil "github.com/ipfs/go-graphsync/ipldutil" - "github.com/ipfs/go-graphsync/listeners" - gsmsg "github.com/ipfs/go-graphsync/message" - "github.com/ipfs/go-graphsync/messagequeue" - 
"github.com/ipfs/go-graphsync/metadata" - "github.com/ipfs/go-graphsync/notifications" - "github.com/ipfs/go-graphsync/requestmanager/executor" - "github.com/ipfs/go-graphsync/requestmanager/hooks" - "github.com/ipfs/go-graphsync/requestmanager/types" -) - -var log = logging.Logger("graphsync") - -const ( - // defaultPriority is the default priority for requests sent by graphsync - defaultPriority = graphsync.Priority(0) -) - -type inProgressRequestStatus struct { - ctx context.Context - startTime time.Time - cancelFn func() - p peer.ID - terminalError chan error - resumeMessages chan []graphsync.ExtensionData - pauseMessages chan struct{} - paused bool - lastResponse atomic.Value - onTerminated []chan error -} - -// PeerHandler is an interface that can send requests to peers -type PeerHandler interface { - AllocateAndBuildMessage(p peer.ID, blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee) -} - -// AsyncLoader is an interface for loading links asynchronously, returning -// results as new responses are processed -type AsyncLoader interface { - StartRequest(graphsync.RequestID, string) error - ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata, - blks []blocks.Block) - AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) <-chan types.AsyncLoadResult - CompleteResponsesFor(requestID graphsync.RequestID) - CleanupRequest(requestID graphsync.RequestID) -} - -// RequestManager tracks outgoing requests and processes incoming reponses -// to them. 
-type RequestManager struct { - ctx context.Context - cancel func() - messages chan requestManagerMessage - peerHandler PeerHandler - rc *responseCollector - asyncLoader AsyncLoader - disconnectNotif *pubsub.PubSub - linkSystem ipld.LinkSystem - - // dont touch out side of run loop - nextRequestID graphsync.RequestID - inProgressRequestStatuses map[graphsync.RequestID]*inProgressRequestStatus - requestHooks RequestHooks - responseHooks ResponseHooks - blockHooks BlockHooks - networkErrorListeners *listeners.NetworkErrorListeners -} - -type requestManagerMessage interface { - handle(rm *RequestManager) -} - -// RequestHooks run for new requests -type RequestHooks interface { - ProcessRequestHooks(p peer.ID, request graphsync.RequestData) hooks.RequestResult -} - -// ResponseHooks run for new responses -type ResponseHooks interface { - ProcessResponseHooks(p peer.ID, response graphsync.ResponseData) hooks.UpdateResult -} - -// BlockHooks run for each block loaded -type BlockHooks interface { - ProcessBlockHooks(p peer.ID, response graphsync.ResponseData, block graphsync.BlockData) hooks.UpdateResult -} - -// New generates a new request manager from a context, network, and selectorQuerier -func New(ctx context.Context, - asyncLoader AsyncLoader, - linkSystem ipld.LinkSystem, - requestHooks RequestHooks, - responseHooks ResponseHooks, - blockHooks BlockHooks, - networkErrorListeners *listeners.NetworkErrorListeners, -) *RequestManager { - ctx, cancel := context.WithCancel(ctx) - return &RequestManager{ - ctx: ctx, - cancel: cancel, - asyncLoader: asyncLoader, - disconnectNotif: pubsub.New(disconnectDispatcher), - linkSystem: linkSystem, - rc: newResponseCollector(ctx), - messages: make(chan requestManagerMessage, 16), - inProgressRequestStatuses: make(map[graphsync.RequestID]*inProgressRequestStatus), - requestHooks: requestHooks, - responseHooks: responseHooks, - blockHooks: blockHooks, - networkErrorListeners: networkErrorListeners, - } -} - -// SetDelegate specifies 
who will send messages out to the internet. -func (rm *RequestManager) SetDelegate(peerHandler PeerHandler) { - rm.peerHandler = peerHandler -} - -type inProgressRequest struct { - requestID graphsync.RequestID - request gsmsg.GraphSyncRequest - incoming chan graphsync.ResponseProgress - incomingError chan error -} - -type newRequestMessage struct { - p peer.ID - root ipld.Link - selector ipld.Node - extensions []graphsync.ExtensionData - inProgressRequestChan chan<- inProgressRequest -} - -// SendRequest initiates a new GraphSync request to the given peer. -func (rm *RequestManager) SendRequest(ctx context.Context, - p peer.ID, - root ipld.Link, - selectorNode ipld.Node, - extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) { - if _, err := selector.ParseSelector(selectorNode); err != nil { - return rm.singleErrorResponse(fmt.Errorf("invalid selector spec")) - } - - inProgressRequestChan := make(chan inProgressRequest) - - select { - case rm.messages <- &newRequestMessage{p, root, selectorNode, extensions, inProgressRequestChan}: - case <-rm.ctx.Done(): - return rm.emptyResponse() - case <-ctx.Done(): - return rm.emptyResponse() - } - var receivedInProgressRequest inProgressRequest - select { - case <-rm.ctx.Done(): - return rm.emptyResponse() - case receivedInProgressRequest = <-inProgressRequestChan: - } - - // If the connection to the peer is disconnected, fire an error - unsub := rm.listenForDisconnect(p, func(neterr error) { - rm.networkErrorListeners.NotifyNetworkErrorListeners(p, receivedInProgressRequest.request, neterr) - }) - - return rm.rc.collectResponses(ctx, - receivedInProgressRequest.incoming, - receivedInProgressRequest.incomingError, - func() { - rm.cancelRequest(receivedInProgressRequest.requestID, - receivedInProgressRequest.incoming, - receivedInProgressRequest.incomingError) - }, - // Once the request has completed, stop listening for disconnect events - unsub, - ) -} - -// Dispatch the Disconnect event to 
subscribers -func disconnectDispatcher(p pubsub.Event, subscriberFn pubsub.SubscriberFn) error { - listener := subscriberFn.(func(peer.ID)) - listener(p.(peer.ID)) - return nil -} - -// Listen for the Disconnect event for the given peer -func (rm *RequestManager) listenForDisconnect(p peer.ID, onDisconnect func(neterr error)) func() { - // Subscribe to Disconnect notifications - return rm.disconnectNotif.Subscribe(func(evtPeer peer.ID) { - // If the peer is the one we're interested in, call the listener - if evtPeer == p { - onDisconnect(fmt.Errorf("disconnected from peer %s", p)) - } - }) -} - -// Disconnected is called when a peer disconnects -func (rm *RequestManager) Disconnected(p peer.ID) { - // Notify any listeners that a peer has disconnected - _ = rm.disconnectNotif.Publish(p) -} - -func (rm *RequestManager) emptyResponse() (chan graphsync.ResponseProgress, chan error) { - ch := make(chan graphsync.ResponseProgress) - close(ch) - errCh := make(chan error) - close(errCh) - return ch, errCh -} - -func (rm *RequestManager) singleErrorResponse(err error) (chan graphsync.ResponseProgress, chan error) { - ch := make(chan graphsync.ResponseProgress) - close(ch) - errCh := make(chan error, 1) - errCh <- err - close(errCh) - return ch, errCh -} - -type cancelRequestMessage struct { - requestID graphsync.RequestID - isPause bool - onTerminated chan error - terminalError error -} - -func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID, - incomingResponses chan graphsync.ResponseProgress, - incomingErrors chan error) { - cancelMessageChannel := rm.messages - for cancelMessageChannel != nil || incomingResponses != nil || incomingErrors != nil { - select { - case cancelMessageChannel <- &cancelRequestMessage{requestID, false, nil, nil}: - cancelMessageChannel = nil - // clear out any remaining responses, in case and "incoming reponse" - // messages get processed before our cancel message - case _, ok := <-incomingResponses: - if !ok { - 
incomingResponses = nil - } - case _, ok := <-incomingErrors: - if !ok { - incomingErrors = nil - } - case <-rm.ctx.Done(): - return - } - } -} - -// CancelRequest cancels the given request ID and waits for the request to terminate -func (rm *RequestManager) CancelRequest(ctx context.Context, requestID graphsync.RequestID) error { - terminated := make(chan error, 1) - return rm.sendSyncMessage(&cancelRequestMessage{requestID, false, terminated, graphsync.RequestClientCancelledErr{}}, terminated, ctx.Done()) -} - -type processResponseMessage struct { - p peer.ID - responses []gsmsg.GraphSyncResponse - blks []blocks.Block -} - -// ProcessResponses ingests the given responses from the network and -// and updates the in progress requests based on those responses. -func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse, - blks []blocks.Block) { - select { - case rm.messages <- &processResponseMessage{p, responses, blks}: - case <-rm.ctx.Done(): - } -} - -type unpauseRequestMessage struct { - id graphsync.RequestID - extensions []graphsync.ExtensionData - response chan error -} - -// UnpauseRequest unpauses a request that was paused in a block hook based request ID -// Can also send extensions with unpause -func (rm *RequestManager) UnpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error { - response := make(chan error, 1) - return rm.sendSyncMessage(&unpauseRequestMessage{requestID, extensions, response}, response, nil) -} - -type pauseRequestMessage struct { - id graphsync.RequestID - response chan error -} - -// PauseRequest pauses an in progress request (may take 1 or more blocks to process) -func (rm *RequestManager) PauseRequest(requestID graphsync.RequestID) error { - response := make(chan error, 1) - return rm.sendSyncMessage(&pauseRequestMessage{requestID, response}, response, nil) -} - -func (rm *RequestManager) sendSyncMessage(message requestManagerMessage, response chan error, done <-chan 
struct{}) error { - select { - case <-rm.ctx.Done(): - return errors.New("context cancelled") - case <-done: - return errors.New("context cancelled") - case rm.messages <- message: - } - select { - case <-rm.ctx.Done(): - return errors.New("context cancelled") - case <-done: - return errors.New("context cancelled") - case err := <-response: - return err - } -} - -// Startup starts processing for the WantManager. -func (rm *RequestManager) Startup() { - go rm.run() -} - -// Shutdown ends processing for the want manager. -func (rm *RequestManager) Shutdown() { - rm.cancel() -} - -func (rm *RequestManager) run() { - // NOTE: Do not open any streams or connections from anywhere in this - // event loop. Really, just don't do anything likely to block. - defer rm.cleanupInProcessRequests() - - for { - select { - case message := <-rm.messages: - message.handle(rm) - case <-rm.ctx.Done(): - return - } - } -} - -func (rm *RequestManager) cleanupInProcessRequests() { - for _, requestStatus := range rm.inProgressRequestStatuses { - requestStatus.cancelFn() - } -} - -type terminateRequestMessage struct { - requestID graphsync.RequestID -} - -func (nrm *newRequestMessage) setupRequest(requestID graphsync.RequestID, rm *RequestManager) (gsmsg.GraphSyncRequest, chan graphsync.ResponseProgress, chan error) { - log.Infow("graphsync request initiated", "request id", requestID, "peer", nrm.p, "root", nrm.root) - - request, hooksResult, err := rm.validateRequest(requestID, nrm.p, nrm.root, nrm.selector, nrm.extensions) - if err != nil { - rp, err := rm.singleErrorResponse(err) - return request, rp, err - } - doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs) - var doNotSendCids *cid.Set - if has { - doNotSendCids, err = cidset.DecodeCidSet(doNotSendCidsData) - if err != nil { - rp, err := rm.singleErrorResponse(err) - return request, rp, err - } - } else { - doNotSendCids = cid.NewSet() - } - ctx, cancel := context.WithCancel(rm.ctx) - p := nrm.p - 
resumeMessages := make(chan []graphsync.ExtensionData, 1) - pauseMessages := make(chan struct{}, 1) - terminalError := make(chan error, 1) - requestStatus := &inProgressRequestStatus{ - ctx: ctx, startTime: time.Now(), cancelFn: cancel, p: p, resumeMessages: resumeMessages, pauseMessages: pauseMessages, terminalError: terminalError, - } - lastResponse := &requestStatus.lastResponse - lastResponse.Store(gsmsg.NewResponse(request.ID(), graphsync.RequestAcknowledged)) - rm.inProgressRequestStatuses[request.ID()] = requestStatus - incoming, incomingError := executor.ExecutionEnv{ - Ctx: rm.ctx, - SendRequest: rm.sendRequest, - TerminateRequest: rm.terminateRequest, - RunBlockHooks: rm.processBlockHooks, - Loader: rm.asyncLoader.AsyncLoad, - LinkSystem: rm.linkSystem, - }.Start( - executor.RequestExecution{ - Ctx: ctx, - P: p, - Request: request, - TerminalError: terminalError, - LastResponse: lastResponse, - DoNotSendCids: doNotSendCids, - NodePrototypeChooser: hooksResult.CustomChooser, - ResumeMessages: resumeMessages, - PauseMessages: pauseMessages, - }) - return request, incoming, incomingError -} - -func (nrm *newRequestMessage) handle(rm *RequestManager) { - var ipr inProgressRequest - ipr.requestID = rm.nextRequestID - rm.nextRequestID++ - ipr.request, ipr.incoming, ipr.incomingError = nrm.setupRequest(ipr.requestID, rm) - - select { - case nrm.inProgressRequestChan <- ipr: - case <-rm.ctx.Done(): - } -} - -func (trm *terminateRequestMessage) handle(rm *RequestManager) { - ipr, ok := rm.inProgressRequestStatuses[trm.requestID] - if ok { - log.Infow("graphsync request complete", "request id", trm.requestID, "peer", ipr.p, "total time", time.Since(ipr.startTime)) - } - delete(rm.inProgressRequestStatuses, trm.requestID) - rm.asyncLoader.CleanupRequest(trm.requestID) - if ok { - for _, onTerminated := range ipr.onTerminated { - select { - case <-rm.ctx.Done(): - case onTerminated <- nil: - } - } - } -} - -func (crm *cancelRequestMessage) handle(rm *RequestManager) 
{ - inProgressRequestStatus, ok := rm.inProgressRequestStatuses[crm.requestID] - if !ok { - if crm.onTerminated != nil { - select { - case crm.onTerminated <- graphsync.RequestNotFoundErr{}: - case <-rm.ctx.Done(): - } - } - return - } - - if crm.onTerminated != nil { - inProgressRequestStatus.onTerminated = append(inProgressRequestStatus.onTerminated, crm.onTerminated) - } - if crm.terminalError != nil { - select { - case inProgressRequestStatus.terminalError <- crm.terminalError: - default: - } - } - - rm.sendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID)) - if crm.isPause { - inProgressRequestStatus.paused = true - } else { - inProgressRequestStatus.cancelFn() - } -} - -func (prm *processResponseMessage) handle(rm *RequestManager) { - filteredResponses := rm.processExtensions(prm.responses, prm.p) - filteredResponses = rm.filterResponsesForPeer(filteredResponses, prm.p) - rm.updateLastResponses(filteredResponses) - responseMetadata := metadataForResponses(filteredResponses) - rm.asyncLoader.ProcessResponse(responseMetadata, prm.blks) - rm.processTerminations(filteredResponses) -} - -func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse { - responsesForPeer := make([]gsmsg.GraphSyncResponse, 0, len(responses)) - for _, response := range responses { - requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()] - if !ok || requestStatus.p != p { - continue - } - responsesForPeer = append(responsesForPeer, response) - } - return responsesForPeer -} - -func (rm *RequestManager) processExtensions(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse { - remainingResponses := make([]gsmsg.GraphSyncResponse, 0, len(responses)) - for _, response := range responses { - success := rm.processExtensionsForResponse(p, response) - if success { - remainingResponses = append(remainingResponses, response) - } - } - return remainingResponses -} - -func (rm 
*RequestManager) updateLastResponses(responses []gsmsg.GraphSyncResponse) { - for _, response := range responses { - rm.inProgressRequestStatuses[response.RequestID()].lastResponse.Store(response) - } -} - -func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg.GraphSyncResponse) bool { - result := rm.responseHooks.ProcessResponseHooks(p, response) - if len(result.Extensions) > 0 { - updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...) - rm.sendRequest(p, updateRequest) - } - if result.Err != nil { - requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()] - if !ok { - return false - } - responseError := rm.generateResponseErrorFromStatus(graphsync.RequestFailedUnknown) - select { - case requestStatus.terminalError <- responseError: - default: - } - rm.sendRequest(p, gsmsg.CancelRequest(response.RequestID())) - requestStatus.cancelFn() - return false - } - return true -} - -func (rm *RequestManager) processTerminations(responses []gsmsg.GraphSyncResponse) { - for _, response := range responses { - if gsmsg.IsTerminalResponseCode(response.Status()) { - if gsmsg.IsTerminalFailureCode(response.Status()) { - requestStatus := rm.inProgressRequestStatuses[response.RequestID()] - responseError := rm.generateResponseErrorFromStatus(response.Status()) - select { - case requestStatus.terminalError <- responseError: - default: - } - requestStatus.cancelFn() - } - rm.asyncLoader.CompleteResponsesFor(response.RequestID()) - } - } -} - -func (rm *RequestManager) generateResponseErrorFromStatus(status graphsync.ResponseStatusCode) error { - switch status { - case graphsync.RequestFailedBusy: - return graphsync.RequestFailedBusyErr{} - case graphsync.RequestFailedContentNotFound: - return graphsync.RequestFailedContentNotFoundErr{} - case graphsync.RequestFailedLegal: - return graphsync.RequestFailedLegalErr{} - case graphsync.RequestFailedUnknown: - return graphsync.RequestFailedUnknownErr{} - case 
graphsync.RequestCancelled: - return graphsync.RequestCancelledErr{} - default: - return fmt.Errorf("Unknown") - } -} - -func (rm *RequestManager) processBlockHooks(p peer.ID, response graphsync.ResponseData, block graphsync.BlockData) error { - result := rm.blockHooks.ProcessBlockHooks(p, response, block) - if len(result.Extensions) > 0 { - updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...) - rm.sendRequest(p, updateRequest) - } - if result.Err != nil { - _, isPause := result.Err.(hooks.ErrPaused) - select { - case <-rm.ctx.Done(): - case rm.messages <- &cancelRequestMessage{response.RequestID(), isPause, nil, nil}: - } - } - return result.Err -} - -func (rm *RequestManager) terminateRequest(requestID graphsync.RequestID) { - select { - case <-rm.ctx.Done(): - case rm.messages <- &terminateRequestMessage{requestID}: - } -} - -func (rm *RequestManager) validateRequest(requestID graphsync.RequestID, p peer.ID, root ipld.Link, selectorSpec ipld.Node, extensions []graphsync.ExtensionData) (gsmsg.GraphSyncRequest, hooks.RequestResult, error) { - _, err := ipldutil.EncodeNode(selectorSpec) - if err != nil { - return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err - } - _, err = selector.ParseSelector(selectorSpec) - if err != nil { - return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err - } - asCidLink, ok := root.(cidlink.Link) - if !ok { - return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, fmt.Errorf("request failed: link has no cid") - } - request := gsmsg.NewRequest(requestID, asCidLink.Cid, selectorSpec, defaultPriority, extensions...) 
- hooksResult := rm.requestHooks.ProcessRequestHooks(p, request) - if hooksResult.PersistenceOption != "" { - dedupData, err := dedupkey.EncodeDedupKey(hooksResult.PersistenceOption) - if err != nil { - return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err - } - request = request.ReplaceExtensions([]graphsync.ExtensionData{ - { - Name: graphsync.ExtensionDeDupByKey, - Data: dedupData, - }, - }) - } - err = rm.asyncLoader.StartRequest(requestID, hooksResult.PersistenceOption) - if err != nil { - return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err - } - return request, hooksResult, nil -} - -type reqSubscriber struct { - p peer.ID - request gsmsg.GraphSyncRequest - networkErrorListeners *listeners.NetworkErrorListeners -} - -func (r *reqSubscriber) OnNext(topic notifications.Topic, event notifications.Event) { - mqEvt, isMQEvt := event.(messagequeue.Event) - if !isMQEvt || mqEvt.Name != messagequeue.Error { - return - } - - r.networkErrorListeners.NotifyNetworkErrorListeners(r.p, r.request, mqEvt.Err) - //r.re.networkError <- mqEvt.Err - //r.re.terminateRequest() -} - -func (r reqSubscriber) OnClose(topic notifications.Topic) { -} - -const requestNetworkError = "request_network_error" - -func (rm *RequestManager) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest) { - sub := notifications.NewTopicDataSubscriber(&reqSubscriber{p, request, rm.networkErrorListeners}) - failNotifee := notifications.Notifee{Data: requestNetworkError, Subscriber: sub} - rm.peerHandler.AllocateAndBuildMessage(p, 0, func(builder *gsmsg.Builder) { - builder.AddRequest(request) - }, []notifications.Notifee{failNotifee}) -} - -func (urm *unpauseRequestMessage) unpause(rm *RequestManager) error { - inProgressRequestStatus, ok := rm.inProgressRequestStatuses[urm.id] - if !ok { - return graphsync.RequestNotFoundErr{} - } - if !inProgressRequestStatus.paused { - return errors.New("request is not paused") - } - inProgressRequestStatus.paused = false - select { - case 
<-inProgressRequestStatus.pauseMessages: - rm.sendRequest(inProgressRequestStatus.p, gsmsg.UpdateRequest(urm.id, urm.extensions...)) - return nil - case <-rm.ctx.Done(): - return errors.New("context cancelled") - case inProgressRequestStatus.resumeMessages <- urm.extensions: - return nil - } -} -func (urm *unpauseRequestMessage) handle(rm *RequestManager) { - err := urm.unpause(rm) - select { - case <-rm.ctx.Done(): - case urm.response <- err: - } -} -func (prm *pauseRequestMessage) pause(rm *RequestManager) error { - inProgressRequestStatus, ok := rm.inProgressRequestStatuses[prm.id] - if !ok { - return graphsync.RequestNotFoundErr{} - } - if inProgressRequestStatus.paused { - return errors.New("request is already paused") - } - inProgressRequestStatus.paused = true - select { - case <-rm.ctx.Done(): - return errors.New("context cancelled") - case inProgressRequestStatus.pauseMessages <- struct{}{}: - return nil - } -} -func (prm *pauseRequestMessage) handle(rm *RequestManager) { - err := prm.pause(rm) - select { - case <-rm.ctx.Done(): - case prm.response <- err: - } -} diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index 7d383b6c..caaadad6 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -93,8 +93,8 @@ func TestNormalSimultaneousFetch(t *testing.T) { blockChain2 := testutil.SetupBlockChain(ctx, t, td.persistence, 100, 5) - returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) - returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector()) + returnedResponseChan1, returnedErrorChan1 := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) + returnedResponseChan2, returnedErrorChan2 := td.requestManager.NewRequest(requestCtx, peers[0], blockChain2.TipLink, 
blockChain2.Selector()) requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2) @@ -172,8 +172,8 @@ func TestCancelRequestInProgress(t *testing.T) { defer cancel2() peers := testutil.GeneratePeers(1) - returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx1, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) - returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx2, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) + returnedResponseChan1, returnedErrorChan1 := td.requestManager.NewRequest(requestCtx1, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) + returnedResponseChan2, returnedErrorChan2 := td.requestManager.NewRequest(requestCtx2, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2) @@ -232,7 +232,7 @@ func TestCancelRequestImperativeNoMoreBlocks(t *testing.T) { } }) - _, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) + _, returnedErrorChan1 := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1) @@ -276,7 +276,7 @@ func TestCancelManagerExitsGracefully(t *testing.T) { defer cancel() peers := testutil.GeneratePeers(1) - returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) + returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] @@ -308,7 +308,7 @@ func TestFailedRequest(t *testing.T) { defer cancel() peers := testutil.GeneratePeers(1) - returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, 
peers[0], td.blockChain.TipLink, td.blockChain.Selector()) + returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] failedResponses := []gsmsg.GraphSyncResponse{ @@ -328,7 +328,7 @@ func TestLocallyFulfilledFirstRequestFailsLater(t *testing.T) { defer cancel() peers := testutil.GeneratePeers(1) - returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) + returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] @@ -359,7 +359,7 @@ func TestLocallyFulfilledFirstRequestSucceedsLater(t *testing.T) { td.responseHooks.Register(func(p peer.ID, response graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) { close(called) }) - returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) + returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] @@ -387,7 +387,7 @@ func TestRequestReturnsMissingBlocks(t *testing.T) { defer cancel() peers := testutil.GeneratePeers(1) - returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) + returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] @@ -419,7 +419,7 @@ func TestDisconnectNotification(t *testing.T) { // Send a request to the target peer 
targetPeer := peers[0] - td.requestManager.SendRequest(requestCtx, targetPeer, td.blockChain.TipLink, td.blockChain.Selector()) + td.requestManager.NewRequest(requestCtx, targetPeer, td.blockChain.TipLink, td.blockChain.Selector()) // Disconnect a random peer, should not fire any events randomPeer := peers[1] @@ -465,7 +465,7 @@ func TestEncodingExtensions(t *testing.T) { } } td.responseHooks.Register(hook) - returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2) + returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2) rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] @@ -579,7 +579,7 @@ func TestBlockHooks(t *testing.T) { } } td.blockHooks.Register(hook) - returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2) + returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2) rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] @@ -743,8 +743,8 @@ func TestOutgoingRequestHooks(t *testing.T) { } td.requestHooks.Register(hook) - returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1) - returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) + returnedResponseChan1, returnedErrorChan1 := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1) + returnedResponseChan2, returnedErrorChan2 := 
td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2) @@ -809,7 +809,7 @@ func TestPauseResume(t *testing.T) { td.blockHooks.Register(hook) // Start request - returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) + returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] @@ -895,7 +895,7 @@ func TestPauseResumeExternal(t *testing.T) { td.blockHooks.Register(hook) // Start request - returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) + returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] diff --git a/requestmanager/server.go b/requestmanager/server.go new file mode 100644 index 00000000..afe372a0 --- /dev/null +++ b/requestmanager/server.go @@ -0,0 +1,296 @@ +package requestmanager + +import ( + "context" + "errors" + "fmt" + "time" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-graphsync" + "github.com/ipfs/go-graphsync/cidset" + "github.com/ipfs/go-graphsync/dedupkey" + "github.com/ipfs/go-graphsync/ipldutil" + gsmsg "github.com/ipfs/go-graphsync/message" + "github.com/ipfs/go-graphsync/requestmanager/executor" + "github.com/ipfs/go-graphsync/requestmanager/hooks" + "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/traversal/selector" + "github.com/libp2p/go-libp2p-core/peer" +) + +// The code in this file implements the internal thread 
for the request manager. +// These functions can modify the internal state of the RequestManager + +func (rm *RequestManager) run() { + // NOTE: Do not open any streams or connections from anywhere in this + // event loop. Really, just don't do anything likely to block. + defer rm.cleanupInProcessRequests() + + for { + select { + case message := <-rm.messages: + message.handle(rm) + case <-rm.ctx.Done(): + return + } + } +} + +func (rm *RequestManager) cleanupInProcessRequests() { + for _, requestStatus := range rm.inProgressRequestStatuses { + requestStatus.cancelFn() + } +} + +func (rm *RequestManager) setupRequest(p peer.ID, root ipld.Link, selector ipld.Node, extensions []graphsync.ExtensionData) (gsmsg.GraphSyncRequest, chan graphsync.ResponseProgress, chan error) { + requestID := rm.nextRequestID + rm.nextRequestID++ + + log.Infow("graphsync request initiated", "request id", requestID, "peer", p, "root", root) + + request, hooksResult, err := rm.validateRequest(requestID, p, root, selector, extensions) + if err != nil { + rp, err := rm.singleErrorResponse(err) + return request, rp, err + } + doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs) + var doNotSendCids *cid.Set + if has { + doNotSendCids, err = cidset.DecodeCidSet(doNotSendCidsData) + if err != nil { + rp, err := rm.singleErrorResponse(err) + return request, rp, err + } + } else { + doNotSendCids = cid.NewSet() + } + ctx, cancel := context.WithCancel(rm.ctx) + resumeMessages := make(chan []graphsync.ExtensionData, 1) + pauseMessages := make(chan struct{}, 1) + terminalError := make(chan error, 1) + requestStatus := &inProgressRequestStatus{ + ctx: ctx, startTime: time.Now(), cancelFn: cancel, p: p, resumeMessages: resumeMessages, pauseMessages: pauseMessages, terminalError: terminalError, + } + lastResponse := &requestStatus.lastResponse + lastResponse.Store(gsmsg.NewResponse(request.ID(), graphsync.RequestAcknowledged)) + rm.inProgressRequestStatuses[request.ID()] = 
requestStatus + incoming, incomingError := executor.ExecutionEnv{ + Ctx: rm.ctx, + SendRequest: rm.SendRequest, + TerminateRequest: rm.TerminateRequest, + RunBlockHooks: rm.ProcessBlockHooks, + Loader: rm.asyncLoader.AsyncLoad, + LinkSystem: rm.linkSystem, + }.Start( + executor.RequestExecution{ + Ctx: ctx, + P: p, + Request: request, + TerminalError: terminalError, + LastResponse: lastResponse, + DoNotSendCids: doNotSendCids, + NodePrototypeChooser: hooksResult.CustomChooser, + ResumeMessages: resumeMessages, + PauseMessages: pauseMessages, + }) + return request, incoming, incomingError +} + +func (rm *RequestManager) terminateRequest(requestID graphsync.RequestID) { + ipr, ok := rm.inProgressRequestStatuses[requestID] + if ok { + log.Infow("graphsync request complete", "request id", requestID, "peer", ipr.p, "total time", time.Since(ipr.startTime)) + } + delete(rm.inProgressRequestStatuses, requestID) + rm.asyncLoader.CleanupRequest(requestID) + if ok { + for _, onTerminated := range ipr.onTerminated { + select { + case <-rm.ctx.Done(): + case onTerminated <- nil: + } + } + } +} + +func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID, isPause bool, onTerminated chan<- error, terminalError error) { + inProgressRequestStatus, ok := rm.inProgressRequestStatuses[requestID] + if !ok { + if onTerminated != nil { + select { + case onTerminated <- graphsync.RequestNotFoundErr{}: + case <-rm.ctx.Done(): + } + } + return + } + + if onTerminated != nil { + inProgressRequestStatus.onTerminated = append(inProgressRequestStatus.onTerminated, onTerminated) + } + if terminalError != nil { + select { + case inProgressRequestStatus.terminalError <- terminalError: + default: + } + } + + rm.SendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(requestID)) + if isPause { + inProgressRequestStatus.paused = true + } else { + inProgressRequestStatus.cancelFn() + } +} + +func (rm *RequestManager) processResponseMessage(p peer.ID, responses []gsmsg.GraphSyncResponse, 
blks []blocks.Block) { + filteredResponses := rm.processExtensions(responses, p) + filteredResponses = rm.filterResponsesForPeer(filteredResponses, p) + rm.updateLastResponses(filteredResponses) + responseMetadata := metadataForResponses(filteredResponses) + rm.asyncLoader.ProcessResponse(responseMetadata, blks) + rm.processTerminations(filteredResponses) +} + +func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse { + responsesForPeer := make([]gsmsg.GraphSyncResponse, 0, len(responses)) + for _, response := range responses { + requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()] + if !ok || requestStatus.p != p { + continue + } + responsesForPeer = append(responsesForPeer, response) + } + return responsesForPeer +} + +func (rm *RequestManager) processExtensions(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse { + remainingResponses := make([]gsmsg.GraphSyncResponse, 0, len(responses)) + for _, response := range responses { + success := rm.processExtensionsForResponse(p, response) + if success { + remainingResponses = append(remainingResponses, response) + } + } + return remainingResponses +} + +func (rm *RequestManager) updateLastResponses(responses []gsmsg.GraphSyncResponse) { + for _, response := range responses { + rm.inProgressRequestStatuses[response.RequestID()].lastResponse.Store(response) + } +} + +func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg.GraphSyncResponse) bool { + result := rm.responseHooks.ProcessResponseHooks(p, response) + if len(result.Extensions) > 0 { + updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...) 
+ rm.SendRequest(p, updateRequest) + } + if result.Err != nil { + requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()] + if !ok { + return false + } + responseError := graphsync.RequestFailedUnknown.AsError() + select { + case requestStatus.terminalError <- responseError: + default: + } + rm.SendRequest(p, gsmsg.CancelRequest(response.RequestID())) + requestStatus.cancelFn() + return false + } + return true +} + +func (rm *RequestManager) processTerminations(responses []gsmsg.GraphSyncResponse) { + for _, response := range responses { + if response.Status().IsTerminal() { + if response.Status().IsFailure() { + requestStatus := rm.inProgressRequestStatuses[response.RequestID()] + responseError := response.Status().AsError() + select { + case requestStatus.terminalError <- responseError: + default: + } + requestStatus.cancelFn() + } + rm.asyncLoader.CompleteResponsesFor(response.RequestID()) + } + } +} + +func (rm *RequestManager) validateRequest(requestID graphsync.RequestID, p peer.ID, root ipld.Link, selectorSpec ipld.Node, extensions []graphsync.ExtensionData) (gsmsg.GraphSyncRequest, hooks.RequestResult, error) { + _, err := ipldutil.EncodeNode(selectorSpec) + if err != nil { + return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err + } + _, err = selector.ParseSelector(selectorSpec) + if err != nil { + return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err + } + asCidLink, ok := root.(cidlink.Link) + if !ok { + return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, fmt.Errorf("request failed: link has no cid") + } + request := gsmsg.NewRequest(requestID, asCidLink.Cid, selectorSpec, defaultPriority, extensions...) 
+ hooksResult := rm.requestHooks.ProcessRequestHooks(p, request) + if hooksResult.PersistenceOption != "" { + dedupData, err := dedupkey.EncodeDedupKey(hooksResult.PersistenceOption) + if err != nil { + return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err + } + request = request.ReplaceExtensions([]graphsync.ExtensionData{ + { + Name: graphsync.ExtensionDeDupByKey, + Data: dedupData, + }, + }) + } + err = rm.asyncLoader.StartRequest(requestID, hooksResult.PersistenceOption) + if err != nil { + return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err + } + return request, hooksResult, nil +} + +func (rm *RequestManager) unpause(id graphsync.RequestID, extensions []graphsync.ExtensionData) error { + inProgressRequestStatus, ok := rm.inProgressRequestStatuses[id] + if !ok { + return graphsync.RequestNotFoundErr{} + } + if !inProgressRequestStatus.paused { + return errors.New("request is not paused") + } + inProgressRequestStatus.paused = false + select { + case <-inProgressRequestStatus.pauseMessages: + rm.SendRequest(inProgressRequestStatus.p, gsmsg.UpdateRequest(id, extensions...)) + return nil + case <-rm.ctx.Done(): + return errors.New("context cancelled") + case inProgressRequestStatus.resumeMessages <- extensions: + return nil + } +} + +func (rm *RequestManager) pause(id graphsync.RequestID) error { + inProgressRequestStatus, ok := rm.inProgressRequestStatuses[id] + if !ok { + return graphsync.RequestNotFoundErr{} + } + if inProgressRequestStatus.paused { + return errors.New("request is already paused") + } + inProgressRequestStatus.paused = true + select { + case <-rm.ctx.Done(): + return errors.New("context cancelled") + case inProgressRequestStatus.pauseMessages <- struct{}{}: + return nil + } +} diff --git a/responsecode.go b/responsecode.go index 3e05ec32..c4a1edfc 100644 --- a/responsecode.go +++ b/responsecode.go @@ -77,3 +77,47 @@ var ResponseCodeToName = map[ResponseStatusCode]string{ RequestFailedContentNotFound: 
"RequestFailedContentNotFound", RequestCancelled: "RequestCancelled", } + +// AsError generates an error from the status code for a failing status +func (c ResponseStatusCode) AsError() error { + if c.IsSuccess() { + return nil + } + switch c { + case RequestFailedBusy: + return RequestFailedBusyErr{} + case RequestFailedContentNotFound: + return RequestFailedContentNotFoundErr{} + case RequestFailedLegal: + return RequestFailedLegalErr{} + case RequestFailedUnknown: + return RequestFailedUnknownErr{} + case RequestCancelled: + return RequestCancelledErr{} + default: + return fmt.Errorf("unknown response status code: %d", c) + } +} + +// IsSuccess returns true if the response code indicates the +// request terminated successfully. +func (c ResponseStatusCode) IsSuccess() bool { + return c == RequestCompletedFull || c == RequestCompletedPartial +} + +// IsFailure returns true if the response code indicates the +// request terminated in failure. +func (c ResponseStatusCode) IsFailure() bool { + return c == RequestFailedBusy || + c == RequestFailedContentNotFound || + c == RequestFailedLegal || + c == RequestFailedUnknown || + c == RequestCancelled || + c == RequestRejected +} + +// IsTerminal returns true if the response code signals +// the end of the request +func (c ResponseStatusCode) IsTerminal() bool { + return c.IsSuccess() || c.IsFailure() +} diff --git a/responsemanager/client.go b/responsemanager/client.go new file mode 100644 index 00000000..f93834bf --- /dev/null +++ b/responsemanager/client.go @@ -0,0 +1,281 @@ +package responsemanager + +import ( + "context" + "errors" + "time" + + logging "github.com/ipfs/go-log/v2" + "github.com/ipfs/go-peertaskqueue/peertask" + ipld "github.com/ipld/go-ipld-prime" + "github.com/libp2p/go-libp2p-core/peer" + + "github.com/ipfs/go-graphsync" + "github.com/ipfs/go-graphsync/ipldutil" + gsmsg "github.com/ipfs/go-graphsync/message" + "github.com/ipfs/go-graphsync/notifications" + 
"github.com/ipfs/go-graphsync/responsemanager/hooks" + "github.com/ipfs/go-graphsync/responsemanager/responseassembler" +) + +// The code in this file implements the public interface of the response manager. +// Functions in this file operate outside the internal thread and should +// NOT modify the internal state of the ResponseManager. + +var log = logging.Logger("graphsync") + +const ( + thawSpeed = time.Millisecond * 100 +) + +type inProgressResponseStatus struct { + ctx context.Context + cancelFn func() + request gsmsg.GraphSyncRequest + loader ipld.BlockReadOpener + traverser ipldutil.Traverser + signals ResponseSignals + updates []gsmsg.GraphSyncRequest + isPaused bool + subscriber *notifications.TopicDataSubscriber +} + +type responseKey struct { + p peer.ID + requestID graphsync.RequestID +} + +// ResponseSignals are message channels to communicate between the manager and the query +type ResponseSignals struct { + PauseSignal chan struct{} + UpdateSignal chan struct{} + ErrSignal chan error +} + +// ResponseTaskData returns all information needed to execute a given response +type ResponseTaskData struct { + Empty bool + Subscriber *notifications.TopicDataSubscriber + Ctx context.Context + Request gsmsg.GraphSyncRequest + Loader ipld.BlockReadOpener + Traverser ipldutil.Traverser + Signals ResponseSignals +} + +// QueryQueue is an interface that can receive new selector query tasks +// and prioritize them as needed, and pop them off later +type QueryQueue interface { + PushTasks(to peer.ID, tasks ...peertask.Task) + PopTasks(targetMinWork int) (peer.ID, []*peertask.Task, int) + Remove(topic peertask.Topic, p peer.ID) + TasksDone(to peer.ID, tasks ...*peertask.Task) + ThawRound() +} + +// RequestHooks is an interface for processing request hooks +type RequestHooks interface { + ProcessRequestHooks(p peer.ID, request graphsync.RequestData) hooks.RequestResult +} + +// RequestQueuedHooks is an interface for processing request queued hooks +type 
RequestQueuedHooks interface { + ProcessRequestQueuedHooks(p peer.ID, request graphsync.RequestData) +} + +// BlockHooks is an interface for processing block hooks +type BlockHooks interface { + ProcessBlockHooks(p peer.ID, request graphsync.RequestData, blockData graphsync.BlockData) hooks.BlockResult +} + +// UpdateHooks is an interface for processing update hooks +type UpdateHooks interface { + ProcessUpdateHooks(p peer.ID, request graphsync.RequestData, update graphsync.RequestData) hooks.UpdateResult +} + +// CompletedListeners is an interface for notifying listeners that responses are complete +type CompletedListeners interface { + NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) +} + +// CancelledListeners is an interface for notifying listeners that requestor cancelled +type CancelledListeners interface { + NotifyCancelledListeners(p peer.ID, request graphsync.RequestData) +} + +// BlockSentListeners is an interface for notifying listeners that of a block send occuring over the wire +type BlockSentListeners interface { + NotifyBlockSentListeners(p peer.ID, request graphsync.RequestData, block graphsync.BlockData) +} + +// NetworkErrorListeners is an interface for notifying listeners that an error occurred sending a data on the wire +type NetworkErrorListeners interface { + NotifyNetworkErrorListeners(p peer.ID, request graphsync.RequestData, err error) +} + +// ResponseAssembler is an interface that returns sender interfaces for peer responses. 
+type ResponseAssembler interface { + DedupKey(p peer.ID, requestID graphsync.RequestID, key string) + IgnoreBlocks(p peer.ID, requestID graphsync.RequestID, links []ipld.Link) + Transaction(p peer.ID, requestID graphsync.RequestID, transaction responseassembler.Transaction) error +} + +type responseManagerMessage interface { + handle(rm *ResponseManager) +} + +// ResponseManager handles incoming requests from the network, initiates selector +// traversals, and transmits responses +type ResponseManager struct { + ctx context.Context + cancelFn context.CancelFunc + responseAssembler ResponseAssembler + queryQueue QueryQueue + requestHooks RequestHooks + linkSystem ipld.LinkSystem + requestQueuedHooks RequestQueuedHooks + updateHooks UpdateHooks + cancelledListeners CancelledListeners + completedListeners CompletedListeners + blockSentListeners BlockSentListeners + networkErrorListeners NetworkErrorListeners + messages chan responseManagerMessage + workSignal chan struct{} + qe *queryExecutor + inProgressResponses map[responseKey]*inProgressResponseStatus + maxInProcessRequests uint64 +} + +// New creates a new response manager for responding to requests +func New(ctx context.Context, + linkSystem ipld.LinkSystem, + responseAssembler ResponseAssembler, + queryQueue QueryQueue, + requestQueuedHooks RequestQueuedHooks, + requestHooks RequestHooks, + blockHooks BlockHooks, + updateHooks UpdateHooks, + completedListeners CompletedListeners, + cancelledListeners CancelledListeners, + blockSentListeners BlockSentListeners, + networkErrorListeners NetworkErrorListeners, + maxInProcessRequests uint64, +) *ResponseManager { + ctx, cancelFn := context.WithCancel(ctx) + messages := make(chan responseManagerMessage, 16) + workSignal := make(chan struct{}, 1) + rm := &ResponseManager{ + ctx: ctx, + cancelFn: cancelFn, + requestHooks: requestHooks, + linkSystem: linkSystem, + responseAssembler: responseAssembler, + queryQueue: queryQueue, + requestQueuedHooks: requestQueuedHooks, 
+ updateHooks: updateHooks, + cancelledListeners: cancelledListeners, + completedListeners: completedListeners, + blockSentListeners: blockSentListeners, + networkErrorListeners: networkErrorListeners, + messages: messages, + workSignal: workSignal, + inProgressResponses: make(map[responseKey]*inProgressResponseStatus), + maxInProcessRequests: maxInProcessRequests, + } + rm.qe = &queryExecutor{ + blockHooks: blockHooks, + updateHooks: updateHooks, + cancelledListeners: cancelledListeners, + responseAssembler: responseAssembler, + queryQueue: queryQueue, + manager: rm, + ctx: ctx, + workSignal: workSignal, + ticker: time.NewTicker(thawSpeed), + } + return rm +} + +// ProcessRequests processes incoming requests for the given peer +func (rm *ResponseManager) ProcessRequests(ctx context.Context, p peer.ID, requests []gsmsg.GraphSyncRequest) { + rm.send(&processRequestMessage{p, requests}, ctx.Done()) +} + +// UnpauseResponse unpauses a response that was previously paused +func (rm *ResponseManager) UnpauseResponse(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error { + response := make(chan error, 1) + rm.send(&unpauseRequestMessage{p, requestID, response, extensions}, nil) + select { + case <-rm.ctx.Done(): + return errors.New("context cancelled") + case err := <-response: + return err + } +} + +// PauseResponse pauses an in progress response (may take 1 or more blocks to process) +func (rm *ResponseManager) PauseResponse(p peer.ID, requestID graphsync.RequestID) error { + response := make(chan error, 1) + rm.send(&pauseRequestMessage{p, requestID, response}, nil) + select { + case <-rm.ctx.Done(): + return errors.New("context cancelled") + case err := <-response: + return err + } +} + +// CancelResponse cancels an in progress response +func (rm *ResponseManager) CancelResponse(p peer.ID, requestID graphsync.RequestID) error { + response := make(chan error, 1) + rm.send(&errorRequestMessage{p, requestID, errCancelledByCommand, 
 response}, nil) + select { + case <-rm.ctx.Done(): + return errors.New("context cancelled") + case err := <-response: + return err + } +} + +// this is a test utility method to force all messages to get processed +func (rm *ResponseManager) synchronize() { + sync := make(chan error) + rm.send(&synchronizeMessage{sync}, nil) + select { + case <-rm.ctx.Done(): + case <-sync: + } +} + +// StartTask starts the given task from the peer task queue +func (rm *ResponseManager) StartTask(task *peertask.Task, responseTaskDataChan chan<- ResponseTaskData) { + rm.send(&startTaskRequest{task, responseTaskDataChan}, nil) +} + +// GetUpdates is called to read pending updates for a task and clear them +func (rm *ResponseManager) GetUpdates(p peer.ID, requestID graphsync.RequestID, updatesChan chan<- []gsmsg.GraphSyncRequest) { + rm.send(&responseUpdateRequest{responseKey{p, requestID}, updatesChan}, nil) +} + +// FinishTask marks a task from the task queue as done +func (rm *ResponseManager) FinishTask(task *peertask.Task, err error) { + rm.send(&finishTaskRequest{task, err}, nil) +} + +func (rm *ResponseManager) send(message responseManagerMessage, done <-chan struct{}) { + select { + case <-rm.ctx.Done(): + case <-done: + case rm.messages <- message: + } +} + +// Startup starts processing for the ResponseManager. +func (rm *ResponseManager) Startup() { + go rm.run() +} + +// Shutdown ends processing for the response manager. 
+func (rm *ResponseManager) Shutdown() { + rm.cancelFn() +} diff --git a/responsemanager/messages.go b/responsemanager/messages.go new file mode 100644 index 00000000..6cf27e3b --- /dev/null +++ b/responsemanager/messages.go @@ -0,0 +1,109 @@ +package responsemanager + +import ( + "github.com/ipfs/go-graphsync" + gsmsg "github.com/ipfs/go-graphsync/message" + "github.com/ipfs/go-peertaskqueue/peertask" + "github.com/libp2p/go-libp2p-core/peer" +) + +type processRequestMessage struct { + p peer.ID + requests []gsmsg.GraphSyncRequest +} + +type pauseRequestMessage struct { + p peer.ID + requestID graphsync.RequestID + response chan error +} + +func (prm *pauseRequestMessage) handle(rm *ResponseManager) { + err := rm.pauseRequest(prm.p, prm.requestID) + select { + case <-rm.ctx.Done(): + case prm.response <- err: + } +} + +type errorRequestMessage struct { + p peer.ID + requestID graphsync.RequestID + err error + response chan error +} + +func (erm *errorRequestMessage) handle(rm *ResponseManager) { + err := rm.abortRequest(erm.p, erm.requestID, erm.err) + select { + case <-rm.ctx.Done(): + case erm.response <- err: + } +} + +type synchronizeMessage struct { + sync chan error +} + +func (sm *synchronizeMessage) handle(rm *ResponseManager) { + select { + case <-rm.ctx.Done(): + case sm.sync <- nil: + } +} + +type unpauseRequestMessage struct { + p peer.ID + requestID graphsync.RequestID + response chan error + extensions []graphsync.ExtensionData +} + +func (urm *unpauseRequestMessage) handle(rm *ResponseManager) { + err := rm.unpauseRequest(urm.p, urm.requestID, urm.extensions...) 
+ select { + case <-rm.ctx.Done(): + case urm.response <- err: + } +} + +type responseUpdateRequest struct { + key responseKey + updateChan chan<- []gsmsg.GraphSyncRequest +} + +func (rur *responseUpdateRequest) handle(rm *ResponseManager) { + updates := rm.getUpdates(rur.key) + select { + case <-rm.ctx.Done(): + case rur.updateChan <- updates: + } +} + +type finishTaskRequest struct { + task *peertask.Task + err error +} + +func (ftr *finishTaskRequest) handle(rm *ResponseManager) { + rm.finishTask(ftr.task, ftr.err) +} + +type startTaskRequest struct { + task *peertask.Task + taskDataChan chan<- ResponseTaskData +} + +func (str *startTaskRequest) handle(rm *ResponseManager) { + + taskData := rm.startTask(str.task) + + select { + case <-rm.ctx.Done(): + case str.taskDataChan <- taskData: + } +} + +func (prm *processRequestMessage) handle(rm *ResponseManager) { + rm.processRequests(prm.p, prm.requests) +} diff --git a/responsemanager/queryexecutor.go b/responsemanager/queryexecutor.go index 44d1a718..14c2d9ea 100644 --- a/responsemanager/queryexecutor.go +++ b/responsemanager/queryexecutor.go @@ -16,18 +16,26 @@ import ( "github.com/ipfs/go-graphsync/responsemanager/hooks" "github.com/ipfs/go-graphsync/responsemanager/responseassembler" "github.com/ipfs/go-graphsync/responsemanager/runtraversal" + "github.com/ipfs/go-peertaskqueue/peertask" ) var errCancelledByCommand = errors.New("response cancelled by responder") +// Manager provides an interface to the response manager +type Manager interface { + StartTask(task *peertask.Task, responseTaskDataChan chan<- ResponseTaskData) + GetUpdates(p peer.ID, requestID graphsync.RequestID, updatesChan chan<- []gsmsg.GraphSyncRequest) + FinishTask(task *peertask.Task, err error) +} + // TODO: Move this into a seperate module and fully seperate from the ResponseManager type queryExecutor struct { + manager Manager blockHooks BlockHooks updateHooks UpdateHooks cancelledListeners CancelledListeners responseAssembler 
ResponseAssembler queryQueue QueryQueue - messages chan responseManagerMessage ctx context.Context workSignal chan struct{} ticker *time.Ticker @@ -35,8 +43,8 @@ type queryExecutor struct { func (qe *queryExecutor) processQueriesWorker() { const targetWork = 1 - taskDataChan := make(chan responseTaskData) - var taskData responseTaskData + taskDataChan := make(chan ResponseTaskData) + var taskData ResponseTaskData for { pid, tasks, _ := qe.queryQueue.PopTasks(targetWork) for len(tasks) == 0 { @@ -51,31 +59,24 @@ func (qe *queryExecutor) processQueriesWorker() { } } for _, task := range tasks { - select { - case qe.messages <- &startTaskRequest{task, taskDataChan}: - case <-qe.ctx.Done(): - return - } + qe.manager.StartTask(task, taskDataChan) select { case taskData = <-taskDataChan: case <-qe.ctx.Done(): return } - if taskData.empty { + if taskData.Empty { log.Info("Empty task on peer request stack") continue } - log.Debugw("beginning response execution", "id", taskData.request.ID(), "peer", pid.String(), "root_cid", taskData.request.Root().String()) - status, err := qe.executeQuery(pid, taskData.request, taskData.loader, taskData.traverser, taskData.signals, taskData.subscriber) + log.Debugw("beginning response execution", "id", taskData.Request.ID(), "peer", pid.String(), "root_cid", taskData.Request.Root().String()) + _, err := qe.executeQuery(pid, taskData.Request, taskData.Loader, taskData.Traverser, taskData.Signals, taskData.Subscriber) isCancelled := err != nil && isContextErr(err) if isCancelled { - qe.cancelledListeners.NotifyCancelledListeners(pid, taskData.request) + qe.cancelledListeners.NotifyCancelledListeners(pid, taskData.Request) } - select { - case qe.messages <- &finishTaskRequest{task, status, err}: - case <-qe.ctx.Done(): - } - log.Debugw("finishing response execution", "id", taskData.request.ID(), "peer", pid.String(), "root_cid", taskData.request.Root().String()) + qe.manager.FinishTask(task, err) + log.Debugw("finishing response execution", 
"id", taskData.Request.ID(), "peer", pid.String(), "root_cid", taskData.Request.Root().String()) } } } @@ -85,7 +86,7 @@ func (qe *queryExecutor) executeQuery( request gsmsg.GraphSyncRequest, loader ipld.BlockReadOpener, traverser ipldutil.Traverser, - signals signals, + signals ResponseSignals, sub *notifications.TopicDataSubscriber) (graphsync.ResponseStatusCode, error) { updateChan := make(chan []gsmsg.GraphSyncRequest) err := runtraversal.RunTraversal(loader, traverser, func(link ipld.Link, data []byte) error { @@ -151,21 +152,18 @@ func (qe *queryExecutor) executeQuery( func (qe *queryExecutor) checkForUpdates( p peer.ID, request gsmsg.GraphSyncRequest, - signals signals, + signals ResponseSignals, updateChan chan []gsmsg.GraphSyncRequest, rb responseassembler.ResponseBuilder) error { for { select { - case <-signals.pauseSignal: + case <-signals.PauseSignal: rb.PauseRequest() return hooks.ErrPaused{} - case err := <-signals.errSignal: + case err := <-signals.ErrSignal: return err - case <-signals.updateSignal: - select { - case qe.messages <- &responseUpdateRequest{responseKey{p, request.ID()}, updateChan}: - case <-qe.ctx.Done(): - } + case <-signals.UpdateSignal: + qe.manager.GetUpdates(p, request.ID(), updateChan) select { case updates := <-updateChan: for _, update := range updates { diff --git a/responsemanager/querypreparer.go b/responsemanager/querypreparer.go index 067786e9..811e285f 100644 --- a/responsemanager/querypreparer.go +++ b/responsemanager/querypreparer.go @@ -26,7 +26,7 @@ type queryPreparer struct { func (qe *queryPreparer) prepareQuery(ctx context.Context, p peer.ID, - request gsmsg.GraphSyncRequest, signals signals, sub *notifications.TopicDataSubscriber) (ipld.BlockReadOpener, ipldutil.Traverser, bool, error) { + request gsmsg.GraphSyncRequest, signals ResponseSignals, sub *notifications.TopicDataSubscriber) (ipld.BlockReadOpener, ipldutil.Traverser, bool, error) { result := qe.requestHooks.ProcessRequestHooks(p, request) var 
transactionError error var isPaused bool diff --git a/responsemanager/responsemanager.go b/responsemanager/responsemanager.go deleted file mode 100644 index 41faab18..00000000 --- a/responsemanager/responsemanager.go +++ /dev/null @@ -1,578 +0,0 @@ -package responsemanager - -import ( - "context" - "errors" - "math" - "time" - - logging "github.com/ipfs/go-log/v2" - "github.com/ipfs/go-peertaskqueue/peertask" - ipld "github.com/ipld/go-ipld-prime" - "github.com/libp2p/go-libp2p-core/peer" - - "github.com/ipfs/go-graphsync" - "github.com/ipfs/go-graphsync/ipldutil" - gsmsg "github.com/ipfs/go-graphsync/message" - "github.com/ipfs/go-graphsync/notifications" - "github.com/ipfs/go-graphsync/responsemanager/hooks" - "github.com/ipfs/go-graphsync/responsemanager/responseassembler" -) - -var log = logging.Logger("graphsync") - -const ( - thawSpeed = time.Millisecond * 100 -) - -type inProgressResponseStatus struct { - ctx context.Context - cancelFn func() - request gsmsg.GraphSyncRequest - loader ipld.BlockReadOpener - traverser ipldutil.Traverser - signals signals - updates []gsmsg.GraphSyncRequest - isPaused bool - subscriber *notifications.TopicDataSubscriber -} - -type responseKey struct { - p peer.ID - requestID graphsync.RequestID -} - -type signals struct { - pauseSignal chan struct{} - updateSignal chan struct{} - errSignal chan error -} - -type responseTaskData struct { - empty bool - subscriber *notifications.TopicDataSubscriber - ctx context.Context - request gsmsg.GraphSyncRequest - loader ipld.BlockReadOpener - traverser ipldutil.Traverser - signals signals -} - -// QueryQueue is an interface that can receive new selector query tasks -// and prioritize them as needed, and pop them off later -type QueryQueue interface { - PushTasks(to peer.ID, tasks ...peertask.Task) - PopTasks(targetMinWork int) (peer.ID, []*peertask.Task, int) - Remove(topic peertask.Topic, p peer.ID) - TasksDone(to peer.ID, tasks ...*peertask.Task) - ThawRound() -} - -// RequestHooks is an 
interface for processing request hooks -type RequestHooks interface { - ProcessRequestHooks(p peer.ID, request graphsync.RequestData) hooks.RequestResult -} - -// RequestQueuedHooks is an interface for processing request queued hooks -type RequestQueuedHooks interface { - ProcessRequestQueuedHooks(p peer.ID, request graphsync.RequestData) -} - -// BlockHooks is an interface for processing block hooks -type BlockHooks interface { - ProcessBlockHooks(p peer.ID, request graphsync.RequestData, blockData graphsync.BlockData) hooks.BlockResult -} - -// UpdateHooks is an interface for processing update hooks -type UpdateHooks interface { - ProcessUpdateHooks(p peer.ID, request graphsync.RequestData, update graphsync.RequestData) hooks.UpdateResult -} - -// CompletedListeners is an interface for notifying listeners that responses are complete -type CompletedListeners interface { - NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) -} - -// CancelledListeners is an interface for notifying listeners that requestor cancelled -type CancelledListeners interface { - NotifyCancelledListeners(p peer.ID, request graphsync.RequestData) -} - -// BlockSentListeners is an interface for notifying listeners that of a block send occuring over the wire -type BlockSentListeners interface { - NotifyBlockSentListeners(p peer.ID, request graphsync.RequestData, block graphsync.BlockData) -} - -// NetworkErrorListeners is an interface for notifying listeners that an error occurred sending a data on the wire -type NetworkErrorListeners interface { - NotifyNetworkErrorListeners(p peer.ID, request graphsync.RequestData, err error) -} - -// ResponseAssembler is an interface that returns sender interfaces for peer responses. 
-type ResponseAssembler interface { - DedupKey(p peer.ID, requestID graphsync.RequestID, key string) - IgnoreBlocks(p peer.ID, requestID graphsync.RequestID, links []ipld.Link) - Transaction(p peer.ID, requestID graphsync.RequestID, transaction responseassembler.Transaction) error -} - -type responseManagerMessage interface { - handle(rm *ResponseManager) -} - -// ResponseManager handles incoming requests from the network, initiates selector -// traversals, and transmits responses -type ResponseManager struct { - ctx context.Context - cancelFn context.CancelFunc - responseAssembler ResponseAssembler - queryQueue QueryQueue - requestHooks RequestHooks - linkSystem ipld.LinkSystem - requestQueuedHooks RequestQueuedHooks - updateHooks UpdateHooks - cancelledListeners CancelledListeners - completedListeners CompletedListeners - blockSentListeners BlockSentListeners - networkErrorListeners NetworkErrorListeners - messages chan responseManagerMessage - workSignal chan struct{} - qe *queryExecutor - inProgressResponses map[responseKey]*inProgressResponseStatus - maxInProcessRequests uint64 -} - -// New creates a new response manager for responding to requests -func New(ctx context.Context, - linkSystem ipld.LinkSystem, - responseAssembler ResponseAssembler, - queryQueue QueryQueue, - requestQueuedHooks RequestQueuedHooks, - requestHooks RequestHooks, - blockHooks BlockHooks, - updateHooks UpdateHooks, - completedListeners CompletedListeners, - cancelledListeners CancelledListeners, - blockSentListeners BlockSentListeners, - networkErrorListeners NetworkErrorListeners, - maxInProcessRequests uint64, -) *ResponseManager { - ctx, cancelFn := context.WithCancel(ctx) - messages := make(chan responseManagerMessage, 16) - workSignal := make(chan struct{}, 1) - qe := &queryExecutor{ - blockHooks: blockHooks, - updateHooks: updateHooks, - cancelledListeners: cancelledListeners, - responseAssembler: responseAssembler, - queryQueue: queryQueue, - messages: messages, - ctx: ctx, - 
workSignal: workSignal, - ticker: time.NewTicker(thawSpeed), - } - return &ResponseManager{ - ctx: ctx, - cancelFn: cancelFn, - requestHooks: requestHooks, - linkSystem: linkSystem, - responseAssembler: responseAssembler, - queryQueue: queryQueue, - requestQueuedHooks: requestQueuedHooks, - updateHooks: updateHooks, - cancelledListeners: cancelledListeners, - completedListeners: completedListeners, - blockSentListeners: blockSentListeners, - networkErrorListeners: networkErrorListeners, - messages: messages, - workSignal: workSignal, - qe: qe, - inProgressResponses: make(map[responseKey]*inProgressResponseStatus), - maxInProcessRequests: maxInProcessRequests, - } -} - -type processRequestMessage struct { - p peer.ID - requests []gsmsg.GraphSyncRequest -} - -// ProcessRequests processes incoming requests for the given peer -func (rm *ResponseManager) ProcessRequests(ctx context.Context, p peer.ID, requests []gsmsg.GraphSyncRequest) { - select { - case rm.messages <- &processRequestMessage{p, requests}: - case <-rm.ctx.Done(): - case <-ctx.Done(): - } -} - -type unpauseRequestMessage struct { - p peer.ID - requestID graphsync.RequestID - response chan error - extensions []graphsync.ExtensionData -} - -// UnpauseResponse unpauses a response that was previously paused -func (rm *ResponseManager) UnpauseResponse(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error { - response := make(chan error, 1) - return rm.sendSyncMessage(&unpauseRequestMessage{p, requestID, response, extensions}, response) -} - -type pauseRequestMessage struct { - p peer.ID - requestID graphsync.RequestID - response chan error -} - -// PauseResponse pauses an in progress response (may take 1 or more blocks to process) -func (rm *ResponseManager) PauseResponse(p peer.ID, requestID graphsync.RequestID) error { - response := make(chan error, 1) - return rm.sendSyncMessage(&pauseRequestMessage{p, requestID, response}, response) -} - -type errorRequestMessage struct { 
- p peer.ID - requestID graphsync.RequestID - err error - response chan error -} - -// CancelResponse cancels an in progress response -func (rm *ResponseManager) CancelResponse(p peer.ID, requestID graphsync.RequestID) error { - response := make(chan error, 1) - return rm.sendSyncMessage(&errorRequestMessage{p, requestID, errCancelledByCommand, response}, response) -} - -func (rm *ResponseManager) sendSyncMessage(message responseManagerMessage, response chan error) error { - select { - case <-rm.ctx.Done(): - return errors.New("context cancelled") - case rm.messages <- message: - } - select { - case <-rm.ctx.Done(): - return errors.New("context cancelled") - case err := <-response: - return err - } -} - -type synchronizeMessage struct { - sync chan error -} - -// this is a test utility method to force all messages to get processed -func (rm *ResponseManager) synchronize() { - sync := make(chan error) - _ = rm.sendSyncMessage(&synchronizeMessage{sync}, sync) -} - -type startTaskRequest struct { - task *peertask.Task - taskDataChan chan responseTaskData -} - -type finishTaskRequest struct { - task *peertask.Task - status graphsync.ResponseStatusCode - err error -} - -type responseUpdateRequest struct { - key responseKey - updateChan chan []gsmsg.GraphSyncRequest -} - -// Startup starts processing for the WantManager. -func (rm *ResponseManager) Startup() { - go rm.run() -} - -// Shutdown ends processing for the want manager. 
-func (rm *ResponseManager) Shutdown() { - rm.cancelFn() -} - -func (rm *ResponseManager) cleanupInProcessResponses() { - for _, response := range rm.inProgressResponses { - response.cancelFn() - } -} - -func (rm *ResponseManager) run() { - defer rm.cleanupInProcessResponses() - for i := uint64(0); i < rm.maxInProcessRequests; i++ { - go rm.qe.processQueriesWorker() - } - - for { - select { - case <-rm.ctx.Done(): - return - case message := <-rm.messages: - message.handle(rm) - } - } -} - -func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSyncRequest) { - response, ok := rm.inProgressResponses[key] - if !ok { - log.Warnf("received update for non existent request, peer %s, request ID %d", key.p.Pretty(), key.requestID) - return - } - if !response.isPaused { - response.updates = append(response.updates, update) - select { - case response.signals.updateSignal <- struct{}{}: - default: - } - return - } - result := rm.updateHooks.ProcessUpdateHooks(key.p, response.request, update) - err := rm.responseAssembler.Transaction(key.p, key.requestID, func(rb responseassembler.ResponseBuilder) error { - for _, extension := range result.Extensions { - rb.SendExtensionData(extension) - } - if result.Err != nil { - rb.FinishWithError(graphsync.RequestFailedUnknown) - rb.AddNotifee(notifications.Notifee{Data: graphsync.RequestFailedUnknown, Subscriber: response.subscriber}) - } - return nil - }) - if err != nil { - log.Errorf("Error processing update: %s", err) - } - if result.Err != nil { - delete(rm.inProgressResponses, key) - response.cancelFn() - return - } - if result.Unpause { - err := rm.unpauseRequest(key.p, key.requestID) - if err != nil { - log.Warnf("error unpausing request: %s", err.Error()) - } - } - -} - -func (rm *ResponseManager) unpauseRequest(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error { - key := responseKey{p, requestID} - inProgressResponse, ok := rm.inProgressResponses[key] - if !ok { - return 
errors.New("could not find request") - } - if !inProgressResponse.isPaused { - return errors.New("request is not paused") - } - inProgressResponse.isPaused = false - if len(extensions) > 0 { - _ = rm.responseAssembler.Transaction(p, requestID, func(rb responseassembler.ResponseBuilder) error { - for _, extension := range extensions { - rb.SendExtensionData(extension) - } - return nil - }) - } - rm.queryQueue.PushTasks(p, peertask.Task{Topic: key, Priority: math.MaxInt32, Work: 1}) - select { - case rm.workSignal <- struct{}{}: - default: - } - return nil -} - -func (rm *ResponseManager) abortRequest(p peer.ID, requestID graphsync.RequestID, err error) error { - key := responseKey{p, requestID} - rm.queryQueue.Remove(key, key.p) - response, ok := rm.inProgressResponses[key] - if !ok { - return errors.New("could not find request") - } - - if response.isPaused { - _ = rm.responseAssembler.Transaction(p, requestID, func(rb responseassembler.ResponseBuilder) error { - if isContextErr(err) { - - rm.cancelledListeners.NotifyCancelledListeners(p, response.request) - rb.ClearRequest() - } else if err == errNetworkError { - rb.ClearRequest() - } else { - rb.FinishWithError(graphsync.RequestCancelled) - rb.AddNotifee(notifications.Notifee{Data: graphsync.RequestCancelled, Subscriber: response.subscriber}) - } - return nil - }) - delete(rm.inProgressResponses, key) - response.cancelFn() - return nil - } - select { - case response.signals.errSignal <- err: - default: - } - return nil -} - -func (prm *processRequestMessage) handle(rm *ResponseManager) { - for _, request := range prm.requests { - key := responseKey{p: prm.p, requestID: request.ID()} - if request.IsCancel() { - _ = rm.abortRequest(prm.p, request.ID(), ipldutil.ContextCancelError{}) - continue - } - if request.IsUpdate() { - rm.processUpdate(key, request) - continue - } - rm.requestQueuedHooks.ProcessRequestQueuedHooks(prm.p, request) - ctx, cancelFn := context.WithCancel(rm.ctx) - sub := 
notifications.NewTopicDataSubscriber(&subscriber{ - p: key.p, - request: request, - ctx: rm.ctx, - messages: rm.messages, - blockSentListeners: rm.blockSentListeners, - completedListeners: rm.completedListeners, - networkErrorListeners: rm.networkErrorListeners, - }) - - rm.inProgressResponses[key] = - &inProgressResponseStatus{ - ctx: ctx, - cancelFn: cancelFn, - subscriber: sub, - request: request, - signals: signals{ - pauseSignal: make(chan struct{}, 1), - updateSignal: make(chan struct{}, 1), - errSignal: make(chan error, 1), - }, - } - // TODO: Use a better work estimation metric. - - rm.queryQueue.PushTasks(prm.p, peertask.Task{Topic: key, Priority: int(request.Priority()), Work: 1}) - - select { - case rm.workSignal <- struct{}{}: - default: - } - } -} - -func (str *startTaskRequest) handle(rm *ResponseManager) { - key := str.task.Topic.(responseKey) - taskData := str.responseTaskData(rm, key) - if taskData.empty { - rm.queryQueue.TasksDone(key.p, str.task) - } - select { - case <-rm.ctx.Done(): - case str.taskDataChan <- taskData: - } -} - -func (str *startTaskRequest) responseTaskData(rm *ResponseManager, key responseKey) responseTaskData { - response, hasResponse := rm.inProgressResponses[key] - if !hasResponse { - return responseTaskData{empty: true} - } - - if response.loader == nil || response.traverser == nil { - loader, traverser, isPaused, err := (&queryPreparer{rm.requestHooks, rm.responseAssembler, rm.linkSystem}).prepareQuery(response.ctx, key.p, response.request, response.signals, response.subscriber) - if err != nil { - response.cancelFn() - delete(rm.inProgressResponses, key) - return responseTaskData{empty: true} - } - response.loader = loader - response.traverser = traverser - if isPaused { - response.isPaused = true - return responseTaskData{empty: true} - } - } - return responseTaskData{false, response.subscriber, response.ctx, response.request, response.loader, response.traverser, response.signals} -} - -func (ftr *finishTaskRequest) 
handle(rm *ResponseManager) { - key := ftr.task.Topic.(responseKey) - rm.queryQueue.TasksDone(key.p, ftr.task) - response, ok := rm.inProgressResponses[key] - if !ok { - return - } - if _, ok := ftr.err.(hooks.ErrPaused); ok { - response.isPaused = true - return - } - if ftr.err != nil { - log.Infof("response failed: %w", ftr.err) - } - delete(rm.inProgressResponses, key) - response.cancelFn() -} - -func (rur *responseUpdateRequest) handle(rm *ResponseManager) { - response, ok := rm.inProgressResponses[rur.key] - var updates []gsmsg.GraphSyncRequest - if ok { - updates = response.updates - response.updates = nil - } else { - updates = nil - } - select { - case <-rm.ctx.Done(): - case rur.updateChan <- updates: - } -} - -func (sm *synchronizeMessage) handle(rm *ResponseManager) { - select { - case <-rm.ctx.Done(): - case sm.sync <- nil: - } -} - -func (urm *unpauseRequestMessage) handle(rm *ResponseManager) { - err := rm.unpauseRequest(urm.p, urm.requestID, urm.extensions...) - select { - case <-rm.ctx.Done(): - case urm.response <- err: - } -} - -func (prm *pauseRequestMessage) pauseRequest(rm *ResponseManager) error { - key := responseKey{prm.p, prm.requestID} - inProgressResponse, ok := rm.inProgressResponses[key] - if !ok { - return errors.New("could not find request") - } - if inProgressResponse.isPaused { - return errors.New("request is already paused") - } - select { - case inProgressResponse.signals.pauseSignal <- struct{}{}: - default: - } - return nil -} - -func (prm *pauseRequestMessage) handle(rm *ResponseManager) { - err := prm.pauseRequest(rm) - select { - case <-rm.ctx.Done(): - case prm.response <- err: - } -} - -func (crm *errorRequestMessage) handle(rm *ResponseManager) { - err := rm.abortRequest(crm.p, crm.requestID, crm.err) - select { - case <-rm.ctx.Done(): - case crm.response <- err: - } -} diff --git a/responsemanager/server.go b/responsemanager/server.go new file mode 100644 index 00000000..c06f3f3e --- /dev/null +++ 
b/responsemanager/server.go @@ -0,0 +1,263 @@ +package responsemanager + +import ( + "context" + "errors" + "math" + + "github.com/ipfs/go-graphsync" + "github.com/ipfs/go-graphsync/ipldutil" + gsmsg "github.com/ipfs/go-graphsync/message" + "github.com/ipfs/go-graphsync/notifications" + "github.com/ipfs/go-graphsync/responsemanager/hooks" + "github.com/ipfs/go-graphsync/responsemanager/responseassembler" + "github.com/ipfs/go-peertaskqueue/peertask" + "github.com/libp2p/go-libp2p-core/peer" +) + +// The code in this file implements the internal thread for the response manager. +// These functions can modify the internal state of the ResponseManager + +func (rm *ResponseManager) cleanupInProcessResponses() { + for _, response := range rm.inProgressResponses { + response.cancelFn() + } +} + +func (rm *ResponseManager) run() { + defer rm.cleanupInProcessResponses() + for i := uint64(0); i < rm.maxInProcessRequests; i++ { + go rm.qe.processQueriesWorker() + } + + for { + select { + case <-rm.ctx.Done(): + return + case message := <-rm.messages: + message.handle(rm) + } + } +} + +func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSyncRequest) { + response, ok := rm.inProgressResponses[key] + if !ok { + log.Warnf("received update for non existent request, peer %s, request ID %d", key.p.Pretty(), key.requestID) + return + } + if !response.isPaused { + response.updates = append(response.updates, update) + select { + case response.signals.UpdateSignal <- struct{}{}: + default: + } + return + } + result := rm.updateHooks.ProcessUpdateHooks(key.p, response.request, update) + err := rm.responseAssembler.Transaction(key.p, key.requestID, func(rb responseassembler.ResponseBuilder) error { + for _, extension := range result.Extensions { + rb.SendExtensionData(extension) + } + if result.Err != nil { + rb.FinishWithError(graphsync.RequestFailedUnknown) + rb.AddNotifee(notifications.Notifee{Data: graphsync.RequestFailedUnknown, Subscriber: 
response.subscriber}) + } + return nil + }) + if err != nil { + log.Errorf("Error processing update: %s", err) + } + if result.Err != nil { + delete(rm.inProgressResponses, key) + response.cancelFn() + return + } + if result.Unpause { + err := rm.unpauseRequest(key.p, key.requestID) + if err != nil { + log.Warnf("error unpausing request: %s", err.Error()) + } + } + +} + +func (rm *ResponseManager) unpauseRequest(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error { + key := responseKey{p, requestID} + inProgressResponse, ok := rm.inProgressResponses[key] + if !ok { + return errors.New("could not find request") + } + if !inProgressResponse.isPaused { + return errors.New("request is not paused") + } + inProgressResponse.isPaused = false + if len(extensions) > 0 { + _ = rm.responseAssembler.Transaction(p, requestID, func(rb responseassembler.ResponseBuilder) error { + for _, extension := range extensions { + rb.SendExtensionData(extension) + } + return nil + }) + } + rm.queryQueue.PushTasks(p, peertask.Task{Topic: key, Priority: math.MaxInt32, Work: 1}) + select { + case rm.workSignal <- struct{}{}: + default: + } + return nil +} + +func (rm *ResponseManager) abortRequest(p peer.ID, requestID graphsync.RequestID, err error) error { + key := responseKey{p, requestID} + rm.queryQueue.Remove(key, key.p) + response, ok := rm.inProgressResponses[key] + if !ok { + return errors.New("could not find request") + } + + if response.isPaused { + _ = rm.responseAssembler.Transaction(p, requestID, func(rb responseassembler.ResponseBuilder) error { + if isContextErr(err) { + + rm.cancelledListeners.NotifyCancelledListeners(p, response.request) + rb.ClearRequest() + } else if err == errNetworkError { + rb.ClearRequest() + } else { + rb.FinishWithError(graphsync.RequestCancelled) + rb.AddNotifee(notifications.Notifee{Data: graphsync.RequestCancelled, Subscriber: response.subscriber}) + } + return nil + }) + delete(rm.inProgressResponses, key) + 
response.cancelFn() + return nil + } + select { + case response.signals.ErrSignal <- err: + default: + } + return nil +} + +func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSyncRequest) { + for _, request := range requests { + key := responseKey{p: p, requestID: request.ID()} + if request.IsCancel() { + _ = rm.abortRequest(p, request.ID(), ipldutil.ContextCancelError{}) + continue + } + if request.IsUpdate() { + rm.processUpdate(key, request) + continue + } + rm.requestQueuedHooks.ProcessRequestQueuedHooks(p, request) + ctx, cancelFn := context.WithCancel(rm.ctx) + sub := notifications.NewTopicDataSubscriber(&subscriber{ + p: key.p, + request: request, + ctx: rm.ctx, + messages: rm.messages, + blockSentListeners: rm.blockSentListeners, + completedListeners: rm.completedListeners, + networkErrorListeners: rm.networkErrorListeners, + }) + + rm.inProgressResponses[key] = + &inProgressResponseStatus{ + ctx: ctx, + cancelFn: cancelFn, + subscriber: sub, + request: request, + signals: ResponseSignals{ + PauseSignal: make(chan struct{}, 1), + UpdateSignal: make(chan struct{}, 1), + ErrSignal: make(chan error, 1), + }, + } + // TODO: Use a better work estimation metric. 
+ + rm.queryQueue.PushTasks(p, peertask.Task{Topic: key, Priority: int(request.Priority()), Work: 1}) + + select { + case rm.workSignal <- struct{}{}: + default: + } + } +} + +func (rm *ResponseManager) taskDataForKey(key responseKey) ResponseTaskData { + response, hasResponse := rm.inProgressResponses[key] + if !hasResponse { + return ResponseTaskData{Empty: true} + } + if response.loader == nil || response.traverser == nil { + loader, traverser, isPaused, err := (&queryPreparer{rm.requestHooks, rm.responseAssembler, rm.linkSystem}).prepareQuery(response.ctx, key.p, response.request, response.signals, response.subscriber) + if err != nil { + response.cancelFn() + delete(rm.inProgressResponses, key) + return ResponseTaskData{Empty: true} + } + response.loader = loader + response.traverser = traverser + if isPaused { + response.isPaused = true + return ResponseTaskData{Empty: true} + } + } + return ResponseTaskData{false, response.subscriber, response.ctx, response.request, response.loader, response.traverser, response.signals} +} + +func (rm *ResponseManager) startTask(task *peertask.Task) ResponseTaskData { + key := task.Topic.(responseKey) + taskData := rm.taskDataForKey(key) + if taskData.Empty { + rm.queryQueue.TasksDone(key.p, task) + } + return taskData +} + +func (rm *ResponseManager) finishTask(task *peertask.Task, err error) { + key := task.Topic.(responseKey) + rm.queryQueue.TasksDone(key.p, task) + response, ok := rm.inProgressResponses[key] + if !ok { + return + } + if _, ok := err.(hooks.ErrPaused); ok { + response.isPaused = true + return + } + if err != nil { + log.Infof("response failed: %s", err) + } + delete(rm.inProgressResponses, key) + response.cancelFn() +} + +func (rm *ResponseManager) getUpdates(key responseKey) []gsmsg.GraphSyncRequest { + response, ok := rm.inProgressResponses[key] + if !ok { + return nil + } + updates := response.updates + response.updates = nil + return updates +} + +func (rm *ResponseManager) pauseRequest(p peer.ID, 
requestID graphsync.RequestID) error { + key := responseKey{p, requestID} + inProgressResponse, ok := rm.inProgressResponses[key] + if !ok { + return errors.New("could not find request") + } + if inProgressResponse.isPaused { + return errors.New("request is already paused") + } + select { + case inProgressResponse.signals.PauseSignal <- struct{}{}: + default: + } + return nil +}