Skip to content

Commit

Permalink
Permit multiple data subscriptions per original topic (#128)
Browse files Browse the repository at this point in the history
* modify transformable event logic to simply add data

* permit multiple data topics so sent hook gets calle for every block

* rename test to match method rename.

Co-authored-by: dirkmc <dirkmdev@gmail.com>

Co-authored-by: acruikshank <acruikshank@example.com>
Co-authored-by: dirkmc <dirkmdev@gmail.com>
  • Loading branch information
3 people authored Dec 10, 2020
1 parent 11d30c6 commit e9653de
Show file tree
Hide file tree
Showing 12 changed files with 188 additions and 110 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.idea
81 changes: 81 additions & 0 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,87 @@ func TestUnixFSFetch(t *testing.T) {
require.Equal(t, origBytes, finalBytes, "should have gotten same bytes written as read but didn't")
}

func TestGraphsyncBlockListeners(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength)

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()

// register hooks to count blocks in various stages
blocksSent := 0
blocksOutgoing := 0
blocksIncoming := 0
responder.RegisterBlockSentListener(func(p peer.ID, request graphsync.RequestData, block graphsync.BlockData) {
blocksSent++
})
requestor.RegisterIncomingBlockHook(func(p peer.ID, r graphsync.ResponseData, b graphsync.BlockData, h graphsync.IncomingBlockHookActions) {
blocksIncoming++
})
responder.RegisterOutgoingBlockHook(func(p peer.ID, r graphsync.RequestData, b graphsync.BlockData, h graphsync.OutgoingBlockHookActions) {
blocksOutgoing++
})

var receivedResponseData []byte
var receivedRequestData []byte

requestor.RegisterIncomingResponseHook(
func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
data, has := responseData.Extension(td.extensionName)
if has {
receivedResponseData = data
}
})

responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
var has bool
receivedRequestData, has = requestData.Extension(td.extensionName)
if !has {
hookActions.TerminateWithError(errors.New("Missing extension"))
} else {
hookActions.SendExtensionData(td.extensionResponse)
}
})

finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1)
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
select {
case finalResponseStatusChan <- status:
default:
}
})
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

blockChain.VerifyWholeChain(ctx, progressChan)
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")

// verify extension round trip
require.Equal(t, td.extensionData, receivedRequestData, "did not receive correct extension request data")
require.Equal(t, td.extensionResponseData, receivedResponseData, "did not receive correct extension response data")

// verify listener
var finalResponseStatus graphsync.ResponseStatusCode
testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status")
require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus)

// assert we get notified for all the blocks
require.Equal(t, blockChainLength, blocksOutgoing)
require.Equal(t, blockChainLength, blocksIncoming)
require.Equal(t, blockChainLength, blocksSent)
}


type gsTestData struct {
mn mocknet.Mocknet
ctx context.Context
Expand Down
2 changes: 1 addition & 1 deletion messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (mq *MessageQueue) mutateNextMessage(mutator func(gsmsg.GraphSyncMessage),
}
mutator(mq.nextMessage)
for _, notifee := range notifees {
notifications.SubscribeOn(mq.eventPublisher, mq.nextMessageTopic, notifee)
notifications.SubscribeWithData(mq.eventPublisher, mq.nextMessageTopic, notifee)
}
return !mq.nextMessage.Empty()
}
Expand Down
51 changes: 51 additions & 0 deletions notifications/data_subscriber.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package notifications

import "sync"

type TopicDataSubscriber struct {
idMapLk sync.RWMutex
data map[Topic][]TopicData
Subscriber
}

// NewTopicDataSubscriber produces a subscriber that will transform
// events and topics before passing them on to the given subscriber
func NewTopicDataSubscriber(sub Subscriber) *TopicDataSubscriber {
return &TopicDataSubscriber{
Subscriber: sub,
data: make(map[Topic][]TopicData),
}
}

func (m *TopicDataSubscriber) AddTopicData(id Topic, data TopicData) {
m.idMapLk.Lock()
m.data[id] = append(m.data[id], data)
m.idMapLk.Unlock()
}

func (m *TopicDataSubscriber) getData(id Topic) []TopicData {
m.idMapLk.RLock()
defer m.idMapLk.RUnlock()

data, ok := m.data[id]
if !ok {
return []TopicData{}
}
newData := make([]TopicData, len(data))
for i, d := range data {
newData[i] = d
}
return newData
}

func (m *TopicDataSubscriber) OnNext(topic Topic, ev Event) {
for _, data := range m.getData(topic) {
m.Subscriber.OnNext(data, ev)
}
}

func (m *TopicDataSubscriber) OnClose(topic Topic) {
for _, data := range m.getData(topic) {
m.Subscriber.OnClose(data)
}
}
47 changes: 0 additions & 47 deletions notifications/mappable.go

This file was deleted.

6 changes: 3 additions & 3 deletions notifications/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
"github.com/ipfs/go-graphsync/testutil"
)

func TestSubscribeOn(t *testing.T) {
func TestSubscribeWithData(t *testing.T) {
ctx := context.Background()
testCases := map[string]func(ctx context.Context, t *testing.T, ps notifications.Publisher){
"SubscribeOn": func(ctx context.Context, t *testing.T, ps notifications.Publisher) {
"SubscribeWithData": func(ctx context.Context, t *testing.T, ps notifications.Publisher) {
destTopic := "t2"
notifee, verifier := testutil.NewTestNotifee(destTopic, 1)
notifications.SubscribeOn(ps, "t1", notifee)
notifications.SubscribeWithData(ps, "t1", notifee)
ps.Publish("t1", "hi")
ps.Shutdown()
verifier.ExpectEvents(ctx, t, []notifications.Event{"hi"})
Expand Down
30 changes: 11 additions & 19 deletions notifications/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,15 @@ type Topic interface{}
// Event is a publishable event
type Event interface{}

// TopicData is data added to every message broadcast on a topic
type TopicData interface{}

// Subscriber is a subscriber that can receive events
type Subscriber interface {
OnNext(Topic, Event)
OnClose(Topic)
}

// MappableSubscriber is a subscriber that remaps events received to other topics
// and events
type MappableSubscriber interface {
Subscriber
Map(sourceID Topic, destinationID Topic)
}

// Subscribable is a stream that can be subscribed to
type Subscribable interface {
Subscribe(topic Topic, sub Subscriber) bool
Expand All @@ -37,20 +33,16 @@ type Publisher interface {
// EventTransform if a fucntion transforms one kind of event to another
type EventTransform func(Event) Event

// Notifee is a mappable suscriber where you want events to appear
// on this specified topic (used to call SubscribeOn to setup a remapping)
// Notifee is a topic data subscriber plus a set of data you want to add to any topics subscribed to
// (used to call SubscribeWithData to inject data when events for a given topic emit)
type Notifee struct {
Topic Topic
Subscriber MappableSubscriber
Data TopicData
Subscriber *TopicDataSubscriber
}

// SubscribeOn subscribes to the given subscribe on the given topic, but
// maps to a differnt topic specified in a notifee which has a mappable
// subscriber
func SubscribeOn(p Subscribable, topic Topic, notifee Notifee) {
notifee.Subscriber.Map(topic, notifee.Topic)
// SubscribeWithData subscribes to the given subscriber on the given topic, and adds the notifies
// custom data into the list of data injected into callbacks when events occur on that topic
func SubscribeWithData(p Subscribable, topic Topic, notifee Notifee) {
notifee.Subscriber.AddTopicData(topic, notifee.Data)
p.Subscribe(topic, notifee.Subscriber)
}

// IdentityTransform sets up an event transform that makes no changes
func IdentityTransform(ev Event) Event { return ev }
16 changes: 8 additions & 8 deletions responsemanager/peerresponsemanager/peerresponsesender.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ type peerResponseSender struct {
responseBuilders []*responsebuilder.ResponseBuilder
nextBuilderTopic responsebuilder.Topic
queuedMessages chan responsebuilder.Topic
subscriber notifications.MappableSubscriber
allocatorSubscriber notifications.MappableSubscriber
subscriber *notifications.TopicDataSubscriber
allocatorSubscriber *notifications.TopicDataSubscriber
publisher notifications.Publisher
}

Expand Down Expand Up @@ -133,8 +133,8 @@ func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHa
publisher: notifications.NewPublisher(),
allocator: allocator,
}
prs.subscriber = notifications.NewMappableSubscriber(&subscriber{prs}, notifications.IdentityTransform)
prs.allocatorSubscriber = notifications.NewMappableSubscriber(&allocatorSubscriber{prs}, notifications.IdentityTransform)
prs.subscriber = notifications.NewTopicDataSubscriber(&subscriber{prs})
prs.allocatorSubscriber = notifications.NewTopicDataSubscriber(&allocatorSubscriber{prs})
return prs
}

Expand Down Expand Up @@ -418,7 +418,7 @@ func (prs *peerResponseSender) buildResponse(blkSize uint64, buildResponseFn fun
responseBuilder := prs.responseBuilders[len(prs.responseBuilders)-1]
buildResponseFn(responseBuilder)
for _, notifee := range notifees {
notifications.SubscribeOn(prs.publisher, responseBuilder.Topic(), notifee)
notifications.SubscribeWithData(prs.publisher, responseBuilder.Topic(), notifee)
}
return !responseBuilder.Empty()
}
Expand Down Expand Up @@ -466,8 +466,8 @@ func (prs *peerResponseSender) sendResponseMessages() {
if builder.Empty() {
continue
}
notifications.SubscribeOn(prs.publisher, builder.Topic(), notifications.Notifee{
Topic: builder.BlockSize(),
notifications.SubscribeWithData(prs.publisher, builder.Topic(), notifications.Notifee{
Data: builder.BlockSize(),
Subscriber: prs.allocatorSubscriber,
})
responses, blks, err := builder.Build()
Expand All @@ -476,7 +476,7 @@ func (prs *peerResponseSender) sendResponseMessages() {
}

prs.peerHandler.SendResponse(prs.p, responses, blks, notifications.Notifee{
Topic: builder.Topic(),
Data: builder.Topic(),
Subscriber: prs.subscriber,
})

Expand Down
10 changes: 5 additions & 5 deletions responsemanager/queryexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ func (qe *queryExecutor) executeTask(key responseKey, taskData responseTaskData)

func (qe *queryExecutor) prepareQuery(ctx context.Context,
p peer.ID,
request gsmsg.GraphSyncRequest, signals signals, sub notifications.MappableSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) {
request gsmsg.GraphSyncRequest, signals signals, sub *notifications.TopicDataSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) {
result := qe.requestHooks.ProcessRequestHooks(p, request)
peerResponseSender := qe.peerManager.SenderForPeer(p)
var transactionError error
var isPaused bool
failNotifee := notifications.Notifee{Topic: graphsync.RequestFailedUnknown, Subscriber: sub}
failNotifee := notifications.Notifee{Data: graphsync.RequestFailedUnknown, Subscriber: sub}
err := peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
for _, extension := range result.Extensions {
transaction.SendExtensionData(extension)
Expand Down Expand Up @@ -199,7 +199,7 @@ func (qe *queryExecutor) executeQuery(
loader ipld.Loader,
traverser ipldutil.Traverser,
signals signals,
sub notifications.MappableSubscriber) (graphsync.ResponseStatusCode, error) {
sub *notifications.TopicDataSubscriber) (graphsync.ResponseStatusCode, error) {
updateChan := make(chan []gsmsg.GraphSyncRequest)
peerResponseSender := qe.peerManager.SenderForPeer(p)
err := runtraversal.RunTraversal(loader, traverser, func(link ipld.Link, data []byte) error {
Expand All @@ -210,7 +210,7 @@ func (qe *queryExecutor) executeQuery(
return nil
}
blockData := transaction.SendResponse(link, data)
transaction.AddNotifee(notifications.Notifee{Topic: blockData, Subscriber: sub})
transaction.AddNotifee(notifications.Notifee{Data: blockData, Subscriber: sub})
if blockData.BlockSize() > 0 {
result := qe.blockHooks.ProcessBlockHooks(p, request, blockData)
for _, extension := range result.Extensions {
Expand Down Expand Up @@ -253,7 +253,7 @@ func (qe *queryExecutor) executeQuery(
} else {
code = peerResponseSender.FinishRequest()
}
peerResponseSender.AddNotifee(notifications.Notifee{Topic: code, Subscriber: sub})
peerResponseSender.AddNotifee(notifications.Notifee{Data: code, Subscriber: sub})
return nil
})
return code, err
Expand Down
Loading

0 comments on commit e9653de

Please sign in to comment.