Skip to content

Commit

Permalink
Track actual network operations in a response (#102)
Browse files Browse the repository at this point in the history
* feat(notifications): build notifications architecture

build an architecture via which we can pass on notifications

* feat(peerresponsesender): add notifications

pass notifications on from peer response sender based on message queue notifications

* feat(responsemanager): handle network errors

Pass on network errors through the response manager, pass block sent errors as well

* fix(notifications): send unique notifications by peer

* feat(responsemanager): subscriber per request

* Revert "fix(notifications): send unique notifications by peer"

This reverts commit 7bf368a.
  • Loading branch information
hannahhoward authored Oct 13, 2020
1 parent 70d3ec7 commit dd6fa45
Show file tree
Hide file tree
Showing 25 changed files with 1,962 additions and 924 deletions.
12 changes: 12 additions & 0 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,12 @@ type OnOutgoingBlockHook func(p peer.ID, request RequestData, block BlockData, h
// It receives an interface to taking further action on the response
type OnRequestUpdatedHook func(p peer.ID, request RequestData, updateRequest RequestData, hookActions RequestUpdatedHookActions)

// OnBlockSentListener runs when a block is sent over the wire
type OnBlockSentListener func(p peer.ID, request RequestData, block BlockData)

// OnNetworkErrorListener runs when queued data is not able to be sent
type OnNetworkErrorListener func(p peer.ID, request RequestData, err error)

// OnResponseCompletedListener provides a way to listen for when responder has finished serving a response
type OnResponseCompletedListener func(p peer.ID, request RequestData, status ResponseStatusCode)

Expand Down Expand Up @@ -328,6 +334,12 @@ type GraphExchange interface {
// responses cancelled by the requestor
RegisterRequestorCancelledListener(listener OnRequestorCancelledListener) UnregisterHookFunc

// RegisterBlockSentListener adds a listener for when blocks are actually sent over the wire
RegisterBlockSentListener(listener OnBlockSentListener) UnregisterHookFunc

// RegisterNetworkErrorListener adds a listener for when errors occur sending data over the wire
RegisterNetworkErrorListener(listener OnNetworkErrorListener) UnregisterHookFunc

// UnpauseRequest unpauses a request that was paused in a block hook based request ID
// Can also send extensions with unpause
UnpauseRequest(RequestID, ...ExtensionData) error
Expand Down
18 changes: 17 additions & 1 deletion impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type GraphSync struct {
requestUpdatedHooks *responderhooks.RequestUpdatedHooks
completedResponseListeners *responderhooks.CompletedResponseListeners
requestorCancelledListeners *responderhooks.RequestorCancelledListeners
blockSentListeners *responderhooks.BlockSentListeners
networkErrorListeners *responderhooks.NetworkErrorListeners
incomingResponseHooks *requestorhooks.IncomingResponseHooks
outgoingRequestHooks *requestorhooks.OutgoingRequestHooks
incomingBlockHooks *requestorhooks.IncomingBlockHooks
Expand Down Expand Up @@ -91,7 +93,9 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
requestUpdatedHooks := responderhooks.NewUpdateHooks()
completedResponseListeners := responderhooks.NewCompletedResponseListeners()
requestorCancelledListeners := responderhooks.NewRequestorCancelledListeners()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners)
blockSentListeners := responderhooks.NewBlockSentListeners()
networkErrorListeners := responderhooks.NewNetworkErrorListeners()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners)
unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
graphSync := &GraphSync{
network: network,
Expand All @@ -106,6 +110,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
requestUpdatedHooks: requestUpdatedHooks,
completedResponseListeners: completedResponseListeners,
requestorCancelledListeners: requestorCancelledListeners,
blockSentListeners: blockSentListeners,
networkErrorListeners: networkErrorListeners,
incomingResponseHooks: incomingResponseHooks,
outgoingRequestHooks: outgoingRequestHooks,
incomingBlockHooks: incomingBlockHooks,
Expand Down Expand Up @@ -196,6 +202,16 @@ func (gs *GraphSync) RegisterRequestorCancelledListener(listener graphsync.OnReq
return gs.requestorCancelledListeners.Register(listener)
}

// RegisterBlockSentListener adds a listener for when blocks are actually sent over the wire
func (gs *GraphSync) RegisterBlockSentListener(listener graphsync.OnBlockSentListener) graphsync.UnregisterHookFunc {
return gs.blockSentListeners.Register(listener)
}

// RegisterNetworkErrorListener adds a listener for when errors occur sending data over the wire
func (gs *GraphSync) RegisterNetworkErrorListener(listener graphsync.OnNetworkErrorListener) graphsync.UnregisterHookFunc {
return gs.networkErrorListeners.Register(listener)
}

// UnpauseRequest unpauses a request that was paused in a block hook based request ID
// Can also send extensions with unpause
func (gs *GraphSync) UnpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
Expand Down
63 changes: 63 additions & 0 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,69 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {
require.Equal(t, td.extensionUpdateData, receivedUpdateData, "did not receive correct extension update data")
}

func TestNetworkDisconnect(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength)

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()

stopPoint := 50
blocksSent := 0
requestIDChan := make(chan graphsync.RequestID, 1)
responder.RegisterOutgoingBlockHook(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
_, has := requestData.Extension(td.extensionName)
if has {
select {
case requestIDChan <- requestData.ID():
default:
}
blocksSent++
if blocksSent == stopPoint {
hookActions.PauseResponse()
}
} else {
hookActions.TerminateWithError(errors.New("should have sent extension"))
}
})
networkError := make(chan error, 1)
responder.RegisterNetworkErrorListener(func(p peer.ID, request graphsync.RequestData, err error) {
select {
case networkError <- err:
default:
}
})
requestCtx, requestCancel := context.WithTimeout(ctx, 1*time.Second)
defer requestCancel()
progressChan, errChan := requestor.Request(requestCtx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

blockChain.VerifyResponseRange(ctx, progressChan, 0, stopPoint)
timer := time.NewTimer(100 * time.Millisecond)
testutil.AssertDoesReceiveFirst(t, timer.C, "should pause request", progressChan)
testutil.AssertChannelEmpty(t, networkError, "no network errors so far")

// unlink peers so they cannot communicate
td.mn.DisconnectPeers(td.host1.ID(), td.host2.ID())
td.mn.UnlinkPeers(td.host1.ID(), td.host2.ID())
requestID := <-requestIDChan
err := responder.UnpauseResponse(td.host1.ID(), requestID)
require.NoError(t, err)

testutil.AssertReceive(ctx, t, networkError, &err, "should receive network error")
testutil.AssertReceive(ctx, t, errChan, &err, "should receive an error")
require.EqualError(t, err, graphsync.RequestContextCancelledErr{}.Error())
}

func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
// create network
ctx := context.Background()
Expand Down
77 changes: 50 additions & 27 deletions messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package messagequeue

import (
"context"
"errors"
"fmt"
"sync"
"time"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-graphsync/notifications"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/peer"

Expand All @@ -17,6 +20,21 @@ var log = logging.Logger("graphsync")

const maxRetries = 10

type EventName uint64

const (
Queued EventName = iota
Sent
Error
)

type Event struct {
Name EventName
Err error
}

type Topic uint64

// MessageNetwork is any network that can connect peers and generate a message
// sender.
type MessageNetwork interface {
Expand All @@ -35,48 +53,50 @@ type MessageQueue struct {

// internal do not touch outside go routines
nextMessage gsmsg.GraphSyncMessage
nextMessageTopic Topic
nextMessageLk sync.RWMutex
nextAvailableTopic Topic
processedNotifiers []chan struct{}
sender gsnet.MessageSender
eventPublisher notifications.Publisher
}

// New creats a new MessageQueue.
func New(ctx context.Context, p peer.ID, network MessageNetwork) *MessageQueue {
return &MessageQueue{
ctx: ctx,
network: network,
p: p,
outgoingWork: make(chan struct{}, 1),
done: make(chan struct{}),
ctx: ctx,
network: network,
p: p,
outgoingWork: make(chan struct{}, 1),
done: make(chan struct{}),
eventPublisher: notifications.NewPublisher(),
}
}

// AddRequest adds an outgoing request to the message queue.
func (mq *MessageQueue) AddRequest(graphSyncRequest gsmsg.GraphSyncRequest) {
func (mq *MessageQueue) AddRequest(graphSyncRequest gsmsg.GraphSyncRequest, notifees ...notifications.Notifee) {

if mq.mutateNextMessage(func(nextMessage gsmsg.GraphSyncMessage) {
nextMessage.AddRequest(graphSyncRequest)
}, nil) {
}, notifees) {
mq.signalWork()
}
}

// AddResponses adds the given blocks and responses to the next message and
// returns a channel that sends a notification when sending initiates. If ignored by the consumer
// sending will not block.
func (mq *MessageQueue) AddResponses(responses []gsmsg.GraphSyncResponse, blks []blocks.Block) <-chan struct{} {
notificationChannel := make(chan struct{}, 1)
func (mq *MessageQueue) AddResponses(responses []gsmsg.GraphSyncResponse, blks []blocks.Block, notifees ...notifications.Notifee) {
if mq.mutateNextMessage(func(nextMessage gsmsg.GraphSyncMessage) {
for _, response := range responses {
nextMessage.AddResponse(response)
}
for _, block := range blks {
nextMessage.AddBlock(block)
}
}, notificationChannel) {
}, notifees) {
mq.signalWork()
}
return notificationChannel
}

// Startup starts the processing of messages, and creates an initial message
Expand Down Expand Up @@ -109,15 +129,17 @@ func (mq *MessageQueue) runQueue() {
}
}

func (mq *MessageQueue) mutateNextMessage(mutator func(gsmsg.GraphSyncMessage), processedNotifier chan struct{}) bool {
func (mq *MessageQueue) mutateNextMessage(mutator func(gsmsg.GraphSyncMessage), notifees []notifications.Notifee) bool {
mq.nextMessageLk.Lock()
defer mq.nextMessageLk.Unlock()
if mq.nextMessage == nil {
mq.nextMessage = gsmsg.New()
mq.nextMessageTopic = mq.nextAvailableTopic
mq.nextAvailableTopic++
}
mutator(mq.nextMessage)
if processedNotifier != nil {
mq.processedNotifiers = append(mq.processedNotifiers, processedNotifier)
for _, notifee := range notifees {
notifications.SubscribeOn(mq.eventPublisher, mq.nextMessageTopic, notifee)
}
return !mq.nextMessage.Empty()
}
Expand All @@ -129,41 +151,38 @@ func (mq *MessageQueue) signalWork() {
}
}

func (mq *MessageQueue) extractOutgoingMessage() gsmsg.GraphSyncMessage {
func (mq *MessageQueue) extractOutgoingMessage() (gsmsg.GraphSyncMessage, Topic) {
// grab outgoing message
mq.nextMessageLk.Lock()
message := mq.nextMessage
topic := mq.nextMessageTopic
mq.nextMessage = nil
for _, processedNotifier := range mq.processedNotifiers {
select {
case processedNotifier <- struct{}{}:
default:
}
close(processedNotifier)
}
mq.processedNotifiers = nil
mq.nextMessageLk.Unlock()
return message
return message, topic
}

func (mq *MessageQueue) sendMessage() {
message := mq.extractOutgoingMessage()
message, topic := mq.extractOutgoingMessage()
if message == nil || message.Empty() {
return
}
mq.eventPublisher.Publish(topic, Event{Name: Queued, Err: nil})
defer mq.eventPublisher.Close(topic)

err := mq.initializeSender()
if err != nil {
log.Infof("cant open message sender to peer %s: %s", mq.p, err)
// TODO: cant connect, what now?
mq.eventPublisher.Publish(topic, Event{Name: Error, Err: fmt.Errorf("cant open message sender to peer %s: %w", mq.p, err)})
return
}

for i := 0; i < maxRetries; i++ { // try to send this message until we fail.
if mq.attemptSendAndRecovery(message) {
if mq.attemptSendAndRecovery(message, topic) {
return
}
}
mq.eventPublisher.Publish(topic, Event{Name: Error, Err: fmt.Errorf("expended retries on SendMsg(%s)", mq.p)})
}

func (mq *MessageQueue) initializeSender() error {
Expand All @@ -178,9 +197,10 @@ func (mq *MessageQueue) initializeSender() error {
return nil
}

func (mq *MessageQueue) attemptSendAndRecovery(message gsmsg.GraphSyncMessage) bool {
func (mq *MessageQueue) attemptSendAndRecovery(message gsmsg.GraphSyncMessage, topic Topic) bool {
err := mq.sender.SendMsg(mq.ctx, message)
if err == nil {
mq.eventPublisher.Publish(topic, Event{Name: Sent})
return true
}

Expand All @@ -190,8 +210,10 @@ func (mq *MessageQueue) attemptSendAndRecovery(message gsmsg.GraphSyncMessage) b

select {
case <-mq.done:
mq.eventPublisher.Publish(topic, Event{Name: Error, Err: errors.New("queue shutdown")})
return true
case <-mq.ctx.Done():
mq.eventPublisher.Publish(topic, Event{Name: Error, Err: errors.New("context cancelled")})
return true
case <-time.After(time.Millisecond * 100):
// wait 100ms in case disconnect notifications are still propogating
Expand All @@ -205,6 +227,7 @@ func (mq *MessageQueue) attemptSendAndRecovery(message gsmsg.GraphSyncMessage) b
// I think the *right* answer is to probably put the message we're
// trying to send back, and then return to waiting for new work or
// a disconnect.
mq.eventPublisher.Publish(topic, Event{Name: Error, Err: fmt.Errorf("couldnt open sender again after SendMsg(%s) failed: %w", mq.p, err)})
return true
}

Expand Down
13 changes: 10 additions & 3 deletions messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ipfs/go-graphsync"
gsmsg "github.com/ipfs/go-graphsync/message"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/notifications"
"github.com/ipfs/go-graphsync/testutil"
)

Expand Down Expand Up @@ -158,13 +159,13 @@ func TestProcessingNotification(t *testing.T) {
}
status := graphsync.RequestCompletedFull
newMessage.AddResponse(gsmsg.NewResponse(responseID, status, extension))
processing := messageQueue.AddResponses(newMessage.Responses(), blks)
testutil.AssertChannelEmpty(t, processing, "processing notification sent while queue is shutdown")
expectedTopic := "testTopic"
notifee, verifier := testutil.NewTestNotifee(expectedTopic, 5)
messageQueue.AddResponses(newMessage.Responses(), blks, notifee)

// wait for send attempt
messageQueue.Startup()
waitGroup.Wait()
testutil.AssertDoesReceive(ctx, t, processing, "message was not processed")

var message gsmsg.GraphSyncMessage
testutil.AssertReceive(ctx, t, messagesSent, &message, "message did not send")
Expand All @@ -178,6 +179,12 @@ func TestProcessingNotification(t *testing.T) {
require.Equal(t, status, firstResponse.Status())
require.True(t, found)
require.Equal(t, extension.Data, extensionData)

verifier.ExpectEvents(ctx, t, []notifications.Event{
Event{Name: Queued},
Event{Name: Sent},
})
verifier.ExpectClose(ctx, t)
}

func TestDedupingMessages(t *testing.T) {
Expand Down
Loading

0 comments on commit dd6fa45

Please sign in to comment.