Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publishing option for signing a message with a custom private key #486

Merged
merged 7 commits into from
May 26, 2022
Merged
64 changes: 50 additions & 14 deletions topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/libp2p/go-libp2p-core/crypto"
pb "github.com/libp2p/go-libp2p-pubsub/pb"

"github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -15,6 +16,12 @@ import (
// ErrTopicClosed is returned if a Topic is utilized after it has been closed
var ErrTopicClosed = errors.New("this Topic is closed, try opening a new one")

// ErrNilSignKey is returned if a nil private key was provided
var ErrNilSignKey = errors.New("nil sign key")

// ErrEmptyPeerID is returned if an empty peer ID was provided
var ErrEmptyPeerID = errors.New("empty peer ID")

// Topic is the handle for a pubsub topic
type Topic struct {
p *PubSub
Expand Down Expand Up @@ -202,8 +209,12 @@ func (t *Topic) Relay() (RelayCancelFunc, error) {
// RouterReady is a function that decides if a router is ready to publish
type RouterReady func(rt PubSubRouter, topic string) (bool, error)

// ProvideKey is a function that provides a private key and its associated peer ID when publishing a new message
type ProvideKey func() (crypto.PrivKey, peer.ID)

type PublishOptions struct {
ready RouterReady
ready RouterReady
customKey ProvideKey
}

type PubOpt func(pub *PublishOptions) error
Expand All @@ -216,27 +227,40 @@ func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error
return ErrTopicClosed
}

pid := t.p.signID
key := t.p.signKey

pub := &PublishOptions{}
for _, opt := range opts {
err := opt(pub)
if err != nil {
return err
}
}

if pub.customKey != nil {
key, pid = pub.customKey()
if key == nil {
return ErrNilSignKey
}
if len(pid) == 0 {
return ErrEmptyPeerID
}
}

m := &pb.Message{
Data: data,
Topic: &t.topic,
From: nil,
Seqno: nil,
}
if t.p.signID != "" {
m.From = []byte(t.p.signID)
if pid != "" {
m.From = []byte(pid)
m.Seqno = t.p.nextSeqno()
}
if t.p.signKey != nil {
m.From = []byte(t.p.signID)
err := signMessage(t.p.signID, t.p.signKey, m)
if err != nil {
return err
}
}

pub := &PublishOptions{}
for _, opt := range opts {
err := opt(pub)
if key != nil {
m.From = []byte(pid)
err := signMessage(pid, key, m)
if err != nil {
return err
}
Expand Down Expand Up @@ -295,6 +319,18 @@ func WithReadiness(ready RouterReady) PubOpt {
}
}

// WithSecretKeyAndPeerId returns a publishing option for providing a custom private key and its corresponding peer ID
// This option is useful when we want to send messages from "virtual", never-connectable peers in the network
func WithSecretKeyAndPeerId(key crypto.PrivKey, pid peer.ID) PubOpt {
return func(pub *PublishOptions) error {
pub.customKey = func() (crypto.PrivKey, peer.ID) {
return key, pid
}

return nil
}
}

// Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions.
// Does not error if the topic is already closed.
func (t *Topic) Close() error {
Expand Down
103 changes: 101 additions & 2 deletions topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/libp2p/go-libp2p-core/peer"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
tnet "github.com/libp2p/go-libp2p-testing/net"
)

func getTopics(psubs []*PubSub, topicID string, opts ...TopicOpt) []*Topic {
Expand Down Expand Up @@ -381,7 +382,7 @@ func TestSubscriptionLeaveNotification(t *testing.T) {

// Test removing peers and verifying that they cause events
subs[1].Cancel()
hosts[2].Close()
_ = hosts[2].Close()
psubs[0].BlacklistPeer(hosts[3].ID())

leavingPeers := make(map[peer.ID]struct{})
Expand Down Expand Up @@ -878,7 +879,7 @@ func TestWithTopicMsgIdFunction(t *testing.T) {
}))
connectAll(t, hosts)

topicsA := getTopics(pubsubs, topicA) // uses global msgIdFn
topicsA := getTopics(pubsubs, topicA) // uses global msgIdFn
topicsB := getTopics(pubsubs, topicB, WithTopicMessageIdFn(func(pmsg *pb.Message) string { // uses custom
hash := sha1.Sum(pmsg.Data)
return string(hash[:])
Expand Down Expand Up @@ -920,3 +921,101 @@ func TestWithTopicMsgIdFunction(t *testing.T) {
t.Fatal("msg ids are equal")
}
}

func TestTopicPublishWithKeyInvalidParameters(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

const topic = "foobar"
const numHosts = 5

virtualPeer := tnet.RandPeerNetParamsOrFatal(t)
hosts := getNetHosts(t, ctx, numHosts)
topics := getTopics(getPubsubs(ctx, hosts), topic)

t.Run("nil sign private key should error", func(t *testing.T) {
withVirtualKey := WithSecretKeyAndPeerId(nil, virtualPeer.ID)
err := topics[0].Publish(ctx, []byte("buff"), withVirtualKey)
if err != ErrNilSignKey {
t.Fatal("error should have been of type errNilSignKey")
}
})
t.Run("empty peer ID should error", func(t *testing.T) {
withVirtualKey := WithSecretKeyAndPeerId(virtualPeer.PrivKey, "")
err := topics[0].Publish(ctx, []byte("buff"), withVirtualKey)
if err != ErrEmptyPeerID {
t.Fatal("error should have been of type errEmptyPeerID")
}
})
}

func TestTopicRelayPublishWithKey(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

const topic = "foobar"
const numHosts = 5

virtualPeer := tnet.RandPeerNetParamsOrFatal(t)
hosts := getNetHosts(t, ctx, numHosts)
topics := getTopics(getPubsubs(ctx, hosts), topic)

// [0.Rel] - [1.Rel] - [2.Sub]
// |
// [3.Rel] - [4.Sub]

connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
connect(t, hosts[1], hosts[3])
connect(t, hosts[3], hosts[4])

time.Sleep(time.Millisecond * 100)

var subs []*Subscription

for i, topicValue := range topics {
if i == 2 || i == 4 {
sub, err := topicValue.Subscribe()
if err != nil {
t.Fatal(err)
}

subs = append(subs, sub)
} else {
_, err := topicValue.Relay()
if err != nil {
t.Fatal(err)
}
}
}

time.Sleep(time.Millisecond * 100)

for i := 0; i < 100; i++ {
msg := []byte("message")

owner := rand.Intn(len(topics))

withVirtualKey := WithSecretKeyAndPeerId(virtualPeer.PrivKey, virtualPeer.ID)
err := topics[owner].Publish(ctx, msg, withVirtualKey)
if err != nil {
t.Fatal(err)
}

for _, sub := range subs {
received, errSub := sub.Next(ctx)
if errSub != nil {
t.Fatal(errSub)
}

if !bytes.Equal(msg, received.Data) {
t.Fatal("received message is other than expected")
}
if string(received.From) != string(virtualPeer.ID) {
t.Fatal("received message is not from the virtual peer")
}
}
}
}