Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: batch publish #480

Merged
merged 3 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 44 additions & 8 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { CustomEvent, TypedEventEmitter, StrictSign, StrictNoSign, TopicValidatorResult } from '@libp2p/interface'
import { peerIdFromBytes, peerIdFromString } from '@libp2p/peer-id'
import { encode } from 'it-length-prefixed'
import { pipe } from 'it-pipe'
import { pushable } from 'it-pushable'
import * as constants from './constants.js'
Expand Down Expand Up @@ -91,6 +92,8 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit {
fallbackToFloodsub: boolean
/** if self-published messages should be sent to all peers */
floodPublish: boolean
/** serialize message once and send to all peers without control messages */
batchPublish: boolean
/** whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted nodes. */
doPX: boolean
/** peers with which we will maintain direct connections */
Expand Down Expand Up @@ -393,6 +396,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
const opts = {
fallbackToFloodsub: true,
floodPublish: true,
batchPublish: false,
doPX: false,
directPeers: [],
D: constants.GossipsubD,
Expand Down Expand Up @@ -2091,14 +2095,20 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
// If the message is anonymous or has a random author add it to the published message ids cache.
this.publishedMessageIds.put(msgIdStr)

// Send to set of peers aggregated from direct, mesh, fanout
for (const id of tosend) {
// sendRpc may mutate RPC message on piggyback, create a new message for each peer
const sent = this.sendRpc(id, { messages: [rawMsg] })

// did not actually send the message
if (!sent) {
tosend.delete(id)
const batchPublish = opts?.batchPublish ?? this.opts.batchPublish
const rpc = { messages: [rawMsg] }
if (batchPublish) {
this.sendRpcInBatch(tosend, rpc)
} else {
// Send to set of peers aggregated from direct, mesh, fanout
for (const id of tosend) {
// sendRpc may mutate RPC message on piggyback, create a new message for each peer
const sent = this.sendRpc(id, rpc)

// did not actually send the message
if (!sent) {
tosend.delete(id)
}
}
}

Expand Down Expand Up @@ -2133,6 +2143,32 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
}
}

/**
* Send the same data in batch to tosend list without considering cached control messages
* This is not only faster but also avoid allocating memory for each peer
* see https://github.com/ChainSafe/js-libp2p-gossipsub/issues/344
*/
private sendRpcInBatch (tosend: Set<PeerIdStr>, rpc: IRPC): void {
const rpcBytes = RPC.encode(rpc).finish()
const prefixedData = encode.single(rpcBytes)
for (const id of tosend) {
const outboundStream = this.streamsOutbound.get(id)
if (outboundStream == null) {
this.log(`Cannot send RPC to ${id} as there is no open stream to it available`)
tosend.delete(id)
continue
}
try {
outboundStream.pushPrefixed(prefixedData)
} catch (e) {
tosend.delete(id)
this.log.error(`Cannot send rpc to ${id}`, e)
}

this.metrics?.onRpcSent(rpc, rpcBytes.length)
}
}

/**
* This function should be called when `asyncValidation` is `true` after
* the message got validated by the caller. Messages are stored in the `mcache` and
Expand Down
17 changes: 17 additions & 0 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ interface InboundStreamOpts {

export class OutboundStream {
private readonly pushable: Pushable<Uint8Array>
private readonly lpPushable: Pushable<Uint8ArrayList>
private readonly closeController: AbortController
private readonly maxBufferSize: number

constructor (private readonly rawStream: Stream, errCallback: (e: Error) => void, opts: OutboundStreamOpts) {
this.pushable = pushable({ objectMode: false })
this.lpPushable = pushable({ objectMode: false })
this.closeController = new AbortController()
this.maxBufferSize = opts.maxBufferSize ?? Infinity

Expand All @@ -30,6 +32,10 @@ export class OutboundStream {
(source) => encode(source),
this.rawStream
).catch(errCallback)

pipe(abortableSource(this.lpPushable, this.closeController.signal, { returnOnAbort: true }), this.rawStream).catch(
errCallback
)
}

get protocol (): string {
Expand All @@ -46,10 +52,21 @@ export class OutboundStream {
this.pushable.push(data)
}

/**
* Same to push() but this is prefixed data so no need to encode length prefixed again
*/
pushPrefixed (data: Uint8ArrayList): void {
if (this.lpPushable.readableLength > this.maxBufferSize) {
throw Error(`OutboundStream buffer full, size > ${this.maxBufferSize}`)
}
this.lpPushable.push(data)
}

async close (): Promise<void> {
this.closeController.abort()
// similar to pushable.end() but clear the internal buffer
await this.pushable.return()
await this.lpPushable.return()
await this.rawStream.close()
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ export enum SignaturePolicy {
export interface PublishOpts {
allowPublishToZeroPeers?: boolean
ignoreDuplicatePublishError?: boolean
/** serialize message once and send to all peers without control messages */
batchPublish?: boolean
}

export enum PublishConfigType {
Expand Down
70 changes: 38 additions & 32 deletions test/e2e/go-gossipsub.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ describe('go-libp2p-pubsub gossipsub tests', function () {
number: 20,
init: {
floodPublish: false,
batchPublish: true,
scoreParams: {
IPColocationFactorThreshold: 20,
behaviourPenaltyWeight: 0
Expand Down Expand Up @@ -112,42 +113,47 @@ describe('go-libp2p-pubsub gossipsub tests', function () {
await Promise.all(sendRecv)
})

it('test dense gossipsub', async function () {
// Create 20 gossipsub nodes
// Subscribe to the topic, all nodes
// Densely connect the nodes
// Publish 100 messages, each from a random node
// Assert that subscribed nodes receive the message
psubs = await createComponentsArray({
number: 20,
init: {
floodPublish: false,
scoreParams: {
IPColocationFactorThreshold: 20,
behaviourPenaltyWeight: 0
const batchOpts = [true, false]
for (const batchPublish of batchOpts) {
// eslint-disable-next-line no-loop-func
it(`test dense gossipsub batchPublish=${batchPublish}`, async function () {
// Create 20 gossipsub nodes
// Subscribe to the topic, all nodes
// Densely connect the nodes
// Publish 100 messages, each from a random node
// Assert that subscribed nodes receive the message
psubs = await createComponentsArray({
number: 20,
init: {
floodPublish: false,
batchPublish,
scoreParams: {
IPColocationFactorThreshold: 20,
behaviourPenaltyWeight: 0
}
}
}
})
const topic = 'foobar'
psubs.forEach((ps) => { ps.pubsub.subscribe(topic) })
})
const topic = 'foobar'
psubs.forEach((ps) => { ps.pubsub.subscribe(topic) })

await denseConnect(psubs)
await denseConnect(psubs)

// wait for heartbeats to build mesh
await Promise.all(psubs.map(async (ps) => awaitEvents(ps.pubsub, 'gossipsub:heartbeat', 2)))
// wait for heartbeats to build mesh
await Promise.all(psubs.map(async (ps) => awaitEvents(ps.pubsub, 'gossipsub:heartbeat', 2)))

const sendRecv = []
for (let i = 0; i < 100; i++) {
const msg = uint8ArrayFromString(`${i} its not a flooooood ${i}`)
const owner = Math.floor(Math.random() * psubs.length)
const results = Promise.all(
psubs.filter((psub, j) => j !== owner).map(checkReceivedMessage(topic, msg, owner, i))
)
sendRecv.push(psubs[owner].pubsub.publish(topic, msg))
sendRecv.push(results)
}
await Promise.all(sendRecv)
})
const sendRecv = []
for (let i = 0; i < 100; i++) {
const msg = uint8ArrayFromString(`${i} its not a flooooood ${i}`)
const owner = Math.floor(Math.random() * psubs.length)
const results = Promise.all(
psubs.filter((psub, j) => j !== owner).map(checkReceivedMessage(topic, msg, owner, i))
)
sendRecv.push(psubs[owner].pubsub.publish(topic, msg))
sendRecv.push(results)
}
await Promise.all(sendRecv)
})
}

it('test gossipsub fanout', async function () {
// Create 20 gossipsub nodes
Expand Down
53 changes: 29 additions & 24 deletions test/floodsub.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,33 +180,38 @@ describe('gossipsub fallbacks to floodsub', () => {
mockNetwork.reset()
})

it('Publish to a topic - nodeGs', async () => {
const promise = pEvent<'message', CustomEvent<Message>>(nodeFs.pubsub, 'message')
const data = uint8ArrayFromString('hey')

await nodeGs.pubsub.publish(topic, data)

const evt = await promise
if (evt.detail.type !== 'signed') {
throw new Error('unexpected message type')
}
expect(evt.detail.data).to.equalBytes(data)
expect(evt.detail.from.toString()).to.be.eql(nodeGs.components.peerId.toString())
})
const batchPublishOpts = [true, false]
for (const batchPublish of batchPublishOpts) {
// eslint-disable-next-line no-loop-func
it(`Publish to a topic - nodeGs - batchPublish: ${batchPublish}`, async () => {
const promise = pEvent<'message', CustomEvent<Message>>(nodeFs.pubsub, 'message')
const data = uint8ArrayFromString('hey')

await nodeGs.pubsub.publish(topic, data, { batchPublish })

const evt = await promise
if (evt.detail.type !== 'signed') {
throw new Error('unexpected message type')
}
expect(evt.detail.data).to.equalBytes(data)
expect(evt.detail.from.toString()).to.be.eql(nodeGs.components.peerId.toString())
})

it('Publish to a topic - nodeFs', async () => {
const promise = pEvent<'message', CustomEvent<Message>>(nodeGs.pubsub, 'message')
const data = uint8ArrayFromString('banana')
// eslint-disable-next-line no-loop-func
it(`Publish to a topic - nodeFs - batchPublish: ${batchPublish}`, async () => {
const promise = pEvent<'message', CustomEvent<Message>>(nodeGs.pubsub, 'message')
const data = uint8ArrayFromString('banana')

await nodeFs.pubsub.publish(topic, data)
await nodeFs.pubsub.publish(topic, data, { batchPublish })

const evt = await promise
if (evt.detail.type !== 'signed') {
throw new Error('unexpected message type')
}
expect(evt.detail.data).to.equalBytes(data)
expect(evt.detail.from.toString()).to.be.eql(nodeFs.components.peerId.toString())
})
const evt = await promise
if (evt.detail.type !== 'signed') {
throw new Error('unexpected message type')
}
expect(evt.detail.data).to.equalBytes(data)
expect(evt.detail.from.toString()).to.be.eql(nodeFs.components.peerId.toString())
})
}
})

describe('publish after unsubscribe', () => {
Expand Down
Loading