Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Permit multiple data subscriptions per original topic #128

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.idea
81 changes: 81 additions & 0 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,87 @@ func TestUnixFSFetch(t *testing.T) {
require.Equal(t, origBytes, finalBytes, "should have gotten same bytes written as read but didn't")
}

func TestGraphsyncBlockListeners(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength)

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()

// register hooks to count blocks in various stages
blocksSent := 0
blocksOutgoing := 0
blocksIncoming := 0
responder.RegisterBlockSentListener(func(p peer.ID, request graphsync.RequestData, block graphsync.BlockData) {
blocksSent++
})
requestor.RegisterIncomingBlockHook(func(p peer.ID, r graphsync.ResponseData, b graphsync.BlockData, h graphsync.IncomingBlockHookActions) {
blocksIncoming++
})
responder.RegisterOutgoingBlockHook(func(p peer.ID, r graphsync.RequestData, b graphsync.BlockData, h graphsync.OutgoingBlockHookActions) {
blocksOutgoing++
})

var receivedResponseData []byte
var receivedRequestData []byte

requestor.RegisterIncomingResponseHook(
func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
data, has := responseData.Extension(td.extensionName)
if has {
receivedResponseData = data
}
})

responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
var has bool
receivedRequestData, has = requestData.Extension(td.extensionName)
if !has {
hookActions.TerminateWithError(errors.New("Missing extension"))
} else {
hookActions.SendExtensionData(td.extensionResponse)
}
})

finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1)
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
select {
case finalResponseStatusChan <- status:
default:
}
})
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

blockChain.VerifyWholeChain(ctx, progressChan)
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")

// verify extension round trip
require.Equal(t, td.extensionData, receivedRequestData, "did not receive correct extension request data")
require.Equal(t, td.extensionResponseData, receivedResponseData, "did not receive correct extension response data")

// verify listener
var finalResponseStatus graphsync.ResponseStatusCode
testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status")
require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus)

// assert we get notified for all the blocks
require.Equal(t, blockChainLength, blocksOutgoing)
require.Equal(t, blockChainLength, blocksIncoming)
require.Equal(t, blockChainLength, blocksSent)
}


type gsTestData struct {
mn mocknet.Mocknet
ctx context.Context
Expand Down
2 changes: 1 addition & 1 deletion messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (mq *MessageQueue) mutateNextMessage(mutator func(gsmsg.GraphSyncMessage),
}
mutator(mq.nextMessage)
for _, notifee := range notifees {
notifications.SubscribeOn(mq.eventPublisher, mq.nextMessageTopic, notifee)
notifications.SubscribeWithData(mq.eventPublisher, mq.nextMessageTopic, notifee)
}
return !mq.nextMessage.Empty()
}
Expand Down
51 changes: 51 additions & 0 deletions notifications/data_subscriber.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package notifications

import "sync"

type TopicDataSubscriber struct {
idMapLk sync.RWMutex
data map[Topic][]TopicData
Subscriber
}

// NewTopicDataSubscriber produces a subscriber that will transform
// events and topics before passing them on to the given subscriber
func NewTopicDataSubscriber(sub Subscriber) *TopicDataSubscriber {
return &TopicDataSubscriber{
Subscriber: sub,
data: make(map[Topic][]TopicData),
}
}

func (m *TopicDataSubscriber) AddTopicData(id Topic, data TopicData) {
m.idMapLk.Lock()
m.data[id] = append(m.data[id], data)
m.idMapLk.Unlock()
}

func (m *TopicDataSubscriber) getData(id Topic) []TopicData {
m.idMapLk.RLock()
defer m.idMapLk.RUnlock()

data, ok := m.data[id]
if !ok {
return []TopicData{}
}
newData := make([]TopicData, len(data))
for i, d := range data {
newData[i] = d
}
return newData
}

func (m *TopicDataSubscriber) OnNext(topic Topic, ev Event) {
for _, data := range m.getData(topic) {
m.Subscriber.OnNext(data, ev)
}
}

func (m *TopicDataSubscriber) OnClose(topic Topic) {
for _, data := range m.getData(topic) {
m.Subscriber.OnClose(data)
}
}
47 changes: 0 additions & 47 deletions notifications/mappable.go

This file was deleted.

6 changes: 3 additions & 3 deletions notifications/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
"github.com/ipfs/go-graphsync/testutil"
)

func TestSubscribeOn(t *testing.T) {
func TestSubscribeWithData(t *testing.T) {
ctx := context.Background()
testCases := map[string]func(ctx context.Context, t *testing.T, ps notifications.Publisher){
"SubscribeOn": func(ctx context.Context, t *testing.T, ps notifications.Publisher) {
"SubscribeWithData": func(ctx context.Context, t *testing.T, ps notifications.Publisher) {
destTopic := "t2"
notifee, verifier := testutil.NewTestNotifee(destTopic, 1)
notifications.SubscribeOn(ps, "t1", notifee)
notifications.SubscribeWithData(ps, "t1", notifee)
ps.Publish("t1", "hi")
ps.Shutdown()
verifier.ExpectEvents(ctx, t, []notifications.Event{"hi"})
Expand Down
30 changes: 11 additions & 19 deletions notifications/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,15 @@ type Topic interface{}
// Event is a publishable event
type Event interface{}

// TopicData is data added to every message broadcast on a topic
type TopicData interface{}

// Subscriber is a subscriber that can receive events
type Subscriber interface {
OnNext(Topic, Event)
OnClose(Topic)
}

// MappableSubscriber is a subscriber that remaps events received to other topics
// and events
type MappableSubscriber interface {
Subscriber
Map(sourceID Topic, destinationID Topic)
}

// Subscribable is a stream that can be subscribed to
type Subscribable interface {
Subscribe(topic Topic, sub Subscriber) bool
Expand All @@ -37,20 +33,16 @@ type Publisher interface {
// EventTransform if a fucntion transforms one kind of event to another
type EventTransform func(Event) Event

// Notifee is a mappable suscriber where you want events to appear
// on this specified topic (used to call SubscribeOn to setup a remapping)
// Notifee is a topic data subscriber plus a set of data you want to add to any topics subscribed to
// (used to call SubscribeWithData to inject data when events for a given topic emit)
type Notifee struct {
Topic Topic
Subscriber MappableSubscriber
Data TopicData
Subscriber *TopicDataSubscriber
}

// SubscribeOn subscribes to the given subscribe on the given topic, but
// maps to a differnt topic specified in a notifee which has a mappable
// subscriber
func SubscribeOn(p Subscribable, topic Topic, notifee Notifee) {
notifee.Subscriber.Map(topic, notifee.Topic)
// SubscribeWithData subscribes to the given subscriber on the given topic, and adds the notifies
// custom data into the list of data injected into callbacks when events occur on that topic
func SubscribeWithData(p Subscribable, topic Topic, notifee Notifee) {
notifee.Subscriber.AddTopicData(topic, notifee.Data)
p.Subscribe(topic, notifee.Subscriber)
}

// IdentityTransform sets up an event transform that makes no changes
func IdentityTransform(ev Event) Event { return ev }
16 changes: 8 additions & 8 deletions responsemanager/peerresponsemanager/peerresponsesender.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ type peerResponseSender struct {
responseBuilders []*responsebuilder.ResponseBuilder
nextBuilderTopic responsebuilder.Topic
queuedMessages chan responsebuilder.Topic
subscriber notifications.MappableSubscriber
allocatorSubscriber notifications.MappableSubscriber
subscriber *notifications.TopicDataSubscriber
allocatorSubscriber *notifications.TopicDataSubscriber
publisher notifications.Publisher
}

Expand Down Expand Up @@ -133,8 +133,8 @@ func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHa
publisher: notifications.NewPublisher(),
allocator: allocator,
}
prs.subscriber = notifications.NewMappableSubscriber(&subscriber{prs}, notifications.IdentityTransform)
prs.allocatorSubscriber = notifications.NewMappableSubscriber(&allocatorSubscriber{prs}, notifications.IdentityTransform)
prs.subscriber = notifications.NewTopicDataSubscriber(&subscriber{prs})
prs.allocatorSubscriber = notifications.NewTopicDataSubscriber(&allocatorSubscriber{prs})
return prs
}

Expand Down Expand Up @@ -418,7 +418,7 @@ func (prs *peerResponseSender) buildResponse(blkSize uint64, buildResponseFn fun
responseBuilder := prs.responseBuilders[len(prs.responseBuilders)-1]
buildResponseFn(responseBuilder)
for _, notifee := range notifees {
notifications.SubscribeOn(prs.publisher, responseBuilder.Topic(), notifee)
notifications.SubscribeWithData(prs.publisher, responseBuilder.Topic(), notifee)
}
return !responseBuilder.Empty()
}
Expand Down Expand Up @@ -466,8 +466,8 @@ func (prs *peerResponseSender) sendResponseMessages() {
if builder.Empty() {
continue
}
notifications.SubscribeOn(prs.publisher, builder.Topic(), notifications.Notifee{
Topic: builder.BlockSize(),
notifications.SubscribeWithData(prs.publisher, builder.Topic(), notifications.Notifee{
Data: builder.BlockSize(),
Subscriber: prs.allocatorSubscriber,
})
responses, blks, err := builder.Build()
Expand All @@ -476,7 +476,7 @@ func (prs *peerResponseSender) sendResponseMessages() {
}

prs.peerHandler.SendResponse(prs.p, responses, blks, notifications.Notifee{
Topic: builder.Topic(),
Data: builder.Topic(),
Subscriber: prs.subscriber,
})

Expand Down
10 changes: 5 additions & 5 deletions responsemanager/queryexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ func (qe *queryExecutor) executeTask(key responseKey, taskData responseTaskData)

func (qe *queryExecutor) prepareQuery(ctx context.Context,
p peer.ID,
request gsmsg.GraphSyncRequest, signals signals, sub notifications.MappableSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) {
request gsmsg.GraphSyncRequest, signals signals, sub *notifications.TopicDataSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) {
result := qe.requestHooks.ProcessRequestHooks(p, request)
peerResponseSender := qe.peerManager.SenderForPeer(p)
var transactionError error
var isPaused bool
failNotifee := notifications.Notifee{Topic: graphsync.RequestFailedUnknown, Subscriber: sub}
failNotifee := notifications.Notifee{Data: graphsync.RequestFailedUnknown, Subscriber: sub}
err := peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
for _, extension := range result.Extensions {
transaction.SendExtensionData(extension)
Expand Down Expand Up @@ -199,7 +199,7 @@ func (qe *queryExecutor) executeQuery(
loader ipld.Loader,
traverser ipldutil.Traverser,
signals signals,
sub notifications.MappableSubscriber) (graphsync.ResponseStatusCode, error) {
sub *notifications.TopicDataSubscriber) (graphsync.ResponseStatusCode, error) {
updateChan := make(chan []gsmsg.GraphSyncRequest)
peerResponseSender := qe.peerManager.SenderForPeer(p)
err := runtraversal.RunTraversal(loader, traverser, func(link ipld.Link, data []byte) error {
Expand All @@ -210,7 +210,7 @@ func (qe *queryExecutor) executeQuery(
return nil
}
blockData := transaction.SendResponse(link, data)
transaction.AddNotifee(notifications.Notifee{Topic: blockData, Subscriber: sub})
transaction.AddNotifee(notifications.Notifee{Data: blockData, Subscriber: sub})
if blockData.BlockSize() > 0 {
result := qe.blockHooks.ProcessBlockHooks(p, request, blockData)
for _, extension := range result.Extensions {
Expand Down Expand Up @@ -253,7 +253,7 @@ func (qe *queryExecutor) executeQuery(
} else {
code = peerResponseSender.FinishRequest()
}
peerResponseSender.AddNotifee(notifications.Notifee{Topic: code, Subscriber: sub})
peerResponseSender.AddNotifee(notifications.Notifee{Data: code, Subscriber: sub})
return nil
})
return code, err
Expand Down
Loading