From 61b8f4d78bb79a1031cb5641c1a8f4f63d5ad32b Mon Sep 17 00:00:00 2001 From: Cayman Date: Thu, 8 Aug 2024 16:16:26 -0400 Subject: [PATCH 1/7] feat: gossipsub 1.2: IDONTWANT --- package-lock.json | 9 --- src/constants.ts | 10 +++ src/index.ts | 115 ++++++++++++++++++++++++++++++- src/message/decodeRpc.ts | 2 + src/message/rpc.proto | 6 ++ src/message/rpc.ts | 119 ++++++++++++++++++++++++++++++--- src/metrics.ts | 20 +++++- src/utils/create-gossip-rpc.ts | 6 +- test/e2e/go-gossipsub.spec.ts | 6 +- test/gossip.spec.ts | 2 +- 10 files changed, 268 insertions(+), 27 deletions(-) diff --git a/package-lock.json b/package-lock.json index 353e411d..9afbc1ea 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5968,15 +5968,6 @@ "ajv": ">=5.0.0" } }, - "node_modules/ansi-colors": { - "version": "4.1.3", - "resolved": "https://registry.npmjs.org/ansi-colors/-/ansi-colors-4.1.3.tgz", - "integrity": "sha512-/6w/C21Pm1A7aZitlI5Ni/2J6FFQN8i1Cvz3kHABAAbw93v/NlvKdVOqz7CCWz/3iv/JplRSEEZ83XION15ovw==", - "extraneous": true, - "engines": { - "node": ">=6" - } - }, "node_modules/ansi-escapes": { "version": "3.2.0", "resolved": "https://registry.npmjs.org/ansi-escapes/-/ansi-escapes-3.2.0.tgz", diff --git a/src/constants.ts b/src/constants.ts index cff4a16c..f8f6bac5 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -18,6 +18,13 @@ export const GossipsubIDv10 = '/meshsub/1.0.0' */ export const GossipsubIDv11 = '/meshsub/1.1.0' +/** + * The protocol ID for version 1.2.0 of the Gossipsub protocol + * See the spec for details about how v1.2.0 compares to v1.1.0: + * https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md + */ +export const GossipsubIDv12 = '/meshsub/1.2.0' + // Overlay parameters /** @@ -249,3 +256,6 @@ export const DEFAULT_METRIC_MESH_MESSAGE_DELIVERIES_WINDOWS = 1000 /** Wait for 1 more heartbeats before clearing a backoff */ export const BACKOFF_SLACK = 1 + +export const GossipsubIdontwantMinDataSize = 512 +export const GossipsubIdontwantMaxMessages = 512 diff --git a/src/index.ts b/src/index.ts index 682a8be8..395deb62 100644 --- a/src/index.ts +++ b/src/index.ts @@ -86,7 +86,7 @@ type ReceivedMessageResult = | ({ code: MessageStatus.invalid, msgIdStr?: MsgIdStr } & RejectReasonObj) | { code: MessageStatus.valid, messageId: MessageId, msg: Message } -export const multicodec: string = constants.GossipsubIDv11 +export const multicodec: string = constants.GossipsubIDv12 export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit { /** if dial should fallback to floodsub */ @@ -201,6 +201,20 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit { * If true, will utilize the libp2p connection manager tagging system to prune/graft connections to peers, defaults to true */ tagMeshPeers: boolean + + /** + * The minimum message size in bytes to be considered for sending IDONTWANT messages + * + * @default 512 + */ + idontwantMinDataSize?: number + + /** + * The maximum number of IDONTWANT messages per heartbeat per peer + * + * @default 512 + */ + idontwantMaxMessages?: number } export interface GossipsubMessage { @@ -263,7 +277,7 @@ export class GossipSub extends TypedEventEmitter implements Pub * The signature policy to follow by default */ public readonly globalSignaturePolicy: typeof StrictSign | typeof StrictNoSign - public multicodecs: string[] = [constants.GossipsubIDv11, constants.GossipsubIDv10] + public multicodecs: string[] = [constants.GossipsubIDv12, constants.GossipsubIDv11, constants.GossipsubIDv10] private publishConfig: PublishConfig | undefined @@ -398,11 +412,21 @@ export class GossipSub extends TypedEventEmitter implements Pub */ readonly gossipTracer: IWantTracer + /** + * Tracks IDONTWANT messages received by peers in the current heartbeat + */ + private readonly idontwantCounts = new Map() + + /** + * Tracks IDONTWANT messages received by peers and the heartbeat they were received in + */ + private readonly idontwants = new Map>() + private readonly components: GossipSubComponents private directPeerInitial: ReturnType | null = null - public static multicodec: string = constants.GossipsubIDv11 + public static multicodec: string = constants.GossipsubIDv12 // Options readonly opts: Required @@ -450,6 +474,8 @@ export class GossipSub extends TypedEventEmitter implements Pub opportunisticGraftPeers: constants.GossipsubOpportunisticGraftPeers, opportunisticGraftTicks: constants.GossipsubOpportunisticGraftTicks, directConnectTicks: constants.GossipsubDirectConnectTicks, + idontwantMinDataSize: constants.GossipsubIdontwantMinDataSize, + idontwantMaxMessages: constants.GossipsubIdontwantMaxMessages, ...options, scoreParams: createPeerScoreParams(options.scoreParams), scoreThresholds: createPeerScoreThresholds(options.scoreThresholds) @@ -738,6 +764,8 @@ export class GossipSub extends TypedEventEmitter implements Pub this.seenCache.clear() if (this.fastMsgIdCache != null) this.fastMsgIdCache.clear() if (this.directPeerInitial != null) clearTimeout(this.directPeerInitial) + this.idontwantCounts.clear() + this.idontwants.clear() this.log('stopped') } @@ -944,6 +972,9 @@ export class GossipSub extends TypedEventEmitter implements Pub this.control.delete(id) // Remove from backoff mapping this.outbound.delete(id) + // Remove from idontwant tracking + this.idontwantCounts.delete(id) + this.idontwants.delete(id) // Remove from peer scoring this.score.removePeer(id) @@ -1007,6 +1038,10 @@ export class GossipSub extends TypedEventEmitter implements Pub prune: this.decodeRpcLimits.maxControlMessages, prune$: { peers: this.decodeRpcLimits.maxPeerInfos + }, + idontwant: this.decodeRpcLimits.maxControlMessages, + idontwant$: { + messageIDs: this.decodeRpcLimits.maxIdontwantMessageIDs } } } @@ -1298,6 +1333,11 @@ export class GossipSub extends TypedEventEmitter implements Pub this.seenCache.put(msgIdStr) } + // possibly send IDONTWANTs to mesh peers + if ((rpcMsg.data?.length ?? 0) >= this.opts.idontwantMinDataSize) { + this.sendIDontWants(msgId, rpcMsg.topic, propagationSource.toString()) + } + // (Optional) Provide custom validation here with dynamic validators per topic // NOTE: This custom topicValidator() must resolve fast (< 100ms) to allow scores // to not penalize peers for long validation times. @@ -1351,6 +1391,7 @@ export class GossipSub extends TypedEventEmitter implements Pub const ihave = (controlMsg.iwant != null) ? this.handleIWant(id, controlMsg.iwant) : [] const prune = (controlMsg.graft != null) ? await this.handleGraft(id, controlMsg.graft) : [] ;(controlMsg.prune != null) && (await this.handlePrune(id, controlMsg.prune)) + ;(controlMsg.idontwant != null) && this.handleIdontwant(id, controlMsg.idontwant) if ((iwant.length === 0) && (ihave.length === 0) && (prune.length === 0)) { return @@ -1679,6 +1720,41 @@ export class GossipSub extends TypedEventEmitter implements Pub } } + private handleIdontwant (id: PeerIdStr, idontwant: RPC.ControlIDontWant[]): void { + let idontwantCount = this.idontwantCounts.get(id) ?? 0 + // return early if we have already received too many IDONTWANT messages from the peer + if (idontwantCount >= this.opts.idontwantMaxMessages) { + return + } + const startIdontwantCount = idontwantCount + + let idontwants = this.idontwants.get(id) + if (idontwants == null) { + idontwants = new Map() + this.idontwants.set(id, idontwants) + } + let idonthave = 0 + // eslint-disable-next-line no-labels + out: for (const { messageIDs } of idontwant) { + for (const msgId of messageIDs) { + if (idontwantCount >= this.opts.idontwantMaxMessages) { + // eslint-disable-next-line no-labels + break out + } + idontwantCount++ + + const msgIdStr = this.msgIdToStrFn(msgId) + idontwants.set(msgIdStr, this.heartbeatTicks) + if (!this.mcache.msgs.has(msgIdStr)) idonthave++ + } + } + // delay setting the count in case there are multiple IDONTWANT messages + // only set once + this.idontwantCounts.set(id, idontwantCount) + const total = idontwantCount - startIdontwantCount + this.metrics?.onIdontwantRcv(total, idonthave) + } + /** * Add standard backoff log for a peer in a topic */ @@ -2341,6 +2417,27 @@ export class GossipSub extends TypedEventEmitter implements Pub this.sendRpc(id, out) } + private sendIDontWants (msgId: Uint8Array, topic: string, source: PeerIdStr): void { + const ids = this.mesh.get(topic) + if (ids == null) { + return + } + + // don't send IDONTWANT to: + // - the source + // - peers that don't support v1.2 + const tosend = new Set(ids) + tosend.delete(source) + for (const id of tosend) { + if (this.streamsOutbound.get(id)?.protocol !== constants.GossipsubIDv12) { + tosend.delete(id) + } + } + + const idontwantRpc = createGossipRpc([], { idontwant: [{ messageIDs: [msgId] }] }) + this.sendRpcInBatch(tosend, idontwantRpc) + } + /** * Send an rpc object to a peer */ @@ -2688,6 +2785,18 @@ export class GossipSub extends TypedEventEmitter implements Pub // apply IWANT request penalties this.applyIwantPenalties() + // clean up IDONTWANT counters + this.idontwantCounts.clear() + + // clean up old tracked IDONTWANTs + for (const idontwants of this.idontwants.values()) { + for (const [msgId, heartbeatTick] of idontwants) { + if (this.heartbeatTicks - heartbeatTick >= this.opts.mcacheLength) { + idontwants.delete(msgId) + } + } + } + // ensure direct peers are connected if (this.heartbeatTicks % this.opts.directConnectTicks === 0) { // we only do this every few ticks to allow pending connections to complete and account for restarts/downtime diff --git a/src/message/decodeRpc.ts b/src/message/decodeRpc.ts index dc9579c6..7b9ecd99 100644 --- a/src/message/decodeRpc.ts +++ b/src/message/decodeRpc.ts @@ -3,6 +3,7 @@ export interface DecodeRPCLimits { maxMessages: number maxIhaveMessageIDs: number maxIwantMessageIDs: number + maxIdontwantMessageIDs: number maxControlMessages: number maxPeerInfos: number } @@ -12,6 +13,7 @@ export const defaultDecodeRpcLimits: DecodeRPCLimits = { maxMessages: Infinity, maxIhaveMessageIDs: Infinity, maxIwantMessageIDs: Infinity, + maxIdontwantMessageIDs: Infinity, maxControlMessages: Infinity, maxPeerInfos: Infinity } diff --git a/src/message/rpc.proto b/src/message/rpc.proto index daefc676..efe8fafe 100644 --- a/src/message/rpc.proto +++ b/src/message/rpc.proto @@ -24,6 +24,7 @@ message RPC { repeated ControlIWant iwant = 2; repeated ControlGraft graft = 3; repeated ControlPrune prune = 4; + repeated ControlIDontWant idontwant = 5; } message ControlIHave { @@ -49,4 +50,9 @@ message RPC { optional bytes peerID = 1; optional bytes signedPeerRecord = 2; } + + message ControlIDontWant { + repeated bytes messageIDs = 1; + } + } \ No newline at end of file diff --git a/src/message/rpc.ts b/src/message/rpc.ts index a31ab91a..71bd6527 100644 --- a/src/message/rpc.ts +++ b/src/message/rpc.ts @@ -197,6 +197,7 @@ export namespace RPC { iwant: RPC.ControlIWant[] graft: RPC.ControlGraft[] prune: RPC.ControlPrune[] + idontwant: RPC.ControlIDontWant[] } export namespace ControlMessage { @@ -237,6 +238,13 @@ export namespace RPC { } } + if (obj.idontwant != null) { + for (const value of obj.idontwant) { + w.uint32(42) + RPC.ControlIDontWant.codec().encode(value, w) + } + } + if (opts.lengthDelimited !== false) { w.ldelim() } @@ -245,7 +253,8 @@ export namespace RPC { ihave: [], iwant: [], graft: [], - prune: [] + prune: [], + idontwant: [] } const end = length == null ? reader.len : reader.pos + length @@ -259,7 +268,9 @@ export namespace RPC { throw new CodeError('decode error - map field "ihave" had too many elements', 'ERR_MAX_LENGTH') } - obj.ihave.push(RPC.ControlIHave.codec().decode(reader, reader.uint32())) + obj.ihave.push(RPC.ControlIHave.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.ihave$ + })) break } case 2: { @@ -267,7 +278,9 @@ export namespace RPC { throw new CodeError('decode error - map field "iwant" had too many elements', 'ERR_MAX_LENGTH') } - obj.iwant.push(RPC.ControlIWant.codec().decode(reader, reader.uint32())) + obj.iwant.push(RPC.ControlIWant.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.iwant$ + })) break } case 3: { @@ -275,7 +288,9 @@ export namespace RPC { throw new CodeError('decode error - map field "graft" had too many elements', 'ERR_MAX_LENGTH') } - obj.graft.push(RPC.ControlGraft.codec().decode(reader, reader.uint32())) + obj.graft.push(RPC.ControlGraft.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.graft$ + })) break } case 4: { @@ -283,7 +298,19 @@ export namespace RPC { throw new CodeError('decode error - map field "prune" had too many elements', 'ERR_MAX_LENGTH') } - obj.prune.push(RPC.ControlPrune.codec().decode(reader, reader.uint32())) + obj.prune.push(RPC.ControlPrune.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.prune$ + })) + break + } + case 5: { + if (opts.limits?.idontwant != null && obj.idontwant.length === opts.limits.idontwant) { + throw new CodeError('decode error - map field "idontwant" had too many elements', 'ERR_MAX_LENGTH') + } + + obj.idontwant.push(RPC.ControlIDontWant.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.idontwant$ + })) break } default: { @@ -565,7 +592,9 @@ export namespace RPC { throw new CodeError('decode error - map field "peers" had too many elements', 'ERR_MAX_LENGTH') } - obj.peers.push(RPC.PeerInfo.codec().decode(reader, reader.uint32())) + obj.peers.push(RPC.PeerInfo.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.peers$ + })) break } case 3: { @@ -663,6 +692,72 @@ export namespace RPC { } } + export interface ControlIDontWant { + messageIDs: Uint8Array[] + } + + export namespace ControlIDontWant { + let _codec: Codec + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message((obj, w, opts = {}) => { + if (opts.lengthDelimited !== false) { + w.fork() + } + + if (obj.messageIDs != null) { + for (const value of obj.messageIDs) { + w.uint32(10) + w.bytes(value) + } + } + + if (opts.lengthDelimited !== false) { + w.ldelim() + } + }, (reader, length, opts = {}) => { + const obj: any = { + messageIDs: [] + } + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: { + if (opts.limits?.messageIDs != null && obj.messageIDs.length === opts.limits.messageIDs) { + throw new CodeError('decode error - map field "messageIDs" had too many elements', 'ERR_MAX_LENGTH') + } + + obj.messageIDs.push(reader.bytes()) + break + } + default: { + reader.skipType(tag & 7) + break + } + } + } + + return obj + }) + } + + return _codec + } + + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, ControlIDontWant.codec()) + } + + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): ControlIDontWant => { + return decodeMessage(buf, ControlIDontWant.codec(), opts) + } + } + let _codec: Codec export const codec = (): Codec => { @@ -711,7 +806,9 @@ export namespace RPC { throw new CodeError('decode error - map field "subscriptions" had too many elements', 'ERR_MAX_LENGTH') } - obj.subscriptions.push(RPC.SubOpts.codec().decode(reader, reader.uint32())) + obj.subscriptions.push(RPC.SubOpts.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.subscriptions$ + })) break } case 2: { @@ -719,11 +816,15 @@ export namespace RPC { throw new CodeError('decode error - map field "messages" had too many elements', 'ERR_MAX_LENGTH') } - obj.messages.push(RPC.Message.codec().decode(reader, reader.uint32())) + obj.messages.push(RPC.Message.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.messages$ + })) break } case 3: { - obj.control = RPC.ControlMessage.codec().decode(reader, reader.uint32()) + obj.control = RPC.ControlMessage.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.control + }) break } default: { diff --git a/src/metrics.ts b/src/metrics.ts index d098f03a..4353b516 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -361,6 +361,7 @@ export function getMetrics ( rpcSentIWant: register.gauge({ name: 'gossipsub_rpc_sent_iwant_total', help: 'RPC sent' }), rpcSentGraft: register.gauge({ name: 'gossipsub_rpc_sent_graft_total', help: 'RPC sent' }), rpcSentPrune: register.gauge({ name: 'gossipsub_rpc_sent_prune_total', help: 'RPC sent' }), + rpcSentIDontWant: register.gauge({ name: 'gossipsub_rpc_sent_idontwant_total', help: 'RPC sent' }), // publish message. Track peers sent to and bytes /** Total count of msg published by topic */ @@ -592,6 +593,16 @@ export function getMetrics ( name: 'gossipsub_iwant_rcv_dont_have_msgids_total', help: 'Total requested messageIDs that we do not have' }), + /** Total received IDONTWANT messages */ + idontwantRcvMsgids: register.gauge({ + name: 'gossipsub_idontwant_rcv_msgids_total', + help: 'Total received IDONTWANT messages' + }), + /** Total requested messageIDs that we don't have */ + idontwantRcvDonthaveMsgids: register.gauge({ + name: 'gossipsub_idontwant_rcv_dont_have_msgids_total', + help: 'Total requested IDONTWANT messageIDs that we do not have in mcache' + }), iwantPromiseStarted: register.gauge({ name: 'gossipsub_iwant_promise_sent_total', help: 'Total count of started IWANT promises' @@ -808,6 +819,11 @@ export function getMetrics ( this.iwantRcvDonthaveMsgids.inc(iwantDonthave) }, + onIdontwantRcv (idontwant: number, idontwantDonthave: number): void { + this.idontwantRcvMsgids.inc(idontwant) + this.idontwantRcvDonthaveMsgids.inc(idontwantDonthave) + }, + onForwardMsg (topicStr: TopicStr, tosendCount: number): void { const topic = this.toTopic(topicStr) this.msgForwardCount.inc({ topic }, 1) @@ -917,11 +933,13 @@ export function getMetrics ( const iwant = rpc.control.iwant?.length ?? 0 const graft = rpc.control.graft?.length ?? 0 const prune = rpc.control.prune?.length ?? 0 + const idontwant = rpc.control.idontwant?.length ?? 0 if (ihave > 0) this.rpcSentIHave.inc(ihave) if (iwant > 0) this.rpcSentIWant.inc(iwant) if (graft > 0) this.rpcSentGraft.inc(graft) if (prune > 0) this.rpcSentPrune.inc(prune) - if (ihave > 0 || iwant > 0 || graft > 0 || prune > 0) this.rpcSentControl.inc(1) + if (idontwant > 0) this.rpcSentIDontWant.inc(idontwant) + if (ihave > 0 || iwant > 0 || graft > 0 || prune > 0 || idontwant > 0) this.rpcSentControl.inc(1) } }, diff --git a/src/utils/create-gossip-rpc.ts b/src/utils/create-gossip-rpc.ts index 9ba7891e..efd59fcb 100644 --- a/src/utils/create-gossip-rpc.ts +++ b/src/utils/create-gossip-rpc.ts @@ -12,7 +12,8 @@ export function createGossipRpc (messages: RPC.Message[] = [], control?: Partial graft: control.graft ?? [], prune: control.prune ?? [], ihave: control.ihave ?? [], - iwant: control.iwant ?? [] + iwant: control.iwant ?? [], + idontwant: control.idontwant ?? [] } : undefined } @@ -24,7 +25,8 @@ export function ensureControl (rpc: RPC): Required { graft: [], prune: [], ihave: [], - iwant: [] + iwant: [], + idontwant: [] } } diff --git a/test/e2e/go-gossipsub.spec.ts b/test/e2e/go-gossipsub.spec.ts index ebb81e87..190b6c4c 100644 --- a/test/e2e/go-gossipsub.spec.ts +++ b/test/e2e/go-gossipsub.spec.ts @@ -1201,7 +1201,8 @@ describe('go-libp2p-pubsub gossipsub tests', function () { graft: [toGraft(topic1), toGraft(topic2), toGraft(topic3)], prune: [toPrune(topic1), toPrune(topic2), toPrune(topic3)], ihave: [], - iwant: [] + iwant: [], + idontwant: [] }) const expectedRpc: RPC = { @@ -1211,7 +1212,8 @@ describe('go-libp2p-pubsub gossipsub tests', function () { graft: [toGraft(topic1)], prune: [toPrune(topic2), toPrune(topic3)], ihave: [], - iwant: [] + iwant: [], + idontwant: [] } } diff --git a/test/gossip.spec.ts b/test/gossip.spec.ts index 6c4cfeb7..37fae14f 100644 --- a/test/gossip.spec.ts +++ b/test/gossip.spec.ts @@ -244,7 +244,7 @@ describe('gossip', () => { // set spy. NOTE: Forcing private property to be public const nodeASpy = sinon.spy(nodeA.pubsub, 'piggybackControl') // manually add control message to be sent to peerB - const graft = { ihave: [], iwant: [], graft: [{ topicID: topic }], prune: [] } + const graft = { ihave: [], iwant: [], graft: [{ topicID: topic }], prune: [], idontwant: [] } ;(nodeA.pubsub).control.set(peerB, graft) ;(nodeA.pubsub).gossip.set(peerB, []) From 78d3c50aabb3ce4834b87a60b3b5ab372d8fdc2c Mon Sep 17 00:00:00 2001 From: Cayman Date: Thu, 12 Sep 2024 16:46:57 -0400 Subject: [PATCH 2/7] chore: add unit test --- src/index.ts | 12 +++--- test/gossip.spec.ts | 102 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 105 insertions(+), 9 deletions(-) diff --git a/src/index.ts b/src/index.ts index aeaf166d..1302ec99 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1399,11 +1399,11 @@ export class GossipSub extends TypedEventEmitter implements Pub return } - const iwant = (controlMsg.ihave != null) ? this.handleIHave(id, controlMsg.ihave) : [] - const ihave = (controlMsg.iwant != null) ? this.handleIWant(id, controlMsg.iwant) : [] - const prune = (controlMsg.graft != null) ? await this.handleGraft(id, controlMsg.graft) : [] - ;(controlMsg.prune != null) && (await this.handlePrune(id, controlMsg.prune)) - ;(controlMsg.idontwant != null) && this.handleIdontwant(id, controlMsg.idontwant) + const iwant = (controlMsg.ihave?.length > 0) ? this.handleIHave(id, controlMsg.ihave) : [] + const ihave = (controlMsg.iwant?.length > 0) ? this.handleIWant(id, controlMsg.iwant) : [] + const prune = (controlMsg.graft?.length > 0) ? await this.handleGraft(id, controlMsg.graft) : [] + ;(controlMsg.prune?.length > 0) && (await this.handlePrune(id, controlMsg.prune)) + ;(controlMsg.idontwant?.length > 0) && this.handleIdontwant(id, controlMsg.idontwant) if ((iwant.length === 0) && (ihave.length === 0) && (prune.length === 0)) { return @@ -1760,8 +1760,6 @@ export class GossipSub extends TypedEventEmitter implements Pub if (!this.mcache.msgs.has(msgIdStr)) idonthave++ } } - // delay setting the count in case there are multiple IDONTWANT messages - // only set once this.idontwantCounts.set(id, idontwantCount) const total = idontwantCount - startIdontwantCount this.metrics?.onIdontwantRcv(total, idonthave) diff --git a/test/gossip.spec.ts b/test/gossip.spec.ts index 9494450b..c0e6d755 100644 --- a/test/gossip.spec.ts +++ b/test/gossip.spec.ts @@ -7,8 +7,9 @@ import { expect } from 'aegir/chai' import { pEvent } from 'p-event' import sinon, { type SinonStubbedInstance } from 'sinon' import { stubInterface } from 'sinon-ts' +import { concat } from 'uint8arrays' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' -import { GossipsubDhi } from '../src/constants.js' +import { GossipsubDhi} from '../src/constants.js' import { GossipSub } from '../src/index.js' import { connectAllPubSubNodes, createComponentsArray, type GossipSubAndComponents } from './utils/create-pubsub.js' import type { PeerStore } from '@libp2p/interface' @@ -28,7 +29,8 @@ describe('gossip', () => { IPColocationFactorThreshold: GossipsubDhi + 3 }, maxInboundDataLength: 4000000, - allowPublishToZeroTopicPeers: false + allowPublishToZeroTopicPeers: false, + idontwantMaxMessages: 10 } }) }) @@ -84,6 +86,102 @@ describe('gossip', () => { nodeASpy.pushGossip.restore() }) + it('should send idontwant to peers in topic', async function () { + // This test checks that idontwants and idontwantsCounts are correctly incrmemented + // - idontwantCounts should track the number of idontwant messages received from a peer for a single heartbeat + // - it should increment on receive of idontwant msgs (up to limit) + // - it should be emptied after heartbeat + // - idontwants should track the idontwant messages received from a peer along with the heartbeatId when received + // - it should increment on receive of idontwant msgs (up to limit) + // - it should be emptied after mcacheLength heartbeats + this.timeout(10e4) + const nodeA = nodes[0] + const otherNodes = nodes.slice(1) + const topic = 'Z' + const idontwantMaxMessages = nodeA.pubsub.opts.idontwantMaxMessages + const idontwantMinDataSize = nodeA.pubsub.opts.idontwantMinDataSize + + const subscriptionPromises = nodes.map(async (n) => pEvent(n.pubsub, 'subscription-change')) + // add subscriptions to each node + nodes.forEach((n) => { n.pubsub.subscribe(topic) }) + + // every node connected to every other + await connectAllPubSubNodes(nodes) + + // wait for subscriptions to be transmitted + await Promise.all(subscriptionPromises) + + // await mesh rebalancing + await Promise.all(nodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat'))) + + // publish a bunch of messages, enough to fill up our idontwant caches + for (let i = 0; i < idontwantMaxMessages * 2; i++) { + const msg = concat([ + uint8ArrayFromString(i.toString()), + new Uint8Array(idontwantMinDataSize) + ]) + await nodeA.pubsub.publish(topic, msg) + } + + // there's no event currently implemented to await, so just wait a bit - flaky :( + // TODO figure out something more robust + await new Promise((resolve) => setTimeout(resolve, 500)) + + // other nodes should have received idontwant messages + // check that idontwants <= GossipsubIdontwantMaxMessages + for (const node of otherNodes) { + // eslint-disable-next-line @typescript-eslint/dot-notation + const idontwantCounts = node.pubsub['idontwantCounts'] + let minCount = Infinity + let maxCount = 0 + for (const count of idontwantCounts.values()) { + minCount = Math.min(minCount, count) + maxCount = Math.max(maxCount, count) + } + // expect(minCount).to.be.greaterThan(0) + expect(maxCount).to.be.lessThanOrEqual(idontwantMaxMessages) + + // eslint-disable-next-line @typescript-eslint/dot-notation + const idontwants = node.pubsub['idontwants'] + let minIdontwants = Infinity + let maxIdontwants = 0 + for (const idontwant of idontwants.values()) { + minIdontwants = Math.min(minIdontwants, idontwant.size) + maxIdontwants = Math.max(maxIdontwants, idontwant.size) + } + // expect(minIdontwants).to.be.greaterThan(0) + expect(maxIdontwants).to.be.lessThanOrEqual(idontwantMaxMessages) + + // sanity check that the idontwantCount matches idontwants.size + expect(minCount).to.be.equal(minIdontwants) + expect(maxCount).to.be.equal(maxIdontwants) + } + + await Promise.all(otherNodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat'))) + + // after a heartbeat + // idontwants are still tracked + // but idontwantCounts have been cleared + for (const node of nodes) { + // eslint-disable-next-line @typescript-eslint/dot-notation + const idontwantCounts = node.pubsub['idontwantCounts'] + for (const count of idontwantCounts.values()) { + expect(count).to.be.equal(0) + } + + // eslint-disable-next-line @typescript-eslint/dot-notation + const idontwants = node.pubsub['idontwants'] + let minIdontwants = Infinity + let maxIdontwants = 0 + for (const idontwant of idontwants.values()) { + minIdontwants = Math.min(minIdontwants, idontwant.size) + maxIdontwants = Math.max(maxIdontwants, idontwant.size) + } + // expect(minIdontwants).to.be.greaterThan(0) + expect(maxIdontwants).to.be.lessThanOrEqual(idontwantMaxMessages) + } + }) + it('Should allow publishing to zero peers if flag is passed', async function () { this.timeout(10e4) const nodeA = nodes[0] From ff3c8e599036842cbfceb90ff937d6e5102ac899 Mon Sep 17 00:00:00 2001 From: Cayman Date: Thu, 12 Sep 2024 16:48:57 -0400 Subject: [PATCH 3/7] chore: remove packageManager from package.json --- package.json | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/package.json b/package.json index 24442659..9ba8d69f 100644 --- a/package.json +++ b/package.json @@ -140,6 +140,5 @@ "Franck Royer ", "ChainSafe " ], - "sideEffects": false, - "packageManager": "yarn@1.22.22+sha256.c17d3797fb9a9115bf375e31bfd30058cac6bc9c3b8807a3d8cb2094794b51ca" + "sideEffects": false } From 425b6c370b3889c27fe77c80fa2a3312a9cc07d6 Mon Sep 17 00:00:00 2001 From: Cayman Date: Thu, 12 Sep 2024 16:58:05 -0400 Subject: [PATCH 4/7] chore: add idontwants cacheSize metric --- src/index.ts | 9 +++++++++ src/metrics.ts | 4 ++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/index.ts b/src/index.ts index 1302ec99..d9b0445b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -430,6 +430,9 @@ export class GossipSub extends TypedEventEmitter implements Pub /** * Tracks IDONTWANT messages received by peers and the heartbeat they were received in + * + * idontwants are stored for `mcacheLength` heartbeats before being pruned, + * so this map is bounded by peerCount * idontwantMaxMessages * mcacheLength */ private readonly idontwants = new Map>() @@ -3176,6 +3179,12 @@ export class GossipSub extends TypedEventEmitter implements Pub } metrics.cacheSize.set({ cache: 'backoff' }, backoffSize) + let idontwantsCount = 0 + for (const idontwant of this.idontwants.values()) { + idontwantsCount += idontwant.size + } + metrics.cacheSize.set({ cache: 'idontwants' }, idontwantsCount) + // Peer counts for (const [topicStr, peers] of this.topics) { diff --git a/src/metrics.ts b/src/metrics.ts index 4353b516..d6040755 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -598,10 +598,10 @@ export function getMetrics ( name: 'gossipsub_idontwant_rcv_msgids_total', help: 'Total received IDONTWANT messages' }), - /** Total requested messageIDs that we don't have */ + /** Total received IDONTWANT messageIDs that we don't have */ idontwantRcvDonthaveMsgids: register.gauge({ name: 'gossipsub_idontwant_rcv_dont_have_msgids_total', - help: 'Total requested IDONTWANT messageIDs that we do not have in mcache' + help: 'Total received IDONTWANT messageIDs that we do not have in mcache' }), iwantPromiseStarted: register.gauge({ name: 'gossipsub_iwant_promise_sent_total', From 25cc628562c7f39e9668ecbe2d56400a7fcf2590 Mon Sep 17 00:00:00 2001 From: Cayman Date: Thu, 12 Sep 2024 17:09:03 -0400 Subject: [PATCH 5/7] chore: fix lint error --- test/gossip.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/gossip.spec.ts b/test/gossip.spec.ts index c0e6d755..60a7e1f8 100644 --- a/test/gossip.spec.ts +++ b/test/gossip.spec.ts @@ -9,7 +9,7 @@ import sinon, { type SinonStubbedInstance } from 'sinon' import { stubInterface } from 'sinon-ts' import { concat } from 'uint8arrays' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' -import { GossipsubDhi} from '../src/constants.js' +import { GossipsubDhi } from '../src/constants.js' import { GossipSub } from '../src/index.js' import { connectAllPubSubNodes, createComponentsArray, type GossipSubAndComponents } from './utils/create-pubsub.js' import type { PeerStore } from '@libp2p/interface' From 9c00c73491f987f86e63dbeea4b1bd050689b812 Mon Sep 17 00:00:00 2001 From: Cayman Date: Fri, 13 Sep 2024 10:19:01 -0400 Subject: [PATCH 6/7] chore: make test less flaky --- test/gossip.spec.ts | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/test/gossip.spec.ts b/test/gossip.spec.ts index 60a7e1f8..e656f4d1 100644 --- a/test/gossip.spec.ts +++ b/test/gossip.spec.ts @@ -122,14 +122,21 @@ describe('gossip', () => { ]) await nodeA.pubsub.publish(topic, msg) } + // track the heartbeat when each node received the last message + // eslint-disable-next-line @typescript-eslint/dot-notation + const ticks = otherNodes.map((n) => n.pubsub['heartbeatTicks']) // there's no event currently implemented to await, so just wait a bit - flaky :( // TODO figure out something more robust - await new Promise((resolve) => setTimeout(resolve, 500)) + await new Promise((resolve) => setTimeout(resolve, 200)) // other nodes should have received idontwant messages // check that idontwants <= GossipsubIdontwantMaxMessages - for (const node of otherNodes) { + for (let i = 0; i < otherNodes.length; i++) { + const node = otherNodes[i] + // eslint-disable-next-line @typescript-eslint/dot-notation + const currentTick = node.pubsub['heartbeatTicks'] + // eslint-disable-next-line @typescript-eslint/dot-notation const idontwantCounts = node.pubsub['idontwantCounts'] let minCount = Infinity @@ -153,8 +160,11 @@ describe('gossip', () => { expect(maxIdontwants).to.be.lessThanOrEqual(idontwantMaxMessages) // sanity check that the idontwantCount matches idontwants.size - expect(minCount).to.be.equal(minIdontwants) - expect(maxCount).to.be.equal(maxIdontwants) + // only the case if there hasn't been a heartbeat + if (currentTick === ticks[i]) { + expect(minCount).to.be.equal(minIdontwants) + expect(maxCount).to.be.equal(maxIdontwants) + } } await Promise.all(otherNodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat'))) From fdc8f651554a7e350d4004c9a853b7fa2fe729dc Mon Sep 17 00:00:00 2001 From: Cayman Date: Fri, 13 Sep 2024 10:45:54 -0400 Subject: [PATCH 7/7] chore: fix comment --- src/metrics.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/metrics.ts b/src/metrics.ts index d6040755..afba2e7b 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -259,7 +259,7 @@ export function getMetrics ( /* General Metrics */ /** - * Gossipsub supports floodsub, gossipsub v1.0 and gossipsub v1.1. Peers are classified based + * Gossipsub supports floodsub, gossipsub v1.0, v1.1, and v1.2. Peers are classified based * on which protocol they support. This metric keeps track of the number of peers that are * connected of each type. */ peersPerProtocol: register.gauge<{ protocol: string }>({