From 3b4622be6fa316e9ffe990e202c66acfecfb20c7 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Mon, 9 Jan 2023 21:10:43 -0500 Subject: [PATCH 01/15] feat: expire messages from the cache based on last seen time --- go.mod | 3 + go.sum | 2 + .../integration/pubsub_msg_seen_cache_test.go | 116 ++++++++++++------ 3 files changed, 82 insertions(+), 39 deletions(-) diff --git a/go.mod b/go.mod index cb399de9962..090e09fbc1f 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,7 @@ module github.com/ipfs/kubo +replace github.com/libp2p/go-libp2p-pubsub => ../../libp2p/go-libp2p-pubsub + require ( bazil.org/fuse v0.0.0-20200117225306-7b5117fecadc contrib.go.opencensus.io/exporter/prometheus v0.4.0 @@ -135,6 +137,7 @@ require ( github.com/dgraph-io/ristretto v0.0.2 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/elastic/gosigar v0.14.2 // indirect + github.com/emirpasic/gods v1.18.1 // indirect github.com/felixge/httpsnoop v1.0.2 // indirect github.com/flynn/noise v1.0.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect diff --git a/go.sum b/go.sum index 65831c18bd3..ee0a9b75ff7 100644 --- a/go.sum +++ b/go.sum @@ -209,6 +209,8 @@ github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/ github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= github.com/elgris/jsondiff v0.0.0-20160530203242-765b5c24c302 h1:QV0ZrfBLpFc2KDk+a4LJefDczXnonRwrYrQJY/9L4dA= github.com/elgris/jsondiff v0.0.0-20160530203242-765b5c24c302/go.mod h1:qBlWZqWeVx9BjvqBsnC/8RUlAYpIFmPvgROcw0n1scE= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= 
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= diff --git a/test/integration/pubsub_msg_seen_cache_test.go b/test/integration/pubsub_msg_seen_cache_test.go index 394cda5b120..987d413785f 100644 --- a/test/integration/pubsub_msg_seen_cache_test.go +++ b/test/integration/pubsub_msg_seen_cache_test.go @@ -76,7 +76,6 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error var bootstrapNode, consumerNode, producerNode *core.IpfsNode var bootstrapPeerID, consumerPeerID, producerPeerID peer.ID - sendDupMsg := false mn := mocknet.New() bootstrapNode, err := mockNode(ctx, mn, false, "") // no need for PubSub configuration @@ -98,6 +97,12 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error t.Fatal(err) } + // Used for logging the timeline + startTime := time.Time{} + + // Used for overriding the message ID + sendMsgId := "" + // Set up the pubsub message ID generation override for the producer core.RegisterFXOptionFunc(func(info core.FXNodeInfo) ([]fx.Option, error) { var pubsubOptions []pubsub.Option @@ -105,18 +110,21 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error pubsubOptions, pubsub.WithSeenMessagesTTL(ttl), pubsub.WithMessageIdFn(func(pmsg *pubsub_pb.Message) string { - now := time.Now().Format(time.StampMilli) + now := time.Now() + if startTime.Second() == 0 { + startTime = now + } + timeElapsed := now.Sub(startTime).Seconds() msg := string(pmsg.Data) - var msgID string from, _ := peer.IDFromBytes(pmsg.From) - if (from == producerPeerID) && sendDupMsg { - msgID = "DupMsg" - t.Logf("sending [%s] with duplicate message ID at [%s]", msg, now) + var msgId string + if from == producerPeerID { + msgId = sendMsgId + t.Logf("sending [%s] with message ID [%s] at T%fs", msg, msgId, timeElapsed) } else { - msgID = pubsub.DefaultMsgIdFn(pmsg) - t.Logf("sending [%s] with unique message ID at [%s]", msg, 
now) + msgId = pubsub.DefaultMsgIdFn(pmsg) } - return msgID + return msgId }), ) return append( @@ -165,8 +173,8 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error t.Fatal(err) } // Utility functions defined inline to include context in closure - now := func() string { - return time.Now().Format(time.StampMilli) + now := func() float64 { + return time.Now().Sub(startTime).Seconds() } ctr := 0 msgGen := func() string { @@ -188,57 +196,87 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error msg, err := consumerSubscription.Next(rxCtx) if shouldFind { if err != nil { - t.Logf("did not receive [%s] by [%s]", msgTxt, now()) + t.Logf("expected but did not receive [%s] at T%fs", msgTxt, now()) t.Fatal(err) } - t.Logf("received [%s] at [%s]", string(msg.Data()), now()) + t.Logf("received [%s] at T%fs", string(msg.Data()), now()) if !bytes.Equal(msg.Data(), []byte(msgTxt)) { t.Fatalf("consumed data [%s] does not match published data [%s]", string(msg.Data()), msgTxt) } } else { if err == nil { - t.Logf("received [%s] at [%s]", string(msg.Data()), now()) + t.Logf("not expected but received [%s] at T%fs", string(msg.Data()), now()) t.Fail() } - t.Logf("did not receive [%s] by [%s]", msgTxt, now()) + t.Logf("did not receive [%s] at T%fs", msgTxt, now()) } } - // Send message 1 with the message ID we're going to duplicate later - sendDupMsg = true + const MsgId_1 = "MsgId_1" + const MsgId_2 = "MsgId_2" + const MsgId_3 = "MsgId_3" + + // Send message 1 with the message ID we're going to duplicate + sentMsg1 := time.Now() + sendMsgId = MsgId_1 msgTxt := produceMessage() - consumeMessage(msgTxt, true) // should find message + // Should find the message because it's new + consumeMessage(msgTxt, true) - // Send message 2 with the same message ID as before - sendDupMsg = true + // Send message 2 with a duplicate message ID + sendMsgId = MsgId_1 msgTxt = produceMessage() - consumeMessage(msgTxt, false) // should NOT find 
message, because it got deduplicated (sent twice within the SeenMessagesTTL window) - - // Wait for seen cache TTL time to let seen cache entries time out - time.Sleep(ttl) + // Should NOT find message because it got deduplicated (sent 2 times within the SeenMessagesTTL window). + consumeMessage(msgTxt, false) // Send message 3 with a new message ID - // - // This extra step is necessary for testing the cache TTL because the PubSub code only garbage collects when a - // message ID was not already present in the cache. This means that message 2's cache entry, even though it has - // technically timed out, will still cause the message to be considered duplicate. When a message with a different - // ID passes through, it will be added to the cache and garbage collection will clean up message 2's entry. This is - // another bug in the pubsub/cache implementation that will be fixed once the code is refactored for this issue: - // https://github.com/libp2p/go-libp2p-pubsub/issues/502 - sendDupMsg = false + sendMsgId = MsgId_2 + msgTxt = produceMessage() + // Should find the message because it's new + consumeMessage(msgTxt, true) + + // Wait till just before the SeenMessagesTTL window has passed since message 1 was sent + time.Sleep(time.Until(sentMsg1.Add(ttl - 100*time.Millisecond))) + + // Send message 4 with a duplicate message ID + sendMsgId = MsgId_1 + msgTxt = produceMessage() + // Should NOT find the message because it got deduplicated (sent 3 times within the SeenMessagesTTL window). This + // time, however, the expiration for the message should also get pushed out for a whole SeenMessagesTTL window since + // the default time cache now implements a sliding window algorithm. + consumeMessage(msgTxt, false) + + // Send message 5 with a duplicate message ID. This will be a second after the last attempt above since NOT finding + // a message takes a second to determine. 
That would put this attempt at ~1 second after the SeenMessagesTTL window + // starting at message 1 has expired. + sentMsg5 := time.Now() + sendMsgId = MsgId_1 msgTxt = produceMessage() - consumeMessage(msgTxt, true) // should find message + // Should NOT find the message, because it got deduplicated (sent 2 times since the updated SeenMessagesTTL window + // started). This time again, the expiration should get pushed out for another SeenMessagesTTL window. + consumeMessage(msgTxt, false) + + // Send message 6 with a message ID that hasn't been seen within a SeenMessagesTTL window + sendMsgId = MsgId_2 + msgTxt = produceMessage() + // Should find the message since last read > SeenMessagesTTL, so it looks like a new message. + consumeMessage(msgTxt, true) + + // Sleep for a full SeenMessagesTTL window to let cache entries time out + time.Sleep(time.Until(sentMsg5.Add(ttl + 100*time.Millisecond))) - // Send message 4 with the same message ID as before - sendDupMsg = true + // Send message 7 with a duplicate message ID + sendMsgId = MsgId_1 msgTxt = produceMessage() - consumeMessage(msgTxt, true) // should find message again (time since the last read > SeenMessagesTTL, so it looks like a new message). + // Should find the message this time since last read > SeenMessagesTTL, so it looks like a new message. + consumeMessage(msgTxt, true) - // Send message 5 with a new message ID + // Send message 8 with a brand new message ID // // This step is not strictly necessary, but has been added for good measure. 
- sendDupMsg = false + sendMsgId = MsgId_3 msgTxt = produceMessage() - consumeMessage(msgTxt, true) // should find message + // Should find the message because it's new + consumeMessage(msgTxt, true) return nil } From 5cd2c3ecd283698388d474f4a42f00427f764b90 Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Sat, 14 Jan 2023 00:25:21 +0100 Subject: [PATCH 02/15] chore: go-libp2p-pubsub PR 513 https://github.com/libp2p/go-libp2p-pubsub/pull/513 --- go.mod | 3 +-- go.sum | 6 ++---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 8f788dc7e84..0701cdab7a9 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ipfs/kubo -replace github.com/libp2p/go-libp2p-pubsub => ../../libp2p/go-libp2p-pubsub +replace github.com/libp2p/go-libp2p-pubsub => github.com/smrz2001/go-libp2p-pubsub v0.0.0-20230110155724-04bfcf58514f require ( bazil.org/fuse v0.0.0-20200117225306-7b5117fecadc @@ -227,7 +227,6 @@ require ( github.com/whyrusleeping/cbor-gen v0.0.0-20200710004633-5379fc63235d // indirect github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect - github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.7.0 // indirect go.opentelemetry.io/otel/metric v0.30.0 // indirect diff --git a/go.sum b/go.sum index c710f889633..3e7c83925ab 100644 --- a/go.sum +++ b/go.sum @@ -819,8 +819,6 @@ github.com/libp2p/go-libp2p-peerstore v0.2.2/go.mod h1:NQxhNjWxf1d4w6PihR8btWIRj github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= github.com/libp2p/go-libp2p-peerstore v0.2.7/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= 
-github.com/libp2p/go-libp2p-pubsub v0.8.2 h1:QLGUmkgKmwEVxVDYGsqc5t9CykOMY2Y21cXQHjR462I= -github.com/libp2p/go-libp2p-pubsub v0.8.2/go.mod h1:e4kT+DYjzPUYGZeWk4I+oxCSYTXizzXii5LDRRhjKSw= github.com/libp2p/go-libp2p-pubsub-router v0.6.0 h1:D30iKdlqDt5ZmLEYhHELCMRj8b4sFAqrUcshIUvVP/s= github.com/libp2p/go-libp2p-pubsub-router v0.6.0/go.mod h1:FY/q0/RBTKsLA7l4vqC2cbRbOvyDotg8PJQ7j8FDudE= github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA= @@ -1275,6 +1273,8 @@ github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:s github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/smola/gocompat v0.2.0/go.mod h1:1B0MlxbmoZNo3h8guHp8HztB3BSYR5itql9qtVc0ypY= +github.com/smrz2001/go-libp2p-pubsub v0.0.0-20230110155724-04bfcf58514f h1:Z8HthHnvHtDafaSRNU6n+UodAqOuptMcDBuLprrv1Qg= +github.com/smrz2001/go-libp2p-pubsub v0.0.0-20230110155724-04bfcf58514f/go.mod h1:eje970FXxjhtFbVEoiae+VUw24ZoSlk67BsiZPLRzlw= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:UdhH50NIW0fCiwBSr0co2m7BnFLdv4fQTgdqdJTHFeE= @@ -1361,8 +1361,6 @@ github.com/whyrusleeping/mdns v0.0.0-20180901202407-ef14215e6b30/go.mod h1:j4l84 github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI= -github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee 
h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow= -github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= From 75cf88efd9c0c5de0b65472aaa4e69d5f8b7c296 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Fri, 13 Jan 2023 18:36:18 -0500 Subject: [PATCH 03/15] chore: lint fixes --- .../integration/pubsub_msg_seen_cache_test.go | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/test/integration/pubsub_msg_seen_cache_test.go b/test/integration/pubsub_msg_seen_cache_test.go index 987d413785f..deb4713185b 100644 --- a/test/integration/pubsub_msg_seen_cache_test.go +++ b/test/integration/pubsub_msg_seen_cache_test.go @@ -101,7 +101,7 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error startTime := time.Time{} // Used for overriding the message ID - sendMsgId := "" + sendMsgID := "" // Set up the pubsub message ID generation override for the producer core.RegisterFXOptionFunc(func(info core.FXNodeInfo) ([]fx.Option, error) { @@ -117,14 +117,14 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error timeElapsed := now.Sub(startTime).Seconds() msg := string(pmsg.Data) from, _ := peer.IDFromBytes(pmsg.From) - var msgId string + var msgID string if from == producerPeerID { - msgId = sendMsgId - t.Logf("sending [%s] with message ID [%s] at T%fs", msg, msgId, timeElapsed) + msgID = sendMsgID + t.Logf("sending [%s] with message ID [%s] at T%fs", msg, msgID, timeElapsed) } else { - msgId = pubsub.DefaultMsgIdFn(pmsg) + msgID = pubsub.DefaultMsgIdFn(pmsg) } - return msgId + return msgID }), ) return 
append( @@ -174,7 +174,7 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error } // Utility functions defined inline to include context in closure now := func() float64 { - return time.Now().Sub(startTime).Seconds() + return time.Since(startTime).Seconds() } ctr := 0 msgGen := func() string { @@ -212,25 +212,25 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error } } - const MsgId_1 = "MsgId_1" - const MsgId_2 = "MsgId_2" - const MsgId_3 = "MsgId_3" + const MsgID1 = "MsgID1" + const MsgID2 = "MsgID2" + const MsgID3 = "MsgID3" // Send message 1 with the message ID we're going to duplicate sentMsg1 := time.Now() - sendMsgId = MsgId_1 + sendMsgID = MsgID1 msgTxt := produceMessage() // Should find the message because it's new consumeMessage(msgTxt, true) // Send message 2 with a duplicate message ID - sendMsgId = MsgId_1 + sendMsgID = MsgID1 msgTxt = produceMessage() // Should NOT find message because it got deduplicated (sent 2 times within the SeenMessagesTTL window). consumeMessage(msgTxt, false) // Send message 3 with a new message ID - sendMsgId = MsgId_2 + sendMsgID = MsgID2 msgTxt = produceMessage() // Should find the message because it's new consumeMessage(msgTxt, true) @@ -239,7 +239,7 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error time.Sleep(time.Until(sentMsg1.Add(ttl - 100*time.Millisecond))) // Send message 4 with a duplicate message ID - sendMsgId = MsgId_1 + sendMsgID = MsgID1 msgTxt = produceMessage() // Should NOT find the message because it got deduplicated (sent 3 times within the SeenMessagesTTL window). This // time, however, the expiration for the message should also get pushed out for a whole SeenMessagesTTL window since @@ -250,14 +250,14 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error // a message takes a second to determine. 
That would put this attempt at ~1 second after the SeenMessagesTTL window // starting at message 1 has expired. sentMsg5 := time.Now() - sendMsgId = MsgId_1 + sendMsgID = MsgID1 msgTxt = produceMessage() // Should NOT find the message, because it got deduplicated (sent 2 times since the updated SeenMessagesTTL window // started). This time again, the expiration should get pushed out for another SeenMessagesTTL window. consumeMessage(msgTxt, false) // Send message 6 with a message ID that hasn't been seen within a SeenMessagesTTL window - sendMsgId = MsgId_2 + sendMsgID = MsgID2 msgTxt = produceMessage() // Should find the message since last read > SeenMessagesTTL, so it looks like a new message. consumeMessage(msgTxt, true) @@ -266,7 +266,7 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error time.Sleep(time.Until(sentMsg5.Add(ttl + 100*time.Millisecond))) // Send message 7 with a duplicate message ID - sendMsgId = MsgId_1 + sendMsgID = MsgID1 msgTxt = produceMessage() // Should find the message this time since last read > SeenMessagesTTL, so it looks like a new message. consumeMessage(msgTxt, true) @@ -274,7 +274,7 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error // Send message 8 with a brand new message ID // // This step is not strictly necessary, but has been added for good measure. 
- sendMsgId = MsgId_3 + sendMsgID = MsgID3 msgTxt = produceMessage() // Should find the message because it's new consumeMessage(msgTxt, true) From ebe77ef682b8973f32af6e1f50b5338821fc731c Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Mon, 16 Jan 2023 16:29:17 -0500 Subject: [PATCH 04/15] chore: go-libp2p-pubsub PR 513 libp2p/go-libp2p-pubsub#513 --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 0701cdab7a9..25e999fbf4c 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ipfs/kubo -replace github.com/libp2p/go-libp2p-pubsub => github.com/smrz2001/go-libp2p-pubsub v0.0.0-20230110155724-04bfcf58514f +replace github.com/libp2p/go-libp2p-pubsub => github.com/smrz2001/go-libp2p-pubsub v0.0.0-20230116153407-f7c6da67bdd1 require ( bazil.org/fuse v0.0.0-20200117225306-7b5117fecadc diff --git a/go.sum b/go.sum index 3e7c83925ab..472824245aa 100644 --- a/go.sum +++ b/go.sum @@ -1273,8 +1273,8 @@ github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:s github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/smola/gocompat v0.2.0/go.mod h1:1B0MlxbmoZNo3h8guHp8HztB3BSYR5itql9qtVc0ypY= -github.com/smrz2001/go-libp2p-pubsub v0.0.0-20230110155724-04bfcf58514f h1:Z8HthHnvHtDafaSRNU6n+UodAqOuptMcDBuLprrv1Qg= -github.com/smrz2001/go-libp2p-pubsub v0.0.0-20230110155724-04bfcf58514f/go.mod h1:eje970FXxjhtFbVEoiae+VUw24ZoSlk67BsiZPLRzlw= +github.com/smrz2001/go-libp2p-pubsub v0.0.0-20230116153407-f7c6da67bdd1 h1:9E3RMdCMySV9BqvMP11ksYXk+VJ2uhhxkZ1L9TqbPOs= +github.com/smrz2001/go-libp2p-pubsub v0.0.0-20230116153407-f7c6da67bdd1/go.mod h1:eje970FXxjhtFbVEoiae+VUw24ZoSlk67BsiZPLRzlw= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/sony/gobreaker 
v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:UdhH50NIW0fCiwBSr0co2m7BnFLdv4fQTgdqdJTHFeE= From 1fe17346a5ff3b9f15f7577a627b8f78482f4cca Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Mon, 23 Jan 2023 16:36:14 -0500 Subject: [PATCH 05/15] chore: go-libp2p-pubsub PR 513 libp2p/go-libp2p-pubsub#513 use last seen time cache implementation --- core/node/groups.go | 3 +++ go.mod | 2 +- go.sum | 4 ++-- test/integration/pubsub_msg_seen_cache_test.go | 2 ++ 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/core/node/groups.go b/core/node/groups.go index aa650ddf5d0..692090c6dd9 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -11,6 +11,7 @@ import ( "github.com/ipfs/go-log" "github.com/ipfs/kubo/config" pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p-pubsub/timecache" "github.com/libp2p/go-libp2p/core/peer" "github.com/ipfs/kubo/core/node/libp2p" @@ -64,6 +65,8 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option { pubsubOptions, pubsub.WithMessageSigning(!cfg.Pubsub.DisableSigning), pubsub.WithSeenMessagesTTL(cfg.Pubsub.SeenMessagesTTL.WithDefault(pubsub.TimeCacheDuration)), + // Use the "last seen" cache by default + pubsub.WithSeenMessagesStrategy(timecache.Strategy_LastSeen), ) switch cfg.Pubsub.Router { diff --git a/go.mod b/go.mod index 25e999fbf4c..debf09fd1e8 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ipfs/kubo -replace github.com/libp2p/go-libp2p-pubsub => github.com/smrz2001/go-libp2p-pubsub v0.0.0-20230116153407-f7c6da67bdd1 +replace github.com/libp2p/go-libp2p-pubsub => github.com/smrz2001/go-libp2p-pubsub v0.0.0-20230123204127-c3bfcfac95e3 require ( bazil.org/fuse v0.0.0-20200117225306-7b5117fecadc diff --git a/go.sum b/go.sum index 472824245aa..1b4d34853a1 100644 --- a/go.sum +++ b/go.sum @@ -1273,8 +1273,8 @@ 
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:s github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/smola/gocompat v0.2.0/go.mod h1:1B0MlxbmoZNo3h8guHp8HztB3BSYR5itql9qtVc0ypY= -github.com/smrz2001/go-libp2p-pubsub v0.0.0-20230116153407-f7c6da67bdd1 h1:9E3RMdCMySV9BqvMP11ksYXk+VJ2uhhxkZ1L9TqbPOs= -github.com/smrz2001/go-libp2p-pubsub v0.0.0-20230116153407-f7c6da67bdd1/go.mod h1:eje970FXxjhtFbVEoiae+VUw24ZoSlk67BsiZPLRzlw= +github.com/smrz2001/go-libp2p-pubsub v0.0.0-20230123204127-c3bfcfac95e3 h1:GsZBeIn4DkCy5N2FG155YCTSsCc8/n5FVyiDl5A4COM= +github.com/smrz2001/go-libp2p-pubsub v0.0.0-20230123204127-c3bfcfac95e3/go.mod h1:eje970FXxjhtFbVEoiae+VUw24ZoSlk67BsiZPLRzlw= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:UdhH50NIW0fCiwBSr0co2m7BnFLdv4fQTgdqdJTHFeE= diff --git a/test/integration/pubsub_msg_seen_cache_test.go b/test/integration/pubsub_msg_seen_cache_test.go index deb4713185b..7495080893d 100644 --- a/test/integration/pubsub_msg_seen_cache_test.go +++ b/test/integration/pubsub_msg_seen_cache_test.go @@ -22,6 +22,7 @@ import ( "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p-pubsub/pb" + "github.com/libp2p/go-libp2p-pubsub/timecache" "github.com/libp2p/go-libp2p/core/peer" mock "github.com/ipfs/kubo/core/mock" @@ -126,6 +127,7 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error } return msgID }), + pubsub.WithSeenMessagesStrategy(timecache.Strategy_LastSeen), ) return append( info.FXOptions, From 5e337457bde4043091a34eb69ece718d95c636d0 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Mon, 23 
Jan 2023 18:11:25 -0500 Subject: [PATCH 06/15] chore: bump pubsub consumer rx timeout --- test/integration/pubsub_msg_seen_cache_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/pubsub_msg_seen_cache_test.go b/test/integration/pubsub_msg_seen_cache_test.go index 7495080893d..aca98c91b42 100644 --- a/test/integration/pubsub_msg_seen_cache_test.go +++ b/test/integration/pubsub_msg_seen_cache_test.go @@ -193,7 +193,7 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error } consumeMessage := func(msgTxt string, shouldFind bool) { // Set up a separate timed context for receiving messages - rxCtx, rxCancel := context.WithTimeout(context.Background(), time.Second) + rxCtx, rxCancel := context.WithTimeout(context.Background(), 5*time.Second) defer rxCancel() msg, err := consumerSubscription.Next(rxCtx) if shouldFind { From f7c8183972092797ae858cdb47513162a41ae740 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Mon, 23 Jan 2023 19:29:30 -0500 Subject: [PATCH 07/15] =?UTF-8?q?chore:=20use=20official=20pubsub=20releas?= =?UTF-8?q?e!=20=F0=9F=A5=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 4 +--- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index f881288c140..5120159fdfe 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,5 @@ module github.com/ipfs/kubo -replace github.com/libp2p/go-libp2p-pubsub => github.com/smrz2001/go-libp2p-pubsub v0.0.0-20230123204127-c3bfcfac95e3 - require ( bazil.org/fuse v0.0.0-20200117225306-7b5117fecadc contrib.go.opencensus.io/exporter/prometheus v0.4.0 @@ -77,7 +75,7 @@ require ( github.com/libp2p/go-libp2p-http v0.4.0 github.com/libp2p/go-libp2p-kad-dht v0.20.0 github.com/libp2p/go-libp2p-kbucket v0.5.0 - github.com/libp2p/go-libp2p-pubsub v0.8.2 + github.com/libp2p/go-libp2p-pubsub v0.8.3 
github.com/libp2p/go-libp2p-pubsub-router v0.6.0 github.com/libp2p/go-libp2p-record v0.2.0 github.com/libp2p/go-libp2p-routing-helpers v0.6.0 diff --git a/go.sum b/go.sum index 9d2b3999472..e044b9a0046 100644 --- a/go.sum +++ b/go.sum @@ -809,6 +809,8 @@ github.com/libp2p/go-libp2p-peerstore v0.2.2/go.mod h1:NQxhNjWxf1d4w6PihR8btWIRj github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= github.com/libp2p/go-libp2p-peerstore v0.2.7/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= +github.com/libp2p/go-libp2p-pubsub v0.8.3 h1:T4+pcfcFm1K2v5oFyk68peSjVroaoM8zFygf6Y5WOww= +github.com/libp2p/go-libp2p-pubsub v0.8.3/go.mod h1:eje970FXxjhtFbVEoiae+VUw24ZoSlk67BsiZPLRzlw= github.com/libp2p/go-libp2p-pubsub-router v0.6.0 h1:D30iKdlqDt5ZmLEYhHELCMRj8b4sFAqrUcshIUvVP/s= github.com/libp2p/go-libp2p-pubsub-router v0.6.0/go.mod h1:FY/q0/RBTKsLA7l4vqC2cbRbOvyDotg8PJQ7j8FDudE= github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA= @@ -1258,8 +1260,6 @@ github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:s github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/smola/gocompat v0.2.0/go.mod h1:1B0MlxbmoZNo3h8guHp8HztB3BSYR5itql9qtVc0ypY= -github.com/smrz2001/go-libp2p-pubsub v0.0.0-20230123204127-c3bfcfac95e3 h1:GsZBeIn4DkCy5N2FG155YCTSsCc8/n5FVyiDl5A4COM= -github.com/smrz2001/go-libp2p-pubsub v0.0.0-20230123204127-c3bfcfac95e3/go.mod h1:eje970FXxjhtFbVEoiae+VUw24ZoSlk67BsiZPLRzlw= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= github.com/sourcegraph/annotate 
v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:UdhH50NIW0fCiwBSr0co2m7BnFLdv4fQTgdqdJTHFeE= From 9caf07301ae7d92f6f90030951d82410b02fa844 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Mon, 23 Jan 2023 19:31:55 -0500 Subject: [PATCH 08/15] chore: go mod tidy in docs --- docs/examples/kubo-as-a-library/go.mod | 4 ++-- docs/examples/kubo-as-a-library/go.sum | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/examples/kubo-as-a-library/go.mod b/docs/examples/kubo-as-a-library/go.mod index edc6848315f..0a4a436d9fe 100644 --- a/docs/examples/kubo-as-a-library/go.mod +++ b/docs/examples/kubo-as-a-library/go.mod @@ -38,6 +38,7 @@ require ( github.com/docker/go-units v0.5.0 // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/elastic/gosigar v0.14.2 // indirect + github.com/emirpasic/gods v1.18.1 // indirect github.com/facebookgo/atomicfile v0.0.0-20151019160806-2de1f203e7d5 // indirect github.com/flynn/noise v1.0.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect @@ -122,7 +123,7 @@ require ( github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect github.com/libp2p/go-libp2p-kad-dht v0.20.0 // indirect github.com/libp2p/go-libp2p-kbucket v0.5.0 // indirect - github.com/libp2p/go-libp2p-pubsub v0.8.2 // indirect + github.com/libp2p/go-libp2p-pubsub v0.8.3 // indirect github.com/libp2p/go-libp2p-pubsub-router v0.6.0 // indirect github.com/libp2p/go-libp2p-record v0.2.0 // indirect github.com/libp2p/go-libp2p-routing-helpers v0.6.0 // indirect @@ -180,7 +181,6 @@ require ( github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 // indirect - github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/otel v1.7.0 
// indirect go.opentelemetry.io/otel/exporters/jaeger v1.7.0 // indirect diff --git a/docs/examples/kubo-as-a-library/go.sum b/docs/examples/kubo-as-a-library/go.sum index fa6c022f334..eb52ab5e864 100644 --- a/docs/examples/kubo-as-a-library/go.sum +++ b/docs/examples/kubo-as-a-library/go.sum @@ -198,6 +198,8 @@ github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaB github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/4= github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -773,8 +775,8 @@ github.com/libp2p/go-libp2p-peerstore v0.2.2/go.mod h1:NQxhNjWxf1d4w6PihR8btWIRj github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= github.com/libp2p/go-libp2p-peerstore v0.2.7/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= -github.com/libp2p/go-libp2p-pubsub v0.8.2 h1:QLGUmkgKmwEVxVDYGsqc5t9CykOMY2Y21cXQHjR462I= -github.com/libp2p/go-libp2p-pubsub v0.8.2/go.mod h1:e4kT+DYjzPUYGZeWk4I+oxCSYTXizzXii5LDRRhjKSw= +github.com/libp2p/go-libp2p-pubsub v0.8.3 h1:T4+pcfcFm1K2v5oFyk68peSjVroaoM8zFygf6Y5WOww= +github.com/libp2p/go-libp2p-pubsub v0.8.3/go.mod h1:eje970FXxjhtFbVEoiae+VUw24ZoSlk67BsiZPLRzlw= github.com/libp2p/go-libp2p-pubsub-router v0.6.0 
h1:D30iKdlqDt5ZmLEYhHELCMRj8b4sFAqrUcshIUvVP/s= github.com/libp2p/go-libp2p-pubsub-router v0.6.0/go.mod h1:FY/q0/RBTKsLA7l4vqC2cbRbOvyDotg8PJQ7j8FDudE= github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA= @@ -1284,8 +1286,6 @@ github.com/whyrusleeping/mdns v0.0.0-20180901202407-ef14215e6b30/go.mod h1:j4l84 github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI= -github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow= -github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= From 11e6a177282c1d707c8153194c41d04e03779b74 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Mon, 23 Jan 2023 19:45:34 -0500 Subject: [PATCH 09/15] chore: editorial --- test/integration/pubsub_msg_seen_cache_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/integration/pubsub_msg_seen_cache_test.go b/test/integration/pubsub_msg_seen_cache_test.go index aca98c91b42..871d7cc5e67 100644 --- a/test/integration/pubsub_msg_seen_cache_test.go +++ b/test/integration/pubsub_msg_seen_cache_test.go @@ -248,9 +248,9 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error // the default time cache now implements a sliding window 
algorithm. consumeMessage(msgTxt, false) - // Send message 5 with a duplicate message ID. This will be a second after the last attempt above since NOT finding - // a message takes a second to determine. That would put this attempt at ~1 second after the SeenMessagesTTL window - // starting at message 1 has expired. + // Send message 5 with a duplicate message ID. This will be 5 seconds after the last attempt above since NOT finding + // a message takes 5 seconds to determine. That would put this attempt at ~5 seconds after the SeenMessagesTTL + // window starting at message 1 has expired. sentMsg5 := time.Now() sendMsgID = MsgID1 msgTxt = produceMessage() From 5532bab2b8b924c75319b6fbb429a20d3baa2259 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Mon, 23 Jan 2023 20:02:23 -0500 Subject: [PATCH 10/15] chore: switch pubsub consumer rx timeout back to 1s --- test/integration/pubsub_msg_seen_cache_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/integration/pubsub_msg_seen_cache_test.go b/test/integration/pubsub_msg_seen_cache_test.go index 871d7cc5e67..7495080893d 100644 --- a/test/integration/pubsub_msg_seen_cache_test.go +++ b/test/integration/pubsub_msg_seen_cache_test.go @@ -193,7 +193,7 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error } consumeMessage := func(msgTxt string, shouldFind bool) { // Set up a separate timed context for receiving messages - rxCtx, rxCancel := context.WithTimeout(context.Background(), 5*time.Second) + rxCtx, rxCancel := context.WithTimeout(context.Background(), time.Second) defer rxCancel() msg, err := consumerSubscription.Next(rxCtx) if shouldFind { @@ -248,9 +248,9 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error // the default time cache now implements a sliding window algorithm. consumeMessage(msgTxt, false) - // Send message 5 with a duplicate message ID. 
This will be 5 seconds after the last attempt above since NOT finding - // a message takes 5 seconds to determine. That would put this attempt at ~5 seconds after the SeenMessagesTTL - // window starting at message 1 has expired. + // Send message 5 with a duplicate message ID. This will be a second after the last attempt above since NOT finding + // a message takes a second to determine. That would put this attempt at ~1 second after the SeenMessagesTTL window + // starting at message 1 has expired. sentMsg5 := time.Now() sendMsgID = MsgID1 msgTxt = produceMessage() From 543683a488ac1bf34c78fa90de74286db1c91478 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Tue, 24 Jan 2023 16:20:26 -0500 Subject: [PATCH 11/15] feat: make SeenMessagesStrategy configurable --- config/pubsub.go | 4 ++++ core/node/groups.go | 3 +-- docs/config.md | 23 +++++++++++++++++++++++ 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/config/pubsub.go b/config/pubsub.go index ba80843005a..d284e3a4956 100644 --- a/config/pubsub.go +++ b/config/pubsub.go @@ -15,4 +15,8 @@ type PubsubConfig struct { // SeenMessagesTTL configures the duration after which a previously seen // message ID can be forgotten about. SeenMessagesTTL *OptionalDuration `json:",omitempty"` + + // SeenMessagesStrategy configures the cache implementation for previously + // seen messages. 
+ SeenMessagesStrategy *OptionalInteger `json:",omitempty"` } diff --git a/core/node/groups.go b/core/node/groups.go index 692090c6dd9..1042e416bc4 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -65,8 +65,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option { pubsubOptions, pubsub.WithMessageSigning(!cfg.Pubsub.DisableSigning), pubsub.WithSeenMessagesTTL(cfg.Pubsub.SeenMessagesTTL.WithDefault(pubsub.TimeCacheDuration)), - // Use the "last seen" cache by default - pubsub.WithSeenMessagesStrategy(timecache.Strategy_LastSeen), + pubsub.WithSeenMessagesStrategy(timecache.Strategy(cfg.Pubsub.SeenMessagesStrategy.WithDefault(int64(timecache.Strategy_LastSeen)))), ) switch cfg.Pubsub.Router { diff --git a/docs/config.md b/docs/config.md index da919d84450..190623731eb 100644 --- a/docs/config.md +++ b/docs/config.md @@ -100,6 +100,7 @@ config file at runtime. - [`Pubsub.Router`](#pubsubrouter) - [`Pubsub.DisableSigning`](#pubsubdisablesigning) - [`Pubsub.SeenMessagesTTL`](#pubsubseenmessagesttl) + - [`Pubsub.SeenMessagesStrategy`](#pubsubseenmessagesstrategy) - [`Peering`](#peering) - [`Peering.Peers`](#peeringpeers) - [`Reprovider`](#reprovider) @@ -1223,6 +1224,28 @@ Default: see `TimeCacheDuration` from [go-libp2p-pubsub](https://github.com/libp Type: `optionalDuration` +### `Pubsub.SeenMessagesStrategy` + +Configures the Pubsub seen messages cache implementation. + +The Pubsub seen messages cache is a LRU cache that keeps messages for up to a +specified time duration. After this duration has elapsed, expired messages will +be purged from the cache. + +The last-seen cache is a sliding-window cache. Every time a message is seen +again with the SeenMessagesTTL duration, its timestamp slides forward. This +keeps frequently occurring messages cached and prevents them from being +continually propagated, especially because of issues that might increase the +number of duplicate messages in the network. 
+ +The first-seen cache will store new messages and purge them after the +SeenMessagesTTL duration, even if they are seen multiple times within this +duration. + +Default: see `Strategy_LastSeen` from [go-libp2p-pubsub](https://github.com/libp2p/go-libp2p-pubsub) + +Type: `optionalInteger` + ## `Peering` Configures the peering subsystem. The peering subsystem configures Kubo to From beb821e735d83a98d12269bb6626484672229634 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Tue, 24 Jan 2023 16:47:51 -0500 Subject: [PATCH 12/15] fix: config SeenMessagesStrategy using string --- config/pubsub.go | 2 +- core/node/groups.go | 17 ++++++++++++++++- docs/config.md | 4 ++-- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/config/pubsub.go b/config/pubsub.go index d284e3a4956..4602653b4cc 100644 --- a/config/pubsub.go +++ b/config/pubsub.go @@ -18,5 +18,5 @@ type PubsubConfig struct { // SeenMessagesStrategy configures the cache implementation for previously // seen messages. 
- SeenMessagesStrategy *OptionalInteger `json:",omitempty"` + SeenMessagesStrategy *OptionalString `json:",omitempty"` } diff --git a/core/node/groups.go b/core/node/groups.go index 1042e416bc4..6ce9b9de633 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -65,9 +65,24 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option { pubsubOptions, pubsub.WithMessageSigning(!cfg.Pubsub.DisableSigning), pubsub.WithSeenMessagesTTL(cfg.Pubsub.SeenMessagesTTL.WithDefault(pubsub.TimeCacheDuration)), - pubsub.WithSeenMessagesStrategy(timecache.Strategy(cfg.Pubsub.SeenMessagesStrategy.WithDefault(int64(timecache.Strategy_LastSeen)))), ) + seenMessagesStrategy := timecache.Strategy_LastSeen + if cfg.Pubsub.SeenMessagesStrategy != nil { + configSeenMessagesStrategy := cfg.Pubsub.SeenMessagesStrategy.String() + switch configSeenMessagesStrategy { + case "": + fallthrough + case "last-seen": + seenMessagesStrategy = timecache.Strategy_FirstSeen + case "first-seen": + seenMessagesStrategy = timecache.Strategy_FirstSeen + default: + return fx.Error(fmt.Errorf("unknown pubsub seen messages strategy %s", configSeenMessagesStrategy)) + } + } + pubsubOptions = append(pubsubOptions, pubsub.WithSeenMessagesStrategy(seenMessagesStrategy)) + switch cfg.Pubsub.Router { case "": fallthrough diff --git a/docs/config.md b/docs/config.md index 190623731eb..4591c442314 100644 --- a/docs/config.md +++ b/docs/config.md @@ -1242,9 +1242,9 @@ The first-seen cache will store new messages and purge them after the SeenMessagesTTL duration, even if they are seen multiple times within this duration. 
-Default: see `Strategy_LastSeen` from [go-libp2p-pubsub](https://github.com/libp2p/go-libp2p-pubsub) +Default: `last-seen` (see [go-libp2p-pubsub](https://github.com/libp2p/go-libp2p-pubsub)) -Type: `optionalInteger` +Type: `optionalString` ## `Peering` From 217369286cb784e4c6c5a326f48ae3dac501d037 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Tue, 24 Jan 2023 16:52:24 -0500 Subject: [PATCH 13/15] fix: editorial --- core/node/groups.go | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/core/node/groups.go b/core/node/groups.go index 6ce9b9de633..9b08f7b4d68 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -67,19 +67,17 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option { pubsub.WithSeenMessagesTTL(cfg.Pubsub.SeenMessagesTTL.WithDefault(pubsub.TimeCacheDuration)), ) - seenMessagesStrategy := timecache.Strategy_LastSeen - if cfg.Pubsub.SeenMessagesStrategy != nil { - configSeenMessagesStrategy := cfg.Pubsub.SeenMessagesStrategy.String() - switch configSeenMessagesStrategy { - case "": - fallthrough - case "last-seen": - seenMessagesStrategy = timecache.Strategy_FirstSeen - case "first-seen": - seenMessagesStrategy = timecache.Strategy_FirstSeen - default: - return fx.Error(fmt.Errorf("unknown pubsub seen messages strategy %s", configSeenMessagesStrategy)) - } + var seenMessagesStrategy timecache.Strategy + configSeenMessagesStrategy := cfg.Pubsub.SeenMessagesStrategy.WithDefault("last-seen") + switch configSeenMessagesStrategy { + case "": + fallthrough + case "last-seen": + seenMessagesStrategy = timecache.Strategy_LastSeen + case "first-seen": + seenMessagesStrategy = timecache.Strategy_FirstSeen + default: + return fx.Error(fmt.Errorf("unknown pubsub seen messages strategy %s", configSeenMessagesStrategy)) } pubsubOptions = append(pubsubOptions, pubsub.WithSeenMessagesStrategy(seenMessagesStrategy)) From 4d9522996d95f23709083195df72c2ab2db3d111 Mon Sep 
17 00:00:00 2001 From: Marcin Rataj Date: Thu, 26 Jan 2023 23:38:24 +0100 Subject: [PATCH 14/15] docs: Pubsub.SeenMessagesStrategy --- config/pubsub.go | 27 +++++++++++++++++++++++---- core/node/groups.go | 8 +++----- docs/changelogs/v0.18.md | 38 ++++++++++++++++++++++++++++++++++++++ docs/config.md | 11 ++++++----- 4 files changed, 70 insertions(+), 14 deletions(-) diff --git a/config/pubsub.go b/config/pubsub.go index 4602653b4cc..36f9a9881d5 100644 --- a/config/pubsub.go +++ b/config/pubsub.go @@ -1,5 +1,24 @@ package config +const ( + // LastSeenMessagesStrategy is a strategy that calculates the TTL countdown + // based on the last time a Pubsub message is seen. This means that if a message + // is received and then seen again within the specified TTL window, it + // won't be emitted until the TTL countdown expires from the last time the + // message was seen. + LastSeenMessagesStrategy = "last-seen" + + // FirstSeenMessagesStrategy is a strategy that calculates the TTL + // countdown based on the first time a Pubsub message is seen. This means that if + // a message is received and then seen again within the specified TTL + // window, it won't be emitted. + FirstSeenMessagesStrategy = "first-seen" + + // DefaultSeenMessagesStrategy is the strategy that is used by default if + // no Pubsub.SeenMessagesStrategy is specified. + DefaultSeenMessagesStrategy = LastSeenMessagesStrategy +) + type PubsubConfig struct { // Router can be either floodsub (legacy) or gossipsub (new and // backwards compatible). @@ -12,11 +31,11 @@ type PubsubConfig struct { // Enable pubsub (--enable-pubsub-experiment) Enabled Flag `json:",omitempty"` - // SeenMessagesTTL configures the duration after which a previously seen - // message ID can be forgotten about. + // SeenMessagesTTL is a value that controls the time window within which + // duplicate messages will be identified and won't be emitted. 
SeenMessagesTTL *OptionalDuration `json:",omitempty"` - // SeenMessagesStrategy configures the cache implementation for previously - // seen messages. + // SeenMessagesStrategy is a setting that determines how the time-to-live + // (TTL) countdown for deduplicating messages is calculated. SeenMessagesStrategy *OptionalString `json:",omitempty"` } diff --git a/core/node/groups.go b/core/node/groups.go index 9b08f7b4d68..997c47ef9c9 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -68,13 +68,11 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option { ) var seenMessagesStrategy timecache.Strategy - configSeenMessagesStrategy := cfg.Pubsub.SeenMessagesStrategy.WithDefault("last-seen") + configSeenMessagesStrategy := cfg.Pubsub.SeenMessagesStrategy.WithDefault(config.DefaultSeenMessagesStrategy) switch configSeenMessagesStrategy { - case "": - fallthrough - case "last-seen": + case config.LastSeenMessagesStrategy: seenMessagesStrategy = timecache.Strategy_LastSeen - case "first-seen": + case config.FirstSeenMessagesStrategy: seenMessagesStrategy = timecache.Strategy_FirstSeen default: return fx.Error(fmt.Errorf("unknown pubsub seen messages strategy %s", configSeenMessagesStrategy)) diff --git a/docs/changelogs/v0.18.md b/docs/changelogs/v0.18.md index d10f56d7704..0b37c93b4f5 100644 --- a/docs/changelogs/v0.18.md +++ b/docs/changelogs/v0.18.md @@ -1,5 +1,43 @@ # Kubo changelog v0.18 +## v0.18.1 + +This release includes improvements around Pubsub message deduplication, and more. + + + + +- [Overview](#overview) +- [🔦 Highlights](#-highlights) + - [New default Pubsub.SeenMessagesStrategy](#new-default-pubsubseenmessagesstrategy) +- [📝 Changelog](#-changelog) +- [👨‍👩‍👧‍👦 Contributors](#-contributors) + + + +### 🔦 Highlights + +#### New default `Pubsub.SeenMessagesStrategy` + +A new optional [`Pubsub.SeenMessagesStrategy`](../config.md#pubsubseenmessagesstrategy) configuration option has been added.
+ +This option allows you to choose between two different strategies for +deduplicating messages: `first-seen` and `last-seen`. + +When unset, the default strategy is `last-seen`, which calculates the +time-to-live (TTL) countdown based on the last time a message is seen. This +means that if a message is received and then seen again within the specified +TTL window based on the last time it was seen, it won't be emitted. + +If you prefer the old behavior, which calculates the TTL countdown based on the +first time a message is seen, you can set `Pubsub.SeenMessagesStrategy` to +`first-seen`. + +### 📝 Changelog + +### 👨‍👩‍👧‍👦 Contributors + + ## v0.18.0 ### Overview diff --git a/docs/config.md b/docs/config.md index 4591c442314..94d8094af4b 100644 --- a/docs/config.md +++ b/docs/config.md @@ -1207,8 +1207,8 @@ Type: `bool` ### `Pubsub.SeenMessagesTTL` -Configures the duration after which a previously seen Pubsub Message ID can be -forgotten about. +Controls the time window within which duplicate messages, identified by Message +ID, will be identified and won't be emitted again. A smaller value for this parameter means that Pubsub messages in the cache will be garbage collected sooner, which can result in a smaller cache. At the same @@ -1226,19 +1226,20 @@ Type: `optionalDuration` ### `Pubsub.SeenMessagesStrategy` -Configures the Pubsub seen messages cache implementation. +Determines how the time-to-live (TTL) countdown for deduplicating Pubsub +messages is calculated. The Pubsub seen messages cache is a LRU cache that keeps messages for up to a specified time duration. After this duration has elapsed, expired messages will be purged from the cache. -The last-seen cache is a sliding-window cache. Every time a message is seen +The `last-seen` cache is a sliding-window cache. Every time a message is seen again with the SeenMessagesTTL duration, its timestamp slides forward.
This keeps frequently occurring messages cached and prevents them from being continually propagated, especially because of issues that might increase the number of duplicate messages in the network. -The first-seen cache will store new messages and purge them after the +The `first-seen` cache will store new messages and purge them after the SeenMessagesTTL duration, even if they are seen multiple times within this duration. From edcbb922549544cb91fb2071e59a9c6ba16e7d68 Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Thu, 26 Jan 2023 23:57:37 +0100 Subject: [PATCH 15/15] fix: user-friendly SeenMessagesStrategy error --- core/node/groups.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/node/groups.go b/core/node/groups.go index 997c47ef9c9..e640feff1ab 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -75,7 +75,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option { case config.FirstSeenMessagesStrategy: seenMessagesStrategy = timecache.Strategy_FirstSeen default: - return fx.Error(fmt.Errorf("unknown pubsub seen messages strategy %s", configSeenMessagesStrategy)) + return fx.Error(fmt.Errorf("unsupported Pubsub.SeenMessagesStrategy %q", configSeenMessagesStrategy)) } pubsubOptions = append(pubsubOptions, pubsub.WithSeenMessagesStrategy(seenMessagesStrategy))