Skip to content

Commit

Permalink
feat(responsemanager): handle network errors
Browse files Browse the repository at this point in the history
Pass on network errors through the response manager, pass block sent errors as well
  • Loading branch information
hannahhoward committed Oct 11, 2020
1 parent fbd5be2 commit 781f52d
Show file tree
Hide file tree
Showing 11 changed files with 1,170 additions and 836 deletions.
16 changes: 14 additions & 2 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,11 +283,17 @@ type OnOutgoingBlockHook func(p peer.ID, request RequestData, block BlockData, h
// It receives an interface to taking further action on the response
type OnRequestUpdatedHook func(p peer.ID, request RequestData, updateRequest RequestData, hookActions RequestUpdatedHookActions)

// OnBlockSentListener runs when a block is sent over the wire
type OnBlockSentListener func(p peer.ID, requestID RequestID, block BlockData)

// OnNetworkErrorListener runs when queued data is not able to be sent
type OnNetworkErrorListener func(p peer.ID, requestID RequestID, err error)

// OnResponseCompletedListener provides a way to listen for when responder has finished serving a response
type OnResponseCompletedListener func(p peer.ID, request RequestData, status ResponseStatusCode)
type OnResponseCompletedListener func(p peer.ID, requestID RequestID, status ResponseStatusCode)

// OnRequestorCancelledListener provides a way to listen for responses the requestor canncels
type OnRequestorCancelledListener func(p peer.ID, request RequestData)
type OnRequestorCancelledListener func(p peer.ID, requestID RequestID)

// UnregisterHookFunc is a function call to unregister a hook that was previously registered
type UnregisterHookFunc func()
Expand Down Expand Up @@ -328,6 +334,12 @@ type GraphExchange interface {
// responses cancelled by the requestor
RegisterRequestorCancelledListener(listener OnRequestorCancelledListener) UnregisterHookFunc

// RegisterBlockSentListener adds a listener for when blocks are actually sent over the wire
RegisterBlockSentListener(listener OnBlockSentListener) UnregisterHookFunc

// RegisterNetworkErrorListener adds a listener for when errors occur sending data over the wire
RegisterNetworkErrorListener(listener OnNetworkErrorListener) UnregisterHookFunc

// UnpauseRequest unpauses a request that was paused in a block hook based request ID
// Can also send extensions with unpause
UnpauseRequest(RequestID, ...ExtensionData) error
Expand Down
18 changes: 17 additions & 1 deletion impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type GraphSync struct {
requestUpdatedHooks *responderhooks.RequestUpdatedHooks
completedResponseListeners *responderhooks.CompletedResponseListeners
requestorCancelledListeners *responderhooks.RequestorCancelledListeners
blockSentListeners *responderhooks.BlockSentListeners
networkErrorListeners *responderhooks.NetworkErrorListeners
incomingResponseHooks *requestorhooks.IncomingResponseHooks
outgoingRequestHooks *requestorhooks.OutgoingRequestHooks
incomingBlockHooks *requestorhooks.IncomingBlockHooks
Expand Down Expand Up @@ -91,7 +93,9 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
requestUpdatedHooks := responderhooks.NewUpdateHooks()
completedResponseListeners := responderhooks.NewCompletedResponseListeners()
requestorCancelledListeners := responderhooks.NewRequestorCancelledListeners()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners)
blockSentListeners := responderhooks.NewBlockSentListeners()
networkErrorListeners := responderhooks.NewNetworkErrorListeners()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners)
unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
graphSync := &GraphSync{
network: network,
Expand All @@ -106,6 +110,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
requestUpdatedHooks: requestUpdatedHooks,
completedResponseListeners: completedResponseListeners,
requestorCancelledListeners: requestorCancelledListeners,
blockSentListeners: blockSentListeners,
networkErrorListeners: networkErrorListeners,
incomingResponseHooks: incomingResponseHooks,
outgoingRequestHooks: outgoingRequestHooks,
incomingBlockHooks: incomingBlockHooks,
Expand Down Expand Up @@ -196,6 +202,16 @@ func (gs *GraphSync) RegisterRequestorCancelledListener(listener graphsync.OnReq
return gs.requestorCancelledListeners.Register(listener)
}

// RegisterBlockSentListener adds a listener for when blocks are actually sent over the wire
func (gs *GraphSync) RegisterBlockSentListener(listener graphsync.OnBlockSentListener) graphsync.UnregisterHookFunc {
return gs.blockSentListeners.Register(listener)
}

// RegisterNetworkErrorListener adds a listener for when errors occur sending data over the wire
func (gs *GraphSync) RegisterNetworkErrorListener(listener graphsync.OnNetworkErrorListener) graphsync.UnregisterHookFunc {
return gs.networkErrorListeners.Register(listener)
}

// UnpauseRequest unpauses a request that was paused in a block hook based request ID
// Can also send extensions with unpause
func (gs *GraphSync) UnpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
Expand Down
67 changes: 65 additions & 2 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func TestGraphsyncRoundTrip(t *testing.T) {
})

finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1)
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestID, status graphsync.ResponseStatusCode) {
select {
case finalResponseStatusChan <- status:
default:
Expand Down Expand Up @@ -253,7 +253,7 @@ func TestGraphsyncRoundTripPartial(t *testing.T) {
responder := td.GraphSyncHost2()

finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1)
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestID, status graphsync.ResponseStatusCode) {
select {
case finalResponseStatusChan <- status:
default:
Expand Down Expand Up @@ -548,6 +548,69 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {
require.Equal(t, td.extensionUpdateData, receivedUpdateData, "did not receive correct extension update data")
}

func TestNetworkDisconnect(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength)

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()

stopPoint := 50
blocksSent := 0
requestIDChan := make(chan graphsync.RequestID, 1)
responder.RegisterOutgoingBlockHook(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
_, has := requestData.Extension(td.extensionName)
if has {
select {
case requestIDChan <- requestData.ID():
default:
}
blocksSent++
if blocksSent == stopPoint {
hookActions.PauseResponse()
}
} else {
hookActions.TerminateWithError(errors.New("should have sent extension"))
}
})
networkError := make(chan error, 1)
responder.RegisterNetworkErrorListener(func(p peer.ID, requestID graphsync.RequestID, err error) {
select {
case networkError <- err:
default:
}
})
requestCtx, requestCancel := context.WithTimeout(ctx, 1*time.Second)
defer requestCancel()
progressChan, errChan := requestor.Request(requestCtx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

blockChain.VerifyResponseRange(ctx, progressChan, 0, stopPoint)
timer := time.NewTimer(100 * time.Millisecond)
testutil.AssertDoesReceiveFirst(t, timer.C, "should pause request", progressChan)
testutil.AssertChannelEmpty(t, networkError, "no network errors so far")

// unlink peers so they cannot communicate
td.mn.DisconnectPeers(td.host1.ID(), td.host2.ID())
td.mn.UnlinkPeers(td.host1.ID(), td.host2.ID())
requestID := <-requestIDChan
err := responder.UnpauseResponse(td.host1.ID(), requestID)
require.NoError(t, err)

testutil.AssertReceive(ctx, t, networkError, &err, "should receive network error")
testutil.AssertReceive(ctx, t, errChan, &err, "should receive an error")
require.EqualError(t, err, graphsync.RequestContextCancelledErr{}.Error())
}

func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
// create network
ctx := context.Background()
Expand Down
88 changes: 77 additions & 11 deletions responsemanager/hooks/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ type CompletedResponseListeners struct {
}

type internalCompletedResponseEvent struct {
p peer.ID
request graphsync.RequestData
status graphsync.ResponseStatusCode
p peer.ID
requestID graphsync.RequestID
status graphsync.ResponseStatusCode
}

func completedResponseDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalCompletedResponseEvent)
listener := subscriberFn.(graphsync.OnResponseCompletedListener)
listener(ie.p, ie.request, ie.status)
listener(ie.p, ie.requestID, ie.status)
return nil
}

Expand All @@ -36,8 +36,8 @@ func (crl *CompletedResponseListeners) Register(listener graphsync.OnResponseCom
}

// NotifyCompletedListeners runs notifies all completed listeners that a response has completed
func (crl *CompletedResponseListeners) NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
_ = crl.pubSub.Publish(internalCompletedResponseEvent{p, request, status})
func (crl *CompletedResponseListeners) NotifyCompletedListeners(p peer.ID, requestID graphsync.RequestID, status graphsync.ResponseStatusCode) {
_ = crl.pubSub.Publish(internalCompletedResponseEvent{p, requestID, status})
}

// RequestorCancelledListeners is a set of listeners for when requestors cancel
Expand All @@ -46,14 +46,14 @@ type RequestorCancelledListeners struct {
}

type internalRequestorCancelledEvent struct {
p peer.ID
request graphsync.RequestData
p peer.ID
requestID graphsync.RequestID
}

func requestorCancelledDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalRequestorCancelledEvent)
listener := subscriberFn.(graphsync.OnRequestorCancelledListener)
listener(ie.p, ie.request)
listener(ie.p, ie.requestID)
return nil
}

Expand All @@ -68,6 +68,72 @@ func (rcl *RequestorCancelledListeners) Register(listener graphsync.OnRequestorC
}

// NotifyCancelledListeners notifies all listeners that a requestor cancelled a response
func (rcl *RequestorCancelledListeners) NotifyCancelledListeners(p peer.ID, request graphsync.RequestData) {
_ = rcl.pubSub.Publish(internalRequestorCancelledEvent{p, request})
func (rcl *RequestorCancelledListeners) NotifyCancelledListeners(p peer.ID, requestID graphsync.RequestID) {
_ = rcl.pubSub.Publish(internalRequestorCancelledEvent{p, requestID})
}

// BlockSentListeners is a set of listeners for when requestors cancel
type BlockSentListeners struct {
pubSub *pubsub.PubSub
}

type internalBlockSentEvent struct {
p peer.ID
requestID graphsync.RequestID
block graphsync.BlockData
}

func blockSentDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalBlockSentEvent)
listener := subscriberFn.(graphsync.OnBlockSentListener)
listener(ie.p, ie.requestID, ie.block)
return nil
}

// NewBlockSentListeners returns a new list of listeners for when requestors cancel
func NewBlockSentListeners() *BlockSentListeners {
return &BlockSentListeners{pubSub: pubsub.New(blockSentDispatcher)}
}

// Register registers an listener for completed responses
func (bsl *BlockSentListeners) Register(listener graphsync.OnBlockSentListener) graphsync.UnregisterHookFunc {
return graphsync.UnregisterHookFunc(bsl.pubSub.Subscribe(listener))
}

// NotifyBlockSentListeners notifies all listeners that a requestor cancelled a response
func (bsl *BlockSentListeners) NotifyBlockSentListeners(p peer.ID, requestID graphsync.RequestID, block graphsync.BlockData) {
_ = bsl.pubSub.Publish(internalBlockSentEvent{p, requestID, block})
}

// NetworkErrorListeners is a set of listeners for when requestors cancel
type NetworkErrorListeners struct {
pubSub *pubsub.PubSub
}

type internalNetworkErrorEvent struct {
p peer.ID
requestID graphsync.RequestID
err error
}

func networkErrorDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalNetworkErrorEvent)
listener := subscriberFn.(graphsync.OnNetworkErrorListener)
listener(ie.p, ie.requestID, ie.err)
return nil
}

// NewNetworkErrorListeners returns a new list of listeners for when requestors cancel
func NewNetworkErrorListeners() *NetworkErrorListeners {
return &NetworkErrorListeners{pubSub: pubsub.New(networkErrorDispatcher)}
}

// Register registers an listener for completed responses
func (nel *NetworkErrorListeners) Register(listener graphsync.OnNetworkErrorListener) graphsync.UnregisterHookFunc {
return graphsync.UnregisterHookFunc(nel.pubSub.Subscribe(listener))
}

// NotifyNetworkErrorListeners notifies all listeners that a requestor cancelled a response
func (nel *NetworkErrorListeners) NotifyNetworkErrorListeners(p peer.ID, requestID graphsync.RequestID, err error) {
_ = nel.pubSub.Publish(internalNetworkErrorEvent{p, requestID, err})
}
12 changes: 9 additions & 3 deletions responsemanager/peerresponsemanager/peerresponsesender.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type PeerResponseSender interface {
FinishWithError(requestID graphsync.RequestID, status graphsync.ResponseStatusCode, notifees ...notifications.Notifee)
// Transaction calls multiple operations at once so they end up in a single response
// Note: if the transaction function errors, the results will not execute
Transaction(requestID graphsync.RequestID, transaction Transaction, notifees ...notifications.Notifee) error
Transaction(requestID graphsync.RequestID, transaction Transaction) error
PauseRequest(requestID graphsync.RequestID, notifees ...notifications.Notifee)
}

Expand All @@ -104,6 +104,7 @@ type PeerResponseTransactionSender interface {
FinishRequest() graphsync.ResponseStatusCode
FinishWithError(status graphsync.ResponseStatusCode)
PauseRequest()
AddNotifee(notifications.Notifee)
}

// NewResponseSender generates a new PeerResponseSender for the given context, peer ID,
Expand Down Expand Up @@ -202,6 +203,7 @@ func (prs *peerResponseSender) SendExtensionData(requestID graphsync.RequestID,
type peerResponseTransactionSender struct {
requestID graphsync.RequestID
operations []responseOperation
notifees []notifications.Notifee
prs *peerResponseSender
}

Expand Down Expand Up @@ -233,14 +235,18 @@ func (prts *peerResponseTransactionSender) FinishWithCancel() {
_ = prts.prs.finishTracking(prts.requestID)
}

func (prs *peerResponseSender) Transaction(requestID graphsync.RequestID, transaction Transaction, notifees ...notifications.Notifee) error {
func (prts *peerResponseTransactionSender) AddNotifee(notifee notifications.Notifee) {
prts.notifees = append(prts.notifees, notifee)
}

func (prs *peerResponseSender) Transaction(requestID graphsync.RequestID, transaction Transaction) error {
prts := &peerResponseTransactionSender{
requestID: requestID,
prs: prs,
}
err := transaction(prts)
if err == nil {
prs.execute(prts.operations, notifees)
prs.execute(prts.operations, prts.notifees)
}
return err
}
Expand Down
Loading

0 comments on commit 781f52d

Please sign in to comment.