From 12c62e283aade89f423d25954c6004c41fcfb46d Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Thu, 4 Jan 2024 13:19:41 +0700 Subject: [PATCH 1/3] feat: batch publish --- src/index.ts | 52 +++++++++++++++++++++++++++++++++++++++++++-------- src/stream.ts | 17 +++++++++++++++++ src/types.ts | 1 + 3 files changed, 62 insertions(+), 8 deletions(-) diff --git a/src/index.ts b/src/index.ts index 71339b6c..ed386890 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,6 @@ import { CustomEvent, TypedEventEmitter, StrictSign, StrictNoSign, TopicValidatorResult } from '@libp2p/interface' import { peerIdFromBytes, peerIdFromString } from '@libp2p/peer-id' +import { encode } from 'it-length-prefixed' import { pipe } from 'it-pipe' import { pushable } from 'it-pushable' import * as constants from './constants.js' @@ -91,6 +92,8 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit { fallbackToFloodsub: boolean /** if self-published messages should be sent to all peers */ floodPublish: boolean + /** serialize message once and send to all peers without control messages */ + batchPublish: boolean /** whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted nodes. */ doPX: boolean /** peers with which we will maintain direct connections */ @@ -393,6 +396,7 @@ export class GossipSub extends TypedEventEmitter implements Pub const opts = { fallbackToFloodsub: true, floodPublish: true, + batchPublish: false, doPX: false, directPeers: [], D: constants.GossipsubD, @@ -2091,14 +2095,20 @@ export class GossipSub extends TypedEventEmitter implements Pub // If the message is anonymous or has a random author add it to the published message ids cache. this.publishedMessageIds.put(msgIdStr) - // Send to set of peers aggregated from direct, mesh, fanout - for (const id of tosend) { - // sendRpc may mutate RPC message on piggyback, create a new message for each peer - const sent = this.sendRpc(id, { messages: [rawMsg] }) - - // did not actually send the message - if (!sent) { - tosend.delete(id) + const batchPublish = opts?.batch ?? this.opts.batchPublish + const rpc = { messages: [rawMsg] } + if (batchPublish) { + this.sendRpcInBatch(tosend, rpc) + } else { + // Send to set of peers aggregated from direct, mesh, fanout + for (const id of tosend) { + // sendRpc may mutate RPC message on piggyback, create a new message for each peer + const sent = this.sendRpc(id, rpc) + + // did not actually send the message + if (!sent) { + tosend.delete(id) + } } } @@ -2133,6 +2143,32 @@ export class GossipSub extends TypedEventEmitter implements Pub } } + /** + * Send the same data in batch to tosend list without considering cached control messages + * This is not only faster but also avoid allocating memory for each peer + * see https://github.com/ChainSafe/js-libp2p-gossipsub/issues/344 + */ + private sendRpcInBatch (tosend: Set, rpc: IRPC): void { + const rpcBytes = RPC.encode(rpc).finish() + const prefixedData = encode.single(rpcBytes) + for (const id of tosend) { + const outboundStream = this.streamsOutbound.get(id) + if (outboundStream == null) { + this.log(`Cannot send RPC to ${id} as there is no open stream to it available`) + tosend.delete(id) + continue + } + try { + outboundStream.pushPrefixed(prefixedData) + } catch (e) { + tosend.delete(id) + this.log.error(`Cannot send rpc to ${id}`, e) + } + + this.metrics?.onRpcSent(rpc, rpcBytes.length) + } + } + /** * This function should be called when `asyncValidation` is `true` after * the message got validated by the caller. Messages are stored in the `mcache` and diff --git a/src/stream.ts b/src/stream.ts index 00573326..cf6cad66 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -17,11 +17,13 @@ interface InboundStreamOpts { export class OutboundStream { private readonly pushable: Pushable + private readonly lpPushable: Pushable private readonly closeController: AbortController private readonly maxBufferSize: number constructor (private readonly rawStream: Stream, errCallback: (e: Error) => void, opts: OutboundStreamOpts) { this.pushable = pushable({ objectMode: false }) + this.lpPushable = pushable({ objectMode: false }) this.closeController = new AbortController() this.maxBufferSize = opts.maxBufferSize ?? Infinity @@ -30,6 +32,10 @@ export class OutboundStream { (source) => encode(source), this.rawStream ).catch(errCallback) + + pipe(abortableSource(this.lpPushable, this.closeController.signal, { returnOnAbort: true }), this.rawStream).catch( + errCallback + ) } get protocol (): string { @@ -46,10 +52,21 @@ export class OutboundStream { this.pushable.push(data) } + /** + * Same to push() but this is prefixed data so no need to encode length prefixed again + */ + pushPrefixed (data: Uint8ArrayList): void { + if (this.lpPushable.readableLength > this.maxBufferSize) { + throw Error(`OutboundStream buffer full, size > ${this.maxBufferSize}`) + } + this.lpPushable.push(data) + } + async close (): Promise { this.closeController.abort() // similar to pushable.end() but clear the internal buffer await this.pushable.return() + await this.lpPushable.return() await this.rawStream.close() } } diff --git a/src/types.ts b/src/types.ts index 25fbad35..8870c19a 100644 --- a/src/types.ts +++ b/src/types.ts @@ -73,6 +73,7 @@ export enum SignaturePolicy { export interface PublishOpts { allowPublishToZeroPeers?: boolean ignoreDuplicatePublishError?: boolean + batch?: boolean } export enum PublishConfigType { From 9be54e7938fe17f09d347a0ba587e0863c3638fe Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Thu, 4 Jan 2024 13:20:07 +0700 Subject: [PATCH 2/3] chore: add tests --- test/e2e/go-gossipsub.spec.ts | 70 +++++++++++++++++++---------------- test/floodsub.spec.ts | 53 ++++++++++++++------------ 2 files changed, 67 insertions(+), 56 deletions(-) diff --git a/test/e2e/go-gossipsub.spec.ts b/test/e2e/go-gossipsub.spec.ts index 7f363fd5..776b992c 100644 --- a/test/e2e/go-gossipsub.spec.ts +++ b/test/e2e/go-gossipsub.spec.ts @@ -84,6 +84,7 @@ describe('go-libp2p-pubsub gossipsub tests', function () { number: 20, init: { floodPublish: false, + batchPublish: true, scoreParams: { IPColocationFactorThreshold: 20, behaviourPenaltyWeight: 0 @@ -112,42 +113,47 @@ describe('go-libp2p-pubsub gossipsub tests', function () { await Promise.all(sendRecv) }) - it('test dense gossipsub', async function () { - // Create 20 gossipsub nodes - // Subscribe to the topic, all nodes - // Densely connect the nodes - // Publish 100 messages, each from a random node - // Assert that subscribed nodes receive the message - psubs = await createComponentsArray({ - number: 20, - init: { - floodPublish: false, - scoreParams: { - IPColocationFactorThreshold: 20, - behaviourPenaltyWeight: 0 + const batchOpts = [true, false] + for (const batchPublish of batchOpts) { + // eslint-disable-next-line no-loop-func + it(`test dense gossipsub batchPublish=${batchPublish}`, async function () { + // Create 20 gossipsub nodes + // Subscribe to the topic, all nodes + // Densely connect the nodes + // Publish 100 messages, each from a random node + // Assert that subscribed nodes receive the message + psubs = await createComponentsArray({ + number: 20, + init: { + floodPublish: false, + batchPublish, + scoreParams: { + IPColocationFactorThreshold: 20, + behaviourPenaltyWeight: 0 + } } - } - }) - const topic = 'foobar' - psubs.forEach((ps) => { ps.pubsub.subscribe(topic) }) + }) + const topic = 'foobar' + psubs.forEach((ps) => { ps.pubsub.subscribe(topic) }) - await denseConnect(psubs) + await denseConnect(psubs) - // wait for heartbeats to build mesh - await Promise.all(psubs.map(async (ps) => awaitEvents(ps.pubsub, 'gossipsub:heartbeat', 2))) + // wait for heartbeats to build mesh + await Promise.all(psubs.map(async (ps) => awaitEvents(ps.pubsub, 'gossipsub:heartbeat', 2))) - const sendRecv = [] - for (let i = 0; i < 100; i++) { - const msg = uint8ArrayFromString(`${i} its not a flooooood ${i}`) - const owner = Math.floor(Math.random() * psubs.length) - const results = Promise.all( - psubs.filter((psub, j) => j !== owner).map(checkReceivedMessage(topic, msg, owner, i)) - ) - sendRecv.push(psubs[owner].pubsub.publish(topic, msg)) - sendRecv.push(results) - } - await Promise.all(sendRecv) - }) + const sendRecv = [] + for (let i = 0; i < 100; i++) { + const msg = uint8ArrayFromString(`${i} its not a flooooood ${i}`) + const owner = Math.floor(Math.random() * psubs.length) + const results = Promise.all( + psubs.filter((psub, j) => j !== owner).map(checkReceivedMessage(topic, msg, owner, i)) + ) + sendRecv.push(psubs[owner].pubsub.publish(topic, msg)) + sendRecv.push(results) + } + await Promise.all(sendRecv) + }) + } it('test gossipsub fanout', async function () { // Create 20 gossipsub nodes diff --git a/test/floodsub.spec.ts b/test/floodsub.spec.ts index 0fb47a49..c23d9f74 100644 --- a/test/floodsub.spec.ts +++ b/test/floodsub.spec.ts @@ -180,33 +180,38 @@ describe('gossipsub fallbacks to floodsub', () => { mockNetwork.reset() }) - it('Publish to a topic - nodeGs', async () => { - const promise = pEvent<'message', CustomEvent>(nodeFs.pubsub, 'message') - const data = uint8ArrayFromString('hey') - - await nodeGs.pubsub.publish(topic, data) - - const evt = await promise - if (evt.detail.type !== 'signed') { - throw new Error('unexpected message type') - } - expect(evt.detail.data).to.equalBytes(data) - expect(evt.detail.from.toString()).to.be.eql(nodeGs.components.peerId.toString()) - }) + const batchOpts = [true, false] + for (const batch of batchOpts) { + // eslint-disable-next-line no-loop-func + it(`Publish to a topic - nodeGs - batchPublish: ${batch}`, async () => { + const promise = pEvent<'message', CustomEvent>(nodeFs.pubsub, 'message') + const data = uint8ArrayFromString('hey') + + await nodeGs.pubsub.publish(topic, data, { batch }) + + const evt = await promise + if (evt.detail.type !== 'signed') { + throw new Error('unexpected message type') + } + expect(evt.detail.data).to.equalBytes(data) + expect(evt.detail.from.toString()).to.be.eql(nodeGs.components.peerId.toString()) + }) - it('Publish to a topic - nodeFs', async () => { - const promise = pEvent<'message', CustomEvent>(nodeGs.pubsub, 'message') - const data = uint8ArrayFromString('banana') + // eslint-disable-next-line no-loop-func + it(`Publish to a topic - nodeFs - batchPublish: ${batch}`, async () => { + const promise = pEvent<'message', CustomEvent>(nodeGs.pubsub, 'message') + const data = uint8ArrayFromString('banana') - await nodeFs.pubsub.publish(topic, data) + await nodeFs.pubsub.publish(topic, data, { batch }) - const evt = await promise - if (evt.detail.type !== 'signed') { - throw new Error('unexpected message type') - } - expect(evt.detail.data).to.equalBytes(data) - expect(evt.detail.from.toString()).to.be.eql(nodeFs.components.peerId.toString()) - }) + const evt = await promise + if (evt.detail.type !== 'signed') { + throw new Error('unexpected message type') + } + expect(evt.detail.data).to.equalBytes(data) + expect(evt.detail.from.toString()).to.be.eql(nodeFs.components.peerId.toString()) + }) + } }) describe('publish after unsubscribe', () => { From d360060a6becea288e51f268b6599cb8855ede8b Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Sat, 6 Jan 2024 12:53:49 +0700 Subject: [PATCH 3/3] refactor: batchPublish option instead of batch --- src/index.ts | 2 +- src/types.ts | 3 ++- test/floodsub.spec.ts | 12 ++++++------ 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/index.ts b/src/index.ts index ed386890..78422347 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2095,7 +2095,7 @@ export class GossipSub extends TypedEventEmitter implements Pub // If the message is anonymous or has a random author add it to the published message ids cache. this.publishedMessageIds.put(msgIdStr) - const batchPublish = opts?.batch ?? this.opts.batchPublish + const batchPublish = opts?.batchPublish ?? this.opts.batchPublish const rpc = { messages: [rawMsg] } if (batchPublish) { this.sendRpcInBatch(tosend, rpc) diff --git a/src/types.ts b/src/types.ts index 8870c19a..c1c623c4 100644 --- a/src/types.ts +++ b/src/types.ts @@ -73,7 +73,8 @@ export enum SignaturePolicy { export interface PublishOpts { allowPublishToZeroPeers?: boolean ignoreDuplicatePublishError?: boolean - batch?: boolean + /** serialize message once and send to all peers without control messages */ + batchPublish?: boolean } export enum PublishConfigType { diff --git a/test/floodsub.spec.ts b/test/floodsub.spec.ts index c23d9f74..9414247d 100644 --- a/test/floodsub.spec.ts +++ b/test/floodsub.spec.ts @@ -180,14 +180,14 @@ describe('gossipsub fallbacks to floodsub', () => { mockNetwork.reset() }) - const batchOpts = [true, false] - for (const batch of batchOpts) { + const batchPublishOpts = [true, false] + for (const batchPublish of batchPublishOpts) { // eslint-disable-next-line no-loop-func - it(`Publish to a topic - nodeGs - batchPublish: ${batch}`, async () => { + it(`Publish to a topic - nodeGs - batchPublish: ${batchPublish}`, async () => { const promise = pEvent<'message', CustomEvent>(nodeFs.pubsub, 'message') const data = uint8ArrayFromString('hey') - await nodeGs.pubsub.publish(topic, data, { batch }) + await nodeGs.pubsub.publish(topic, data, { batchPublish }) const evt = await promise if (evt.detail.type !== 'signed') { @@ -198,11 +198,11 @@ describe('gossipsub fallbacks to floodsub', () => { }) // eslint-disable-next-line no-loop-func - it(`Publish to a topic - nodeFs - batchPublish: ${batch}`, async () => { + it(`Publish to a topic - nodeFs - batchPublish: ${batchPublish}`, async () => { const promise = pEvent<'message', CustomEvent>(nodeGs.pubsub, 'message') const data = uint8ArrayFromString('banana') - await nodeFs.pubsub.publish(topic, data, { batch }) + await nodeFs.pubsub.publish(topic, data, { batchPublish }) const evt = await promise if (evt.detail.type !== 'signed') {