From 0d1ece7aea19130e45062f9ce9c2f4556f3a17ad Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Wed, 9 Dec 2020 16:36:32 +0100 Subject: [PATCH 1/4] fix: RegisterNetworkErrorListener should fire when there's an error connecting to the peer --- impl/graphsync.go | 4 +-- impl/graphsync_test.go | 35 +++++++++++++++++++++++ requestmanager/executor/executor.go | 37 ++++++++++++++++++++----- requestmanager/requestmanager.go | 18 ++++++++---- requestmanager/requestmanager_test.go | 40 +++++++++++++++------------ 5 files changed, 101 insertions(+), 33 deletions(-) diff --git a/impl/graphsync.go b/impl/graphsync.go index 7eab4ecc..4b723527 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -113,7 +113,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, incomingResponseHooks := requestorhooks.NewResponseHooks() outgoingRequestHooks := requestorhooks.NewRequestHooks() incomingBlockHooks := requestorhooks.NewBlockHooks() - requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks) + networkErrorListeners := responderhooks.NewNetworkErrorListeners() + requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners) peerTaskQueue := peertaskqueue.New() persistenceOptions := persistenceoptions.New() @@ -123,7 +124,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, completedResponseListeners := responderhooks.NewCompletedResponseListeners() requestorCancelledListeners := responderhooks.NewRequestorCancelledListeners() blockSentListeners := responderhooks.NewBlockSentListeners() - networkErrorListeners := responderhooks.NewNetworkErrorListeners() unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth)) graphSync := &GraphSync{ network: network, diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index 85db7cee..0a322e7c 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -611,6 +611,41 @@ func TestNetworkDisconnect(t *testing.T) { require.EqualError(t, err, graphsync.RequestContextCancelledErr{}.Error()) } +func TestConnectFail(t *testing.T) { + // create network + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + td := newGsTestData(ctx, t) + + // initialize graphsync on first node to make requests + requestor := td.GraphSyncHost1() + + blockChainLength := 100 + blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength) + + requestCtx, requestCancel := context.WithTimeout(ctx, 1*time.Second) + defer requestCancel() + + // unlink peers so they cannot communicate + td.mn.DisconnectPeers(td.host1.ID(), td.host2.ID()) + td.mn.UnlinkPeers(td.host1.ID(), td.host2.ID()) + + reqNetworkError := make(chan error, 1) + requestor.RegisterNetworkErrorListener(func(p peer.ID, request graphsync.RequestData, err error) { + select { + case reqNetworkError <- err: + default: + } + }) + _, errChan := requestor.Request(requestCtx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension) + + var err error + testutil.AssertReceive(ctx, t, reqNetworkError, &err, "should receive network error") + testutil.AssertReceive(ctx, t, errChan, &err, "should receive an error") + require.EqualError(t, err, graphsync.RequestContextCancelledErr{}.Error()) +} + func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) { // create network ctx := context.Background() diff --git a/requestmanager/executor/executor.go b/requestmanager/executor/executor.go index a223c5b7..74b9a01a 100644 --- a/requestmanager/executor/executor.go +++ b/requestmanager/executor/executor.go @@ -3,6 +3,8 @@ package executor import ( "bytes" "context" + "github.com/ipfs/go-graphsync/messagequeue" + "github.com/ipfs/go-graphsync/responsemanager" "strings" "sync/atomic" @@ -27,12 +29,13 @@ type AsyncLoadFn func(graphsync.RequestID, ipld.Link) <-chan types.AsyncLoadResu // ExecutionEnv are request parameters that last between requests type ExecutionEnv struct { - Ctx context.Context - SendRequest func(peer.ID, gsmsg.GraphSyncRequest, ...notifications.Notifee) - RunBlockHooks func(p peer.ID, response graphsync.ResponseData, blk graphsync.BlockData) error - TerminateRequest func(graphsync.RequestID) - WaitForMessages func(ctx context.Context, resumeMessages chan graphsync.ExtensionData) ([]graphsync.ExtensionData, error) - Loader AsyncLoadFn + Ctx context.Context + SendRequest func(peer.ID, gsmsg.GraphSyncRequest, ...notifications.Notifee) + RunBlockHooks func(p peer.ID, response graphsync.ResponseData, blk graphsync.BlockData) error + TerminateRequest func(graphsync.RequestID) + WaitForMessages func(ctx context.Context, resumeMessages chan graphsync.ExtensionData) ([]graphsync.ExtensionData, error) + Loader AsyncLoadFn + NetworkErrorListeners responsemanager.NetworkErrorListeners } // RequestExecution are parameters for a single request execution @@ -166,8 +169,28 @@ func (re *requestExecutor) run() { close(re.inProgressErr) } +type reqSubscriber struct { + re *requestExecutor +} + +func (r *reqSubscriber) OnNext(topic notifications.Topic, event notifications.Event) { + mqEvt, isMQEvt := event.(messagequeue.Event) + if !isMQEvt || mqEvt.Name != messagequeue.Error { + return + } + + r.re.env.NetworkErrorListeners.NotifyNetworkErrorListeners(r.re.p, r.re.request, mqEvt.Err) + //r.re.networkError <- mqEvt.Err + //r.re.terminateRequest() +} + +func (r reqSubscriber) OnClose(topic notifications.Topic) { +} + func (re *requestExecutor) sendRequest(request gsmsg.GraphSyncRequest) { - re.env.SendRequest(re.p, request) + sub := notifications.NewMappableSubscriber(&reqSubscriber{re}, notifications.IdentityTransform) + failNotifee := notifications.Notifee{Topic: messagequeue.Error, Subscriber: sub} + re.env.SendRequest(re.p, request, failNotifee) } func (re *requestExecutor) terminateRequest() { diff --git a/requestmanager/requestmanager.go b/requestmanager/requestmanager.go index ed82d7ad..cd76ca70 100644 --- a/requestmanager/requestmanager.go +++ b/requestmanager/requestmanager.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/ipfs/go-graphsync/responsemanager" "sync/atomic" blocks "github.com/ipfs/go-block-format" @@ -74,6 +75,7 @@ type RequestManager struct { requestHooks RequestHooks responseHooks ResponseHooks blockHooks BlockHooks + networkErrorListeners responsemanager.NetworkErrorListeners } type requestManagerMessage interface { @@ -100,7 +102,9 @@ func New(ctx context.Context, asyncLoader AsyncLoader, requestHooks RequestHooks, responseHooks ResponseHooks, - blockHooks BlockHooks) *RequestManager { + blockHooks BlockHooks, + networkErrorListeners responsemanager.NetworkErrorListeners, +) *RequestManager { ctx, cancel := context.WithCancel(ctx) return &RequestManager{ ctx: ctx, @@ -112,6 +116,7 @@ func New(ctx context.Context, requestHooks: requestHooks, responseHooks: responseHooks, blockHooks: blockHooks, + networkErrorListeners: networkErrorListeners, } } @@ -332,11 +337,12 @@ func (nrm *newRequestMessage) setupRequest(requestID graphsync.RequestID, rm *Re lastResponse.Store(gsmsg.NewResponse(request.ID(), graphsync.RequestAcknowledged)) rm.inProgressRequestStatuses[request.ID()] = requestStatus incoming, incomingError := executor.ExecutionEnv{ - Ctx: rm.ctx, - SendRequest: rm.peerHandler.SendRequest, - TerminateRequest: rm.terminateRequest, - RunBlockHooks: rm.processBlockHooks, - Loader: rm.asyncLoader.AsyncLoad, + Ctx: rm.ctx, + SendRequest: rm.peerHandler.SendRequest, + TerminateRequest: rm.terminateRequest, + RunBlockHooks: rm.processBlockHooks, + Loader: rm.asyncLoader.AsyncLoad, + NetworkErrorListeners: rm.networkErrorListeners, }.Start( executor.RequestExecution{ Ctx: ctx, diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index 12eb8df9..10745800 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -4,6 +4,8 @@ import ( "context" "errors" "fmt" + "github.com/ipfs/go-graphsync/responsemanager" + responderhooks "github.com/ipfs/go-graphsync/responsemanager/hooks" "testing" "time" @@ -858,23 +860,24 @@ func TestPauseResumeExternal(t *testing.T) { } type testData struct { - requestRecordChan chan requestRecord - fph *fakePeerHandler - fal *testloader.FakeAsyncLoader - requestHooks *hooks.OutgoingRequestHooks - responseHooks *hooks.IncomingResponseHooks - blockHooks *hooks.IncomingBlockHooks - requestManager *RequestManager - blockStore map[ipld.Link][]byte - loader ipld.Loader - storer ipld.Storer - blockChain *testutil.TestBlockChain - extensionName1 graphsync.ExtensionName - extensionData1 []byte - extension1 graphsync.ExtensionData - extensionName2 graphsync.ExtensionName - extensionData2 []byte - extension2 graphsync.ExtensionData + requestRecordChan chan requestRecord + fph *fakePeerHandler + fal *testloader.FakeAsyncLoader + requestHooks *hooks.OutgoingRequestHooks + responseHooks *hooks.IncomingResponseHooks + blockHooks *hooks.IncomingBlockHooks + requestManager *RequestManager + blockStore map[ipld.Link][]byte + loader ipld.Loader + storer ipld.Storer + blockChain *testutil.TestBlockChain + extensionName1 graphsync.ExtensionName + extensionData1 []byte + extension1 graphsync.ExtensionData + extensionName2 graphsync.ExtensionName + extensionData2 []byte + extension2 graphsync.ExtensionData + networkErrorListeners responsemanager.NetworkErrorListeners } func newTestData(ctx context.Context, t *testing.T) *testData { @@ -885,7 +888,8 @@ func newTestData(ctx context.Context, t *testing.T) *testData { td.requestHooks = hooks.NewRequestHooks() td.responseHooks = hooks.NewResponseHooks() td.blockHooks = hooks.NewBlockHooks() - td.requestManager = New(ctx, td.fal, td.requestHooks, td.responseHooks, td.blockHooks) + td.networkErrorListeners = responderhooks.NewNetworkErrorListeners() + td.requestManager = New(ctx, td.fal, td.requestHooks, td.responseHooks, td.blockHooks, td.networkErrorListeners) td.requestManager.SetDelegate(td.fph) td.requestManager.Startup() td.blockStore = make(map[ipld.Link][]byte) From 89fe35b4434c25d5552c8b1f3be9b7bb59540fb1 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 9 Dec 2020 16:57:30 -0800 Subject: [PATCH 2/4] feat(listeners): move listeners to top level folder --- impl/graphsync.go | 17 +++++++++-------- .../hooks => listeners}/listeners.go | 2 +- requestmanager/requestmanager_test.go | 7 ++++--- responsemanager/responsemanager_test.go | 17 +++++++++-------- 4 files changed, 23 insertions(+), 20 deletions(-) rename {responsemanager/hooks => listeners}/listeners.go (99%) diff --git a/impl/graphsync.go b/impl/graphsync.go index 4b723527..16084544 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -9,6 +9,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "github.com/ipfs/go-graphsync" + "github.com/ipfs/go-graphsync/listeners" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/messagequeue" gsnet "github.com/ipfs/go-graphsync/network" @@ -46,10 +47,10 @@ type GraphSync struct { incomingRequestHooks *responderhooks.IncomingRequestHooks outgoingBlockHooks *responderhooks.OutgoingBlockHooks requestUpdatedHooks *responderhooks.RequestUpdatedHooks - completedResponseListeners *responderhooks.CompletedResponseListeners - requestorCancelledListeners *responderhooks.RequestorCancelledListeners - blockSentListeners *responderhooks.BlockSentListeners - networkErrorListeners *responderhooks.NetworkErrorListeners + completedResponseListeners *listeners.CompletedResponseListeners + requestorCancelledListeners *listeners.RequestorCancelledListeners + blockSentListeners *listeners.BlockSentListeners + networkErrorListeners *listeners.NetworkErrorListeners incomingResponseHooks *requestorhooks.IncomingResponseHooks outgoingRequestHooks *requestorhooks.OutgoingRequestHooks incomingBlockHooks *requestorhooks.IncomingBlockHooks @@ -113,7 +114,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, incomingResponseHooks := requestorhooks.NewResponseHooks() outgoingRequestHooks := requestorhooks.NewRequestHooks() incomingBlockHooks := requestorhooks.NewBlockHooks() - networkErrorListeners := responderhooks.NewNetworkErrorListeners() + networkErrorListeners := listeners.NewNetworkErrorListeners() requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners) peerTaskQueue := peertaskqueue.New() @@ -121,9 +122,9 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions) outgoingBlockHooks := responderhooks.NewBlockHooks() requestUpdatedHooks := responderhooks.NewUpdateHooks() - completedResponseListeners := responderhooks.NewCompletedResponseListeners() - requestorCancelledListeners := responderhooks.NewRequestorCancelledListeners() - blockSentListeners := responderhooks.NewBlockSentListeners() + completedResponseListeners := listeners.NewCompletedResponseListeners() + requestorCancelledListeners := listeners.NewRequestorCancelledListeners() + blockSentListeners := listeners.NewBlockSentListeners() unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth)) graphSync := &GraphSync{ network: network, diff --git a/responsemanager/hooks/listeners.go b/listeners/listeners.go similarity index 99% rename from responsemanager/hooks/listeners.go rename to listeners/listeners.go index f038f483..005e6d99 100644 --- a/responsemanager/hooks/listeners.go +++ b/listeners/listeners.go @@ -1,4 +1,4 @@ -package hooks +package listeners import ( "github.com/hannahhoward/go-pubsub" diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index 10745800..01a2fb1e 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -4,11 +4,12 @@ import ( "context" "errors" "fmt" - "github.com/ipfs/go-graphsync/responsemanager" - responderhooks "github.com/ipfs/go-graphsync/responsemanager/hooks" "testing" "time" + "github.com/ipfs/go-graphsync/listeners" + "github.com/ipfs/go-graphsync/responsemanager" + blocks "github.com/ipfs/go-block-format" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" @@ -888,7 +889,7 @@ func newTestData(ctx context.Context, t *testing.T) *testData { td.requestHooks = hooks.NewRequestHooks() td.responseHooks = hooks.NewResponseHooks() td.blockHooks = hooks.NewBlockHooks() - td.networkErrorListeners = responderhooks.NewNetworkErrorListeners() + td.networkErrorListeners = listeners.NewNetworkErrorListeners() td.requestManager = New(ctx, td.fal, td.requestHooks, td.responseHooks, td.blockHooks, td.networkErrorListeners) td.requestManager.SetDelegate(td.fph) td.requestManager.Startup() diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index 8fa025cf..985fa40f 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -20,6 +20,7 @@ import ( "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/cidset" "github.com/ipfs/go-graphsync/dedupkey" + "github.com/ipfs/go-graphsync/listeners" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/notifications" "github.com/ipfs/go-graphsync/responsemanager/hooks" @@ -886,10 +887,10 @@ type testData struct { requestHooks *hooks.IncomingRequestHooks blockHooks *hooks.OutgoingBlockHooks updateHooks *hooks.RequestUpdatedHooks - completedListeners *hooks.CompletedResponseListeners - cancelledListeners *hooks.RequestorCancelledListeners - blockSentListeners *hooks.BlockSentListeners - networkErrorListeners *hooks.NetworkErrorListeners + completedListeners *listeners.CompletedResponseListeners + cancelledListeners *listeners.RequestorCancelledListeners + blockSentListeners *listeners.BlockSentListeners + networkErrorListeners *listeners.NetworkErrorListeners notifeePublisher *testutil.MockPublisher blockSends chan graphsync.BlockData completedResponseStatuses chan graphsync.ResponseStatusCode @@ -960,10 +961,10 @@ func newTestData(t *testing.T) testData { td.requestHooks = hooks.NewRequestHooks(td.peristenceOptions) td.blockHooks = hooks.NewBlockHooks() td.updateHooks = hooks.NewUpdateHooks() - td.completedListeners = hooks.NewCompletedResponseListeners() - td.cancelledListeners = hooks.NewRequestorCancelledListeners() - td.blockSentListeners = hooks.NewBlockSentListeners() - td.networkErrorListeners = hooks.NewNetworkErrorListeners() + td.completedListeners = listeners.NewCompletedResponseListeners() + td.cancelledListeners = listeners.NewRequestorCancelledListeners() + td.blockSentListeners = listeners.NewBlockSentListeners() + td.networkErrorListeners = listeners.NewNetworkErrorListeners() td.completedListeners.Register(func(p peer.ID, requestID graphsync.RequestData, status graphsync.ResponseStatusCode) { select { case td.completedResponseStatuses <- status: From 4ac76cf75c9b0278f37ec35a063a8444a8d42cad Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 9 Dec 2020 17:11:28 -0800 Subject: [PATCH 3/4] feat(requestmanager): put network error in request manager Put network error in request manager to cover more cases --- requestmanager/executor/executor.go | 38 +++------------- requestmanager/executor/executor_test.go | 3 +- requestmanager/requestmanager.go | 55 ++++++++++++++++++------ requestmanager/requestmanager_test.go | 3 +- 4 files changed, 50 insertions(+), 49 deletions(-) diff --git a/requestmanager/executor/executor.go b/requestmanager/executor/executor.go index 74b9a01a..bef1274e 100644 --- a/requestmanager/executor/executor.go +++ b/requestmanager/executor/executor.go @@ -3,8 +3,6 @@ package executor import ( "bytes" "context" - "github.com/ipfs/go-graphsync/messagequeue" - "github.com/ipfs/go-graphsync/responsemanager" "strings" "sync/atomic" @@ -18,7 +16,6 @@ import ( "github.com/ipfs/go-graphsync/cidset" "github.com/ipfs/go-graphsync/ipldutil" gsmsg "github.com/ipfs/go-graphsync/message" - "github.com/ipfs/go-graphsync/notifications" "github.com/ipfs/go-graphsync/requestmanager/hooks" "github.com/ipfs/go-graphsync/requestmanager/types" ) @@ -29,13 +26,12 @@ type AsyncLoadFn func(graphsync.RequestID, ipld.Link) <-chan types.AsyncLoadResu // ExecutionEnv are request parameters that last between requests type ExecutionEnv struct { - Ctx context.Context - SendRequest func(peer.ID, gsmsg.GraphSyncRequest, ...notifications.Notifee) - RunBlockHooks func(p peer.ID, response graphsync.ResponseData, blk graphsync.BlockData) error - TerminateRequest func(graphsync.RequestID) - WaitForMessages func(ctx context.Context, resumeMessages chan graphsync.ExtensionData) ([]graphsync.ExtensionData, error) - Loader AsyncLoadFn - NetworkErrorListeners responsemanager.NetworkErrorListeners + Ctx context.Context + SendRequest func(peer.ID, gsmsg.GraphSyncRequest) + RunBlockHooks func(p peer.ID, response graphsync.ResponseData, blk graphsync.BlockData) error + TerminateRequest func(graphsync.RequestID) + WaitForMessages func(ctx context.Context, resumeMessages chan graphsync.ExtensionData) ([]graphsync.ExtensionData, error) + Loader AsyncLoadFn } // RequestExecution are parameters for a single request execution @@ -169,28 +165,8 @@ func (re *requestExecutor) run() { close(re.inProgressErr) } -type reqSubscriber struct { - re *requestExecutor -} - -func (r *reqSubscriber) OnNext(topic notifications.Topic, event notifications.Event) { - mqEvt, isMQEvt := event.(messagequeue.Event) - if !isMQEvt || mqEvt.Name != messagequeue.Error { - return - } - - r.re.env.NetworkErrorListeners.NotifyNetworkErrorListeners(r.re.p, r.re.request, mqEvt.Err) - //r.re.networkError <- mqEvt.Err - //r.re.terminateRequest() -} - -func (r reqSubscriber) OnClose(topic notifications.Topic) { -} - func (re *requestExecutor) sendRequest(request gsmsg.GraphSyncRequest) { - sub := notifications.NewMappableSubscriber(&reqSubscriber{re}, notifications.IdentityTransform) - failNotifee := notifications.Notifee{Topic: messagequeue.Error, Subscriber: sub} - re.env.SendRequest(re.p, request, failNotifee) + re.env.SendRequest(re.p, request) } func (re *requestExecutor) terminateRequest() { diff --git a/requestmanager/executor/executor_test.go b/requestmanager/executor/executor_test.go index 4302b75a..a05ab9f8 100644 --- a/requestmanager/executor/executor_test.go +++ b/requestmanager/executor/executor_test.go @@ -19,7 +19,6 @@ import ( "github.com/ipfs/go-graphsync/cidset" "github.com/ipfs/go-graphsync/ipldutil" gsmsg "github.com/ipfs/go-graphsync/message" - "github.com/ipfs/go-graphsync/notifications" "github.com/ipfs/go-graphsync/requestmanager/executor" "github.com/ipfs/go-graphsync/requestmanager/hooks" "github.com/ipfs/go-graphsync/requestmanager/testloader" @@ -378,7 +377,7 @@ func (ree *requestExecutionEnv) waitForResume() ([]graphsync.ExtensionData, erro return extensions, nil } -func (ree *requestExecutionEnv) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest, notifees ...notifications.Notifee) { +func (ree *requestExecutionEnv) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest) { ree.requestsSent = append(ree.requestsSent, requestSent{p, request}) if ree.currentWaitForResumeResult < len(ree.loaderRanges) && !request.IsCancel() { ree.configureLoader(ree.p, ree.request.ID(), ree.tbc, ree.fal, ree.loaderRanges[ree.currentWaitForResumeResult]) diff --git a/requestmanager/requestmanager.go b/requestmanager/requestmanager.go index cd76ca70..5bebb6d1 100644 --- a/requestmanager/requestmanager.go +++ b/requestmanager/requestmanager.go @@ -4,9 +4,11 @@ import ( "context" "errors" "fmt" - "github.com/ipfs/go-graphsync/responsemanager" "sync/atomic" + "github.com/ipfs/go-graphsync/listeners" + "github.com/ipfs/go-graphsync/messagequeue" + blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" @@ -75,7 +77,7 @@ type RequestManager struct { requestHooks RequestHooks responseHooks ResponseHooks blockHooks BlockHooks - networkErrorListeners responsemanager.NetworkErrorListeners + networkErrorListeners *listeners.NetworkErrorListeners } type requestManagerMessage interface { @@ -103,7 +105,7 @@ func New(ctx context.Context, requestHooks RequestHooks, responseHooks ResponseHooks, blockHooks BlockHooks, - networkErrorListeners responsemanager.NetworkErrorListeners, + networkErrorListeners *listeners.NetworkErrorListeners, ) *RequestManager { ctx, cancel := context.WithCancel(ctx) return &RequestManager{ @@ -337,12 +339,11 @@ func (nrm *newRequestMessage) setupRequest(requestID graphsync.RequestID, rm *Re lastResponse.Store(gsmsg.NewResponse(request.ID(), graphsync.RequestAcknowledged)) rm.inProgressRequestStatuses[request.ID()] = requestStatus incoming, incomingError := executor.ExecutionEnv{ - Ctx: rm.ctx, - SendRequest: rm.peerHandler.SendRequest, - TerminateRequest: rm.terminateRequest, - RunBlockHooks: rm.processBlockHooks, - Loader: rm.asyncLoader.AsyncLoad, - NetworkErrorListeners: rm.networkErrorListeners, + Ctx: rm.ctx, + SendRequest: rm.sendRequest, + TerminateRequest: rm.terminateRequest, + RunBlockHooks: rm.processBlockHooks, + Loader: rm.asyncLoader.AsyncLoad, }.Start( executor.RequestExecution{ Ctx: ctx, @@ -381,7 +382,7 @@ func (crm *cancelRequestMessage) handle(rm *RequestManager) { return } - rm.peerHandler.SendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID)) + rm.sendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID)) if crm.isPause { inProgressRequestStatus.paused = true } else { @@ -431,7 +432,7 @@ func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg result := rm.responseHooks.ProcessResponseHooks(p, response) if len(result.Extensions) > 0 { updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...) - rm.peerHandler.SendRequest(p, updateRequest) + rm.sendRequest(p, updateRequest) } if result.Err != nil { requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()] @@ -443,7 +444,7 @@ func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg case requestStatus.networkError <- responseError: case <-requestStatus.ctx.Done(): } - rm.peerHandler.SendRequest(p, gsmsg.CancelRequest(response.RequestID())) + rm.sendRequest(p, gsmsg.CancelRequest(response.RequestID())) requestStatus.cancelFn() return false } @@ -488,7 +489,7 @@ func (rm *RequestManager) processBlockHooks(p peer.ID, response graphsync.Respon result := rm.blockHooks.ProcessBlockHooks(p, response, block) if len(result.Extensions) > 0 { updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...) - rm.peerHandler.SendRequest(p, updateRequest) + rm.sendRequest(p, updateRequest) } if result.Err != nil { _, isPause := result.Err.(hooks.ErrPaused) @@ -541,6 +542,32 @@ func (rm *RequestManager) validateRequest(requestID graphsync.RequestID, p peer. return request, hooksResult, nil } +type reqSubscriber struct { + p peer.ID + request gsmsg.GraphSyncRequest + networkErrorListeners *listeners.NetworkErrorListeners +} + +func (r *reqSubscriber) OnNext(topic notifications.Topic, event notifications.Event) { + mqEvt, isMQEvt := event.(messagequeue.Event) + if !isMQEvt || mqEvt.Name != messagequeue.Error { + return + } + + r.networkErrorListeners.NotifyNetworkErrorListeners(r.p, r.request, mqEvt.Err) + //r.re.networkError <- mqEvt.Err + //r.re.terminateRequest() +} + +func (r reqSubscriber) OnClose(topic notifications.Topic) { +} + +func (rm *RequestManager) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest) { + sub := notifications.NewMappableSubscriber(&reqSubscriber{p, request, rm.networkErrorListeners}, notifications.IdentityTransform) + failNotifee := notifications.Notifee{Topic: messagequeue.Error, Subscriber: sub} + rm.peerHandler.SendRequest(p, request, failNotifee) +} + func (urm *unpauseRequestMessage) unpause(rm *RequestManager) error { inProgressRequestStatus, ok := rm.inProgressRequestStatuses[urm.id] if !ok { @@ -552,7 +579,7 @@ func (urm *unpauseRequestMessage) unpause(rm *RequestManager) error { inProgressRequestStatus.paused = false select { case <-inProgressRequestStatus.pauseMessages: - rm.peerHandler.SendRequest(inProgressRequestStatus.p, gsmsg.UpdateRequest(urm.id, urm.extensions...)) + rm.sendRequest(inProgressRequestStatus.p, gsmsg.UpdateRequest(urm.id, urm.extensions...)) return nil case <-rm.ctx.Done(): return errors.New("context cancelled") diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index 01a2fb1e..715ccc80 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -8,7 +8,6 @@ import ( "time" "github.com/ipfs/go-graphsync/listeners" - "github.com/ipfs/go-graphsync/responsemanager" blocks "github.com/ipfs/go-block-format" "github.com/ipld/go-ipld-prime" @@ -878,7 +877,7 @@ type testData struct { extensionName2 graphsync.ExtensionName extensionData2 []byte extension2 graphsync.ExtensionData - networkErrorListeners responsemanager.NetworkErrorListeners + networkErrorListeners *listeners.NetworkErrorListeners } func newTestData(ctx context.Context, t *testing.T) *testData { From d063643709c3710d654e4aada1c6360860e9a697 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 10 Dec 2020 13:52:28 -0800 Subject: [PATCH 4/4] feat(requestmanager): correct for notification refactor Resolve discripencies with new and old notification interfaces --- requestmanager/requestmanager.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/requestmanager/requestmanager.go b/requestmanager/requestmanager.go index 5bebb6d1..f5844af1 100644 --- a/requestmanager/requestmanager.go +++ b/requestmanager/requestmanager.go @@ -562,9 +562,11 @@ func (r *reqSubscriber) OnNext(topic notifications.Topic, event notifications.Ev func (r reqSubscriber) OnClose(topic notifications.Topic) { } +const requestNetworkError = "request_network_error" + func (rm *RequestManager) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest) { - sub := notifications.NewMappableSubscriber(&reqSubscriber{p, request, rm.networkErrorListeners}, notifications.IdentityTransform) - failNotifee := notifications.Notifee{Topic: messagequeue.Error, Subscriber: sub} + sub := notifications.NewTopicDataSubscriber(&reqSubscriber{p, request, rm.networkErrorListeners}) + failNotifee := notifications.Notifee{Data: requestNetworkError, Subscriber: sub} rm.peerHandler.SendRequest(p, request, failNotifee) }