diff --git a/requestmanager/executor/executor.go b/requestmanager/executor/executor.go index 74b9a01a..bef1274e 100644 --- a/requestmanager/executor/executor.go +++ b/requestmanager/executor/executor.go @@ -3,8 +3,6 @@ package executor import ( "bytes" "context" - "github.com/ipfs/go-graphsync/messagequeue" - "github.com/ipfs/go-graphsync/responsemanager" "strings" "sync/atomic" @@ -18,7 +16,6 @@ import ( "github.com/ipfs/go-graphsync/cidset" "github.com/ipfs/go-graphsync/ipldutil" gsmsg "github.com/ipfs/go-graphsync/message" - "github.com/ipfs/go-graphsync/notifications" "github.com/ipfs/go-graphsync/requestmanager/hooks" "github.com/ipfs/go-graphsync/requestmanager/types" ) @@ -29,13 +26,12 @@ type AsyncLoadFn func(graphsync.RequestID, ipld.Link) <-chan types.AsyncLoadResu // ExecutionEnv are request parameters that last between requests type ExecutionEnv struct { - Ctx context.Context - SendRequest func(peer.ID, gsmsg.GraphSyncRequest, ...notifications.Notifee) - RunBlockHooks func(p peer.ID, response graphsync.ResponseData, blk graphsync.BlockData) error - TerminateRequest func(graphsync.RequestID) - WaitForMessages func(ctx context.Context, resumeMessages chan graphsync.ExtensionData) ([]graphsync.ExtensionData, error) - Loader AsyncLoadFn - NetworkErrorListeners responsemanager.NetworkErrorListeners + Ctx context.Context + SendRequest func(peer.ID, gsmsg.GraphSyncRequest) + RunBlockHooks func(p peer.ID, response graphsync.ResponseData, blk graphsync.BlockData) error + TerminateRequest func(graphsync.RequestID) + WaitForMessages func(ctx context.Context, resumeMessages chan graphsync.ExtensionData) ([]graphsync.ExtensionData, error) + Loader AsyncLoadFn } // RequestExecution are parameters for a single request execution @@ -169,28 +165,8 @@ func (re *requestExecutor) run() { close(re.inProgressErr) } -type reqSubscriber struct { - re *requestExecutor -} - -func (r *reqSubscriber) OnNext(topic notifications.Topic, event notifications.Event) { - mqEvt, isMQEvt := event.(messagequeue.Event) - if !isMQEvt || mqEvt.Name != messagequeue.Error { - return - } - - r.re.env.NetworkErrorListeners.NotifyNetworkErrorListeners(r.re.p, r.re.request, mqEvt.Err) - //r.re.networkError <- mqEvt.Err - //r.re.terminateRequest() -} - -func (r reqSubscriber) OnClose(topic notifications.Topic) { -} - func (re *requestExecutor) sendRequest(request gsmsg.GraphSyncRequest) { - sub := notifications.NewMappableSubscriber(&reqSubscriber{re}, notifications.IdentityTransform) - failNotifee := notifications.Notifee{Topic: messagequeue.Error, Subscriber: sub} - re.env.SendRequest(re.p, request, failNotifee) + re.env.SendRequest(re.p, request) } func (re *requestExecutor) terminateRequest() { diff --git a/requestmanager/executor/executor_test.go b/requestmanager/executor/executor_test.go index 4302b75a..a05ab9f8 100644 --- a/requestmanager/executor/executor_test.go +++ b/requestmanager/executor/executor_test.go @@ -19,7 +19,6 @@ import ( "github.com/ipfs/go-graphsync/cidset" "github.com/ipfs/go-graphsync/ipldutil" gsmsg "github.com/ipfs/go-graphsync/message" - "github.com/ipfs/go-graphsync/notifications" "github.com/ipfs/go-graphsync/requestmanager/executor" "github.com/ipfs/go-graphsync/requestmanager/hooks" "github.com/ipfs/go-graphsync/requestmanager/testloader" @@ -378,7 +377,7 @@ func (ree *requestExecutionEnv) waitForResume() ([]graphsync.ExtensionData, erro return extensions, nil } -func (ree *requestExecutionEnv) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest, notifees ...notifications.Notifee) { +func (ree *requestExecutionEnv) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest) { ree.requestsSent = append(ree.requestsSent, requestSent{p, request}) if ree.currentWaitForResumeResult < len(ree.loaderRanges) && !request.IsCancel() { ree.configureLoader(ree.p, ree.request.ID(), ree.tbc, ree.fal, ree.loaderRanges[ree.currentWaitForResumeResult]) diff --git a/requestmanager/requestmanager.go b/requestmanager/requestmanager.go index cd76ca70..5bebb6d1 100644 --- a/requestmanager/requestmanager.go +++ b/requestmanager/requestmanager.go @@ -4,9 +4,11 @@ import ( "context" "errors" "fmt" - "github.com/ipfs/go-graphsync/responsemanager" "sync/atomic" + "github.com/ipfs/go-graphsync/listeners" + "github.com/ipfs/go-graphsync/messagequeue" + blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" @@ -75,7 +77,7 @@ type RequestManager struct { requestHooks RequestHooks responseHooks ResponseHooks blockHooks BlockHooks - networkErrorListeners responsemanager.NetworkErrorListeners + networkErrorListeners *listeners.NetworkErrorListeners } type requestManagerMessage interface { @@ -103,7 +105,7 @@ func New(ctx context.Context, requestHooks RequestHooks, responseHooks ResponseHooks, blockHooks BlockHooks, - networkErrorListeners responsemanager.NetworkErrorListeners, + networkErrorListeners *listeners.NetworkErrorListeners, ) *RequestManager { ctx, cancel := context.WithCancel(ctx) return &RequestManager{ @@ -337,12 +339,11 @@ func (nrm *newRequestMessage) setupRequest(requestID graphsync.RequestID, rm *Re lastResponse.Store(gsmsg.NewResponse(request.ID(), graphsync.RequestAcknowledged)) rm.inProgressRequestStatuses[request.ID()] = requestStatus incoming, incomingError := executor.ExecutionEnv{ - Ctx: rm.ctx, - SendRequest: rm.peerHandler.SendRequest, - TerminateRequest: rm.terminateRequest, - RunBlockHooks: rm.processBlockHooks, - Loader: rm.asyncLoader.AsyncLoad, - NetworkErrorListeners: rm.networkErrorListeners, + Ctx: rm.ctx, + SendRequest: rm.sendRequest, + TerminateRequest: rm.terminateRequest, + RunBlockHooks: rm.processBlockHooks, + Loader: rm.asyncLoader.AsyncLoad, }.Start( executor.RequestExecution{ Ctx: ctx, @@ -381,7 +382,7 @@ func (crm *cancelRequestMessage) handle(rm *RequestManager) { return } - rm.peerHandler.SendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID)) + rm.sendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID)) if crm.isPause { inProgressRequestStatus.paused = true } else { @@ -431,7 +432,7 @@ func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg result := rm.responseHooks.ProcessResponseHooks(p, response) if len(result.Extensions) > 0 { updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...) - rm.peerHandler.SendRequest(p, updateRequest) + rm.sendRequest(p, updateRequest) } if result.Err != nil { requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()] @@ -443,7 +444,7 @@ func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg case requestStatus.networkError <- responseError: case <-requestStatus.ctx.Done(): } - rm.peerHandler.SendRequest(p, gsmsg.CancelRequest(response.RequestID())) + rm.sendRequest(p, gsmsg.CancelRequest(response.RequestID())) requestStatus.cancelFn() return false } @@ -488,7 +489,7 @@ func (rm *RequestManager) processBlockHooks(p peer.ID, response graphsync.Respon result := rm.blockHooks.ProcessBlockHooks(p, response, block) if len(result.Extensions) > 0 { updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...) - rm.peerHandler.SendRequest(p, updateRequest) + rm.sendRequest(p, updateRequest) } if result.Err != nil { _, isPause := result.Err.(hooks.ErrPaused) @@ -541,6 +542,32 @@ func (rm *RequestManager) validateRequest(requestID graphsync.RequestID, p peer. return request, hooksResult, nil } +type reqSubscriber struct { + p peer.ID + request gsmsg.GraphSyncRequest + networkErrorListeners *listeners.NetworkErrorListeners +} + +func (r *reqSubscriber) OnNext(topic notifications.Topic, event notifications.Event) { + mqEvt, isMQEvt := event.(messagequeue.Event) + if !isMQEvt || mqEvt.Name != messagequeue.Error { + return + } + + r.networkErrorListeners.NotifyNetworkErrorListeners(r.p, r.request, mqEvt.Err) + //r.re.networkError <- mqEvt.Err + //r.re.terminateRequest() +} + +func (r reqSubscriber) OnClose(topic notifications.Topic) { +} + +func (rm *RequestManager) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest) { + sub := notifications.NewMappableSubscriber(&reqSubscriber{p, request, rm.networkErrorListeners}, notifications.IdentityTransform) + failNotifee := notifications.Notifee{Topic: messagequeue.Error, Subscriber: sub} + rm.peerHandler.SendRequest(p, request, failNotifee) +} + func (urm *unpauseRequestMessage) unpause(rm *RequestManager) error { inProgressRequestStatus, ok := rm.inProgressRequestStatuses[urm.id] if !ok { @@ -552,7 +579,7 @@ func (urm *unpauseRequestMessage) unpause(rm *RequestManager) error { inProgressRequestStatus.paused = false select { case <-inProgressRequestStatus.pauseMessages: - rm.peerHandler.SendRequest(inProgressRequestStatus.p, gsmsg.UpdateRequest(urm.id, urm.extensions...)) + rm.sendRequest(inProgressRequestStatus.p, gsmsg.UpdateRequest(urm.id, urm.extensions...)) return nil case <-rm.ctx.Done(): return errors.New("context cancelled") diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index 01a2fb1e..715ccc80 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -8,7 +8,6 @@ import ( "time" "github.com/ipfs/go-graphsync/listeners" - "github.com/ipfs/go-graphsync/responsemanager" blocks "github.com/ipfs/go-block-format" "github.com/ipld/go-ipld-prime" @@ -878,7 +877,7 @@ type testData struct { extensionName2 graphsync.ExtensionName extensionData2 []byte extension2 graphsync.ExtensionData - networkErrorListeners responsemanager.NetworkErrorListeners + networkErrorListeners *listeners.NetworkErrorListeners } func newTestData(ctx context.Context, t *testing.T) *testData {