Skip to content

Commit

Permalink
feat: WithLocalPublication option to enable local only publishing on …
Browse files Browse the repository at this point in the history
…a topic (#481)

* feat: WithLocalPublication option to enable local only publishing on a topic

* docs: improve comment on WithLocalPublication option
  • Loading branch information
Wondertan authored Jul 1, 2022
1 parent ca70228 commit 96efa27
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 3 deletions.
7 changes: 5 additions & 2 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ type Message struct {
ID string
ReceivedFrom peer.ID
ValidatorData interface{}
Local bool
}

func (m *Message) GetFrom() peer.ID {
Expand Down Expand Up @@ -1066,7 +1067,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
continue
}

p.pushMsg(&Message{pmsg, "", rpc.from, nil})
p.pushMsg(&Message{pmsg, "", rpc.from, nil, false})
}
}

Expand Down Expand Up @@ -1165,7 +1166,9 @@ func (p *PubSub) checkSigningPolicy(msg *Message) error {
func (p *PubSub) publishMessage(msg *Message) {
p.tracer.DeliverMessage(msg)
p.notifySubs(msg)
p.rt.Publish(msg)
if !msg.Local {
p.rt.Publish(msg)
}
}

type addTopicReq struct {
Expand Down
15 changes: 14 additions & 1 deletion topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ type ProvideKey func() (crypto.PrivKey, peer.ID)
type PublishOptions struct {
ready RouterReady
customKey ProvideKey
local bool
}

type PubOpt func(pub *PublishOptions) error
Expand Down Expand Up @@ -307,7 +308,7 @@ func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error
}
}

return t.p.val.PushLocal(&Message{m, "", t.p.host.ID(), nil})
return t.p.val.PushLocal(&Message{m, "", t.p.host.ID(), nil, pub.local})
}

// WithReadiness returns a publishing option for only publishing when the router is ready.
Expand All @@ -319,6 +320,18 @@ func WithReadiness(ready RouterReady) PubOpt {
}
}

// WithLocalPublication returns a publishing option to notify in-process subscribers only.
// It prevents message publication to mesh peers.
// Useful in edge cases where the msg needs to be only delivered to the in-process subscribers,
// e.g. not to spam the network with outdated msgs.
// Should not be used specifically for in-process pubsubing.
func WithLocalPublication(local bool) PubOpt {
return func(pub *PublishOptions) error {
pub.local = local
return nil
}
}

// WithSecretKeyAndPeerId returns a publishing option for providing a custom private key and its corresponding peer ID
// This option is useful when we want to send messages from "virtual", never-connectable peers in the network
func WithSecretKeyAndPeerId(key crypto.PrivKey, pid peer.ID) PubOpt {
Expand Down
45 changes: 45 additions & 0 deletions topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,3 +1019,48 @@ func TestTopicRelayPublishWithKey(t *testing.T) {
}
}
}

func TestWithLocalPublication(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const topic = "test"

hosts := getNetHosts(t, ctx, 2)
pubsubs := getPubsubs(ctx, hosts)
topics := getTopics(pubsubs, topic)
connectAll(t, hosts)

payload := []byte("pubsub smashes")

local, err := topics[0].Subscribe()
if err != nil {
t.Fatal(err)
}

remote, err := topics[1].Subscribe()
if err != nil {
t.Fatal(err)
}

err = topics[0].Publish(ctx, payload, WithLocalPublication(true))
if err != nil {
t.Fatal(err)
}

remoteCtx, cancel := context.WithTimeout(ctx, time.Millisecond*100)
defer cancel()

msg, err := remote.Next(remoteCtx)
if msg != nil || err == nil {
t.Fatal("unexpected msg")
}

msg, err = local.Next(ctx)
if err != nil {
t.Fatal(err)
}
if !msg.Local || !bytes.Equal(msg.Data, payload) {
t.Fatal("wrong message")
}
}

0 comments on commit 96efa27

Please sign in to comment.