Skip to content

Commit

Permalink
fix(test): test cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 committed Nov 28, 2022
1 parent 540bbff commit 3c73341
Showing 1 changed file with 93 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,7 @@ import (
)

func TestMessageSeenCacheTTL(t *testing.T) {
// Test with a few different seen cache TTL values
if err := RunMessageSeenCacheTTLTest(t, "1s"); err != nil {
t.Fatal(err)
}
if err := RunMessageSeenCacheTTLTest(t, "3s"); err != nil {
t.Fatal(err)
}
if err := RunMessageSeenCacheTTLTest(t, "5s"); err != nil {
if err := RunMessageSeenCacheTTLTest(t, "10s"); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -78,48 +71,52 @@ func mockNode(ctx context.Context, mn mocknet.Mocknet, pubsubEnabled bool, seenM
}

func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error {
//ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var bootstrapNode, consumerNode, producerNode *core.IpfsNode
var bootstrapPeerID, consumerPeerID, producerPeerID peer.ID
var err error

sendDupMsg := false

// create network
mn := mocknet.New()
bootstrapNode, err = mockNode(ctx, mn, false, "")
bootstrapNode, err := mockNode(ctx, mn, false, "") // no need for PubSub configuration
if err != nil {
t.Fatal(err)
}
bootstrapPeerID = bootstrapNode.PeerHost.ID()
defer bootstrapNode.Close()

consumerNode, err = mockNode(ctx, mn, true, seenMessagesCacheTTL)
consumerNode, err = mockNode(ctx, mn, true, seenMessagesCacheTTL) // use passed seen cache TTL
if err != nil {
t.Fatal(err)
}
consumerPeerID = consumerNode.PeerHost.ID()
defer consumerNode.Close()

ttl, err := time.ParseDuration(seenMessagesCacheTTL)
if err != nil {
t.Fatal(err)
}

// Set up the pubsub message ID generation override for the producer
core.RegisterFXOptionFunc(func(info core.FXNodeInfo) ([]fx.Option, error) {
ttl, err := time.ParseDuration(seenMessagesCacheTTL)
if err != nil {
return nil, err
}
var pubsubOptions []pubsub.Option
pubsubOptions = append(
pubsubOptions,
pubsub.WithSeenMessagesTTL(ttl),
pubsub.WithMessageIdFn(func(pmsg *pubsub_pb.Message) string {
now := time.Now().Format(time.StampMilli)
msg := string(pmsg.Data)
var msgID string
from, _ := peer.IDFromBytes(pmsg.From)
if (from == producerPeerID) && sendDupMsg {
t.Logf("sending [%s] with duplicate message ID at [%s]", string(pmsg.Data), time.Now().Format(time.StampMilli))
return "DupMsg"
msgID = "DupMsg"
t.Logf("sending [%s] with duplicate message ID at [%s]", msg, now)
} else {
msgID = pubsub.DefaultMsgIdFn(pmsg)
t.Logf("sending [%s] with unique message ID at [%s]", msg, now)
}
return pubsub.DefaultMsgIdFn(pmsg)
return msgID
}),
)
return append(
Expand All @@ -129,7 +126,7 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error
), nil
})

producerNode, err = mockNode(ctx, mn, false, "")
producerNode, err = mockNode(ctx, mn, false, "") // PubSub configuration comes from overrides above
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -161,50 +158,87 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error
t.Fatal(err)
}

consumerSubscription, err := consumerAPI.PubSub().Subscribe(ctx, "bleep")
// Set up the consumer subscription
const TopicName = "topic"
consumerSubscription, err := consumerAPI.PubSub().Subscribe(ctx, TopicName)
if err != nil {
t.Fatal(err)
}
// Utility functions defined inline to include context in closure
now := func() string {
return time.Now().Format(time.StampMilli)
}
ctr := 0
msgGen := func() string {
ctr++
return fmt.Sprintf("msg_%d", ctr)
}
produceMessage := func() string {
msgTxt := msgGen()
err = producerAPI.PubSub().Publish(ctx, TopicName, []byte(msgTxt))
if err != nil {
t.Fatal(err)
}
return msgTxt
}
consumeMessage := func(msgTxt string, shouldFind bool) {
// Set up a separate timed context for receiving messages
rxCtx, rxCancel := context.WithTimeout(context.Background(), time.Second)
defer rxCancel()
msg, err := consumerSubscription.Next(rxCtx)
if shouldFind {
if err != nil {
t.Logf("did not receive [%s] by [%s]", msgTxt, now())
t.Fatal(err)
}
t.Logf("received [%s] at [%s]", string(msg.Data()), now())
if !bytes.Equal(msg.Data(), []byte(msgTxt)) {
t.Fatalf("consumed data [%s] does not match published data [%s]", string(msg.Data()), msgTxt)
}
} else {
if err == nil {
t.Logf("received [%s] at [%s]", string(msg.Data()), now())
t.Fail()
}
t.Logf("did not receive [%s] by [%s]", msgTxt, now())
}
}

// Send message 1 with the message ID we're going to duplicate later
sendDupMsg = true
err = producerAPI.PubSub().Publish(ctx, "bleep", []byte("bloop"))
if err != nil {
t.Fatal(err)
}
msgTxt := produceMessage()
consumeMessage(msgTxt, true) // should find message

// Send message 2 with the same message ID as before
sendDupMsg = true
msgTxt = produceMessage()
consumeMessage(msgTxt, false) // should NOT find message

// Wait for seen cache TTL time to let seen cache entries time out
time.Sleep(ttl)

// Send message 3 with a new message ID
//
// This extra step is necessary for testing the cache TTL because the PubSub code only garbage collects when a
// message ID was not already present in the cache. This means that message 2's cache entry, even though it has
// technically timed out, will still cause the message to be considered duplicate. When a message with a different
// ID passes through, it will be added to the cache and garbage collection will clean up message 2's entry. This is
// another bug in the pubsub/cache implementation that will be fixed once the code is refactored for this issue:
// https://github.com/libp2p/go-libp2p-pubsub/issues/502
sendDupMsg = false
err = producerAPI.PubSub().Publish(ctx, "bleep", []byte("blargh"))
if err != nil {
t.Fatal(err)
}
msgTxt = produceMessage()
consumeMessage(msgTxt, true) // should find message

// Send message 4 with the same message ID as before
sendDupMsg = true
err = producerAPI.PubSub().Publish(ctx, "bleep", []byte("blech"))
if err != nil {
t.Fatal(err)
}
msgTxt = produceMessage()
consumeMessage(msgTxt, true) // should find message

msg, err := consumerSubscription.Next(ctx)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(msg.Data(), []byte("bloop")) {
t.Fatalf("consumed data [%s] does not match published data [%s]", string(msg.Data()), "bloop")
}
t.Logf("received [%s] at [%s]", string(msg.Data()), time.Now().Format(time.StampMilli))
msg, err = consumerSubscription.Next(ctx)
if err != nil {
t.Fatal(err)
}
t.Logf("received [%s] at [%s]", string(msg.Data()), time.Now().Format(time.StampMilli))
if !bytes.Equal(msg.Data(), []byte("blargh")) {
t.Fatalf("consumed data [%s] does not match published data [%s]", string(msg.Data()), "blargh")
}
msg, err = consumerSubscription.Next(ctx)
if err != nil {
t.Fatal(err)
}
t.Logf("received [%s] at [%s]", string(msg.Data()), time.Now().Format(time.StampMilli))
if !bytes.Equal(msg.Data(), []byte("blech")) {
t.Fatalf("consumed data [%s] does not match published data [%s]", string(msg.Data()), "blech")
}
// Send message 5 with a new message ID
//
// This step is not strictly necessary, but has been added for good measure.
sendDupMsg = false
msgTxt = produceMessage()
consumeMessage(msgTxt, true) // should find message
return nil
}

0 comments on commit 3c73341

Please sign in to comment.