Skip to content

Commit

Permalink
feat: WithLocalPublication option to enable local only publishing on …
Browse files Browse the repository at this point in the history
…a topic
  • Loading branch information
Wondertan committed Mar 24, 2022
1 parent fa4161c commit 39634a9
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 3 deletions.
7 changes: 5 additions & 2 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ type Message struct {
ID string
ReceivedFrom peer.ID
ValidatorData interface{}
Local bool
}

func (m *Message) GetFrom() peer.ID {
Expand Down Expand Up @@ -1044,7 +1045,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
continue
}

p.pushMsg(&Message{pmsg, "", rpc.from, nil})
p.pushMsg(&Message{pmsg, "", rpc.from, nil, false})
}
}

Expand Down Expand Up @@ -1143,7 +1144,9 @@ func (p *PubSub) checkSigningPolicy(msg *Message) error {
func (p *PubSub) publishMessage(msg *Message) {
p.tracer.DeliverMessage(msg)
p.notifySubs(msg)
p.rt.Publish(msg)
if !msg.Local {
p.rt.Publish(msg)
}
}

type addTopicReq struct {
Expand Down
12 changes: 11 additions & 1 deletion topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ type RouterReady func(rt PubSubRouter, topic string) (bool, error)

type PublishOptions struct {
ready RouterReady
local bool
}

type PubOpt func(pub *PublishOptions) error
Expand Down Expand Up @@ -283,7 +284,7 @@ func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error
}
}

return t.p.val.PushLocal(&Message{m, "", t.p.host.ID(), nil})
return t.p.val.PushLocal(&Message{m, "", t.p.host.ID(), nil, pub.local})
}

// WithReadiness returns a publishing option for only publishing when the router is ready.
Expand All @@ -295,6 +296,15 @@ func WithReadiness(ready RouterReady) PubOpt {
}
}

// WithLocalPublication option tells Publish to *only* notify local subscribers about a message.
// This option prevents messages publication to peers.
func WithLocalPublication(local bool) PubOpt {
return func(pub *PublishOptions) error {
pub.local = local
return nil
}
}

// Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions.
// Does not error if the topic is already closed.
func (t *Topic) Close() error {
Expand Down
45 changes: 45 additions & 0 deletions topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,3 +920,48 @@ func TestWithTopicMsgIdFunction(t *testing.T) {
t.Fatal("msg ids are equal")
}
}

func TestWithLocalPublication(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const topic = "test"

hosts := getNetHosts(t, ctx, 2)
pubsubs := getPubsubs(ctx, hosts)
topics := getTopics(pubsubs, topic)
connectAll(t, hosts)

payload := []byte("pubsub smashes")

local, err := topics[0].Subscribe()
if err != nil {
t.Fatal(err)
}

remote, err := topics[1].Subscribe()
if err != nil {
t.Fatal(err)
}

err = topics[0].Publish(ctx, payload, WithLocalPublication(true))
if err != nil {
t.Fatal(err)
}

remoteCtx, cancel := context.WithTimeout(ctx, time.Millisecond*100)
defer cancel()

msg, err := remote.Next(remoteCtx)
if msg != nil || err == nil {
t.Fatal("unexpected msg")
}

msg, err = local.Next(ctx)
if err != nil {
t.Fatal(err)
}
if !msg.Local || !bytes.Equal(msg.Data, payload) {
t.Fatal("wrong message")
}
}

0 comments on commit 39634a9

Please sign in to comment.