Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: gossipsub 1.2: IDONTWANT #498

Merged
merged 8 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
360 changes: 360 additions & 0 deletions package-lock.json

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ export const GossipsubIDv10 = '/meshsub/1.0.0'
*/
export const GossipsubIDv11 = '/meshsub/1.1.0'

/**
* The protocol ID for version 1.2.0 of the Gossipsub protocol
* See the spec for details about how v1.2.0 compares to v1.1.0:
* https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md
*/
export const GossipsubIDv12 = '/meshsub/1.2.0'

// Overlay parameters

/**
Expand Down Expand Up @@ -249,3 +256,6 @@ export const DEFAULT_METRIC_MESH_MESSAGE_DELIVERIES_WINDOWS = 1000

/** Wait for 1 more heartbeats before clearing a backoff */
export const BACKOFF_SLACK = 1

export const GossipsubIdontwantMinDataSize = 512
export const GossipsubIdontwantMaxMessages = 512
130 changes: 123 additions & 7 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type ReceivedMessageResult =
| ({ code: MessageStatus.invalid, msgIdStr?: MsgIdStr } & RejectReasonObj)
| { code: MessageStatus.valid, messageId: MessageId, msg: Message }

export const multicodec: string = constants.GossipsubIDv11
export const multicodec: string = constants.GossipsubIDv12

export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit {
/** if dial should fallback to floodsub */
Expand Down Expand Up @@ -211,6 +211,20 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit {
* It should be a number between 0 and 1, with a reasonable default of 0.25
*/
gossipFactor: number

/**
* The minimum message size in bytes to be considered for sending IDONTWANT messages
*
* @default 512
*/
idontwantMinDataSize?: number

/**
* The maximum number of IDONTWANT messages per heartbeat per peer
*
* @default 512
*/
idontwantMaxMessages?: number
}

export interface GossipsubMessage {
Expand Down Expand Up @@ -274,7 +288,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
* The signature policy to follow by default
*/
public readonly globalSignaturePolicy: typeof StrictSign | typeof StrictNoSign
public multicodecs: string[] = [constants.GossipsubIDv11, constants.GossipsubIDv10]
public multicodecs: string[] = [constants.GossipsubIDv12, constants.GossipsubIDv11, constants.GossipsubIDv10]

private publishConfig: PublishConfig | undefined

Expand Down Expand Up @@ -409,11 +423,24 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
*/
readonly gossipTracer: IWantTracer

/**
* Tracks IDONTWANT messages received by peers in the current heartbeat
*/
private readonly idontwantCounts = new Map<PeerIdStr, number>()

/**
* Tracks IDONTWANT messages received by peers and the heartbeat they were received in
*
* idontwants are stored for `mcacheLength` heartbeats before being pruned,
* so this map is bounded by peerCount * idontwantMaxMessages * mcacheLength
*/
private readonly idontwants = new Map<PeerIdStr, Map<MsgIdStr, number>>()

private readonly components: GossipSubComponents

private directPeerInitial: ReturnType<typeof setTimeout> | null = null

public static multicodec: string = constants.GossipsubIDv11
public static multicodec: string = constants.GossipsubIDv12

// Options
readonly opts: Required<GossipOptions>
Expand Down Expand Up @@ -462,6 +489,8 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
opportunisticGraftTicks: constants.GossipsubOpportunisticGraftTicks,
directConnectTicks: constants.GossipsubDirectConnectTicks,
gossipFactor: constants.GossipsubGossipFactor,
idontwantMinDataSize: constants.GossipsubIdontwantMinDataSize,
idontwantMaxMessages: constants.GossipsubIdontwantMaxMessages,
...options,
scoreParams: createPeerScoreParams(options.scoreParams),
scoreThresholds: createPeerScoreThresholds(options.scoreThresholds)
Expand Down Expand Up @@ -750,6 +779,8 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
this.seenCache.clear()
if (this.fastMsgIdCache != null) this.fastMsgIdCache.clear()
if (this.directPeerInitial != null) clearTimeout(this.directPeerInitial)
this.idontwantCounts.clear()
this.idontwants.clear()

this.log('stopped')
}
Expand Down Expand Up @@ -956,6 +987,9 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
this.control.delete(id)
// Remove from backoff mapping
this.outbound.delete(id)
// Remove from idontwant tracking
this.idontwantCounts.delete(id)
this.idontwants.delete(id)

// Remove from peer scoring
this.score.removePeer(id)
Expand Down Expand Up @@ -1019,6 +1053,10 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
prune: this.decodeRpcLimits.maxControlMessages,
prune$: {
peers: this.decodeRpcLimits.maxPeerInfos
},
idontwant: this.decodeRpcLimits.maxControlMessages,
idontwant$: {
messageIDs: this.decodeRpcLimits.maxIdontwantMessageIDs
}
}
}
Expand Down Expand Up @@ -1310,6 +1348,11 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
this.seenCache.put(msgIdStr)
}

// possibly send IDONTWANTs to mesh peers
if ((rpcMsg.data?.length ?? 0) >= this.opts.idontwantMinDataSize) {
this.sendIDontWants(msgId, rpcMsg.topic, propagationSource.toString())
}

// (Optional) Provide custom validation here with dynamic validators per topic
// NOTE: This custom topicValidator() must resolve fast (< 100ms) to allow scores
// to not penalize peers for long validation times.
Expand Down Expand Up @@ -1359,10 +1402,11 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
return
}

const iwant = (controlMsg.ihave != null) ? this.handleIHave(id, controlMsg.ihave) : []
const ihave = (controlMsg.iwant != null) ? this.handleIWant(id, controlMsg.iwant) : []
const prune = (controlMsg.graft != null) ? await this.handleGraft(id, controlMsg.graft) : []
;(controlMsg.prune != null) && (await this.handlePrune(id, controlMsg.prune))
const iwant = (controlMsg.ihave?.length > 0) ? this.handleIHave(id, controlMsg.ihave) : []
const ihave = (controlMsg.iwant?.length > 0) ? this.handleIWant(id, controlMsg.iwant) : []
const prune = (controlMsg.graft?.length > 0) ? await this.handleGraft(id, controlMsg.graft) : []
;(controlMsg.prune?.length > 0) && (await this.handlePrune(id, controlMsg.prune))
;(controlMsg.idontwant?.length > 0) && this.handleIdontwant(id, controlMsg.idontwant)

if ((iwant.length === 0) && (ihave.length === 0) && (prune.length === 0)) {
return
Expand Down Expand Up @@ -1691,6 +1735,39 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
}
}

private handleIdontwant (id: PeerIdStr, idontwant: RPC.ControlIDontWant[]): void {
let idontwantCount = this.idontwantCounts.get(id) ?? 0
// return early if we have already received too many IDONTWANT messages from the peer
if (idontwantCount >= this.opts.idontwantMaxMessages) {
return
}
const startIdontwantCount = idontwantCount

let idontwants = this.idontwants.get(id)
if (idontwants == null) {
idontwants = new Map()
this.idontwants.set(id, idontwants)
}
let idonthave = 0
// eslint-disable-next-line no-labels
out: for (const { messageIDs } of idontwant) {
for (const msgId of messageIDs) {
if (idontwantCount >= this.opts.idontwantMaxMessages) {
// eslint-disable-next-line no-labels
break out
}
idontwantCount++

const msgIdStr = this.msgIdToStrFn(msgId)
idontwants.set(msgIdStr, this.heartbeatTicks)
if (!this.mcache.msgs.has(msgIdStr)) idonthave++
}
}
this.idontwantCounts.set(id, idontwantCount)
const total = idontwantCount - startIdontwantCount
this.metrics?.onIdontwantRcv(total, idonthave)
}

/**
* Add standard backoff log for a peer in a topic
*/
Expand Down Expand Up @@ -2353,6 +2430,27 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
this.sendRpc(id, out)
}

private sendIDontWants (msgId: Uint8Array, topic: string, source: PeerIdStr): void {
const ids = this.mesh.get(topic)
if (ids == null) {
return
}

// don't send IDONTWANT to:
// - the source
// - peers that don't support v1.2
const tosend = new Set(ids)
tosend.delete(source)
for (const id of tosend) {
if (this.streamsOutbound.get(id)?.protocol !== constants.GossipsubIDv12) {
tosend.delete(id)
}
}

const idontwantRpc = createGossipRpc([], { idontwant: [{ messageIDs: [msgId] }] })
this.sendRpcInBatch(tosend, idontwantRpc)
}

/**
* Send an rpc object to a peer
*/
Expand Down Expand Up @@ -2701,6 +2799,18 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
// apply IWANT request penalties
this.applyIwantPenalties()

// clean up IDONTWANT counters
this.idontwantCounts.clear()

// clean up old tracked IDONTWANTs
for (const idontwants of this.idontwants.values()) {
for (const [msgId, heartbeatTick] of idontwants) {
if (this.heartbeatTicks - heartbeatTick >= this.opts.mcacheLength) {
idontwants.delete(msgId)
}
}
}

// ensure direct peers are connected
if (this.heartbeatTicks % this.opts.directConnectTicks === 0) {
// we only do this every few ticks to allow pending connections to complete and account for restarts/downtime
Expand Down Expand Up @@ -3069,6 +3179,12 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
}
metrics.cacheSize.set({ cache: 'backoff' }, backoffSize)

let idontwantsCount = 0
for (const idontwant of this.idontwants.values()) {
idontwantsCount += idontwant.size
}
metrics.cacheSize.set({ cache: 'idontwants' }, idontwantsCount)

// Peer counts

for (const [topicStr, peers] of this.topics) {
Expand Down
2 changes: 2 additions & 0 deletions src/message/decodeRpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ export interface DecodeRPCLimits {
maxMessages: number
maxIhaveMessageIDs: number
maxIwantMessageIDs: number
maxIdontwantMessageIDs: number
maxControlMessages: number
maxPeerInfos: number
}
Expand All @@ -12,6 +13,7 @@ export const defaultDecodeRpcLimits: DecodeRPCLimits = {
maxMessages: Infinity,
maxIhaveMessageIDs: Infinity,
maxIwantMessageIDs: Infinity,
maxIdontwantMessageIDs: Infinity,
maxControlMessages: Infinity,
maxPeerInfos: Infinity
}
6 changes: 6 additions & 0 deletions src/message/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ message RPC {
repeated ControlIWant iwant = 2;
repeated ControlGraft graft = 3;
repeated ControlPrune prune = 4;
repeated ControlIDontWant idontwant = 5;
}

message ControlIHave {
Expand All @@ -49,4 +50,9 @@ message RPC {
optional bytes peerID = 1;
optional bytes signedPeerRecord = 2;
}

message ControlIDontWant {
repeated bytes messageIDs = 1;
}

}
Loading
Loading