Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

fix: update pubsub interface in line with gossipsub #199

Merged
merged 1 commit into from
Apr 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,19 @@ import type { ConnectionManager, ConnectionManagerEvents } from '@libp2p/interfa

class MockConnectionManager extends EventEmitter<ConnectionManagerEvents> implements ConnectionManager {
getConnectionMap (): Map<string, Connection[]> {
throw new Error('Method not implemented.')
return new Map<string, Connection[]>()
}

getConnectionList (): Connection[] {
throw new Error('Method not implemented.')
return []
}

getConnections (): Connection[] {
throw new Error('Method not implemented.')
return []
}

getConnection (peerId: PeerId): Connection | undefined {
throw new Error('Method not implemented.')
}

listenerCount (type: string): number {
throw new Error('Method not implemented.')
}

addEventListener<U extends keyof ConnectionManagerEvents>(type: U, callback: ((evt: ConnectionManagerEvents[U]) => void) | { handleEvent: (evt: ConnectionManagerEvents[U]) => void } | null, options?: boolean | AddEventListenerOptions): void {
throw new Error('Method not implemented.')
}

removeEventListener<U extends keyof ConnectionManagerEvents>(type: U, callback: (((evt: ConnectionManagerEvents[U]) => void) | { handleEvent: (evt: ConnectionManagerEvents[U]) => void } | null) | undefined, options?: boolean | EventListenerOptions): void {
throw new Error('Method not implemented.')
}

dispatchEvent (event: Event): boolean {
throw new Error('Method not implemented.')
return undefined
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,18 +184,18 @@ export interface Peer {
registrar: Registrar
}

export function connectionPair (a: Peer, b: Peer): [ Connection, Connection ] {
export function connectionPair (a: Components, b: Components): [ Connection, Connection ] {
const [peerBtoPeerA, peerAtoPeerB] = duplexPair<Uint8Array>()

return [
mockConnection(
mockMultiaddrConnection(peerAtoPeerB, b.peerId), {
registrar: a.registrar
mockMultiaddrConnection(peerAtoPeerB, b.getPeerId()), {
registrar: a.getRegistrar()
}
),
mockConnection(
mockMultiaddrConnection(peerBtoPeerA, a.peerId), {
registrar: b.registrar
mockMultiaddrConnection(peerBtoPeerA, a.getPeerId()), {
registrar: b.getRegistrar()
}
)
]
Expand Down
16 changes: 6 additions & 10 deletions packages/libp2p-interface-compliance-tests/src/mocks/registrar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { Connection } from '@libp2p/interfaces/connection'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { Topology } from '@libp2p/interfaces/topology'
import { connectionPair } from './connection.js'
import type { Components } from '@libp2p/interfaces/src/components'

export class MockRegistrar implements Registrar {
private readonly topologies: Map<string, { topology: Topology, protocols: string[] }> = new Map()
Expand Down Expand Up @@ -106,20 +107,15 @@ export async function mockIncomingStreamEvent (protocol: string, conn: Connectio
}
}

export interface Peer {
peerId: PeerId
registrar: Registrar
}

export async function connectPeers (protocol: string, a: Peer, b: Peer) {
export async function connectPeers (protocol: string, a: Components, b: Components) {
// Notify peers of connection
const [aToB, bToA] = connectionPair(a, b)

for (const topology of a.registrar.getTopologies(protocol)) {
await topology.onConnect(b.peerId, aToB)
for (const topology of a.getRegistrar().getTopologies(protocol)) {
await topology.onConnect(b.getPeerId(), aToB)
}

for (const topology of b.registrar.getTopologies(protocol)) {
await topology.onConnect(a.peerId, bToA)
for (const topology of b.getRegistrar().getTopologies(protocol)) {
await topology.onConnect(a.getPeerId(), bToA)
}
}
27 changes: 11 additions & 16 deletions packages/libp2p-interface-compliance-tests/src/pubsub/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,27 @@ import sinon from 'sinon'
import pDefer from 'p-defer'
import pWaitFor from 'p-wait-for'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { mockRegistrar } from '../mocks/registrar.js'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import delay from 'delay'
import type { TestSetup } from '../index.js'
import type { PubSub } from '@libp2p/interfaces/pubsub'
import type { PubSubArgs } from './index.js'
import type { Registrar } from '@libp2p/interfaces/registrar'
import { Components } from '@libp2p/interfaces/components'
import type { Components } from '@libp2p/interfaces/components'
import { createComponents } from './utils.js'

const topic = 'foo'
const data = uint8ArrayFromString('bar')

export default (common: TestSetup<PubSub, PubSubArgs>) => {
describe('pubsub api', () => {
let pubsub: PubSub
let registrar: Registrar
let components: Components

// Create pubsub router
beforeEach(async () => {
registrar = mockRegistrar()
components = await createComponents()

pubsub = await common.setup({
components: new Components({
peerId: await createEd25519PeerId(),
registrar
}),
components,
init: {
emitSelf: true
}
Expand All @@ -42,22 +37,22 @@ export default (common: TestSetup<PubSub, PubSubArgs>) => {
})

it('can start correctly', async () => {
sinon.spy(registrar, 'register')
sinon.spy(components.getRegistrar(), 'register')

await pubsub.start()

expect(pubsub.isStarted()).to.equal(true)
expect(registrar.register).to.have.property('callCount', 1)
expect(components.getRegistrar().register).to.have.property('callCount', 1)
})

it('can stop correctly', async () => {
sinon.spy(registrar, 'unregister')
sinon.spy(components.getRegistrar(), 'unregister')

await pubsub.start()
await pubsub.stop()

expect(pubsub.isStarted()).to.equal(false)
expect(registrar.unregister).to.have.property('callCount', 1)
expect(components.getRegistrar().unregister).to.have.property('callCount', 1)
})

it('can subscribe and unsubscribe correctly', async () => {
Expand All @@ -80,7 +75,7 @@ export default (common: TestSetup<PubSub, PubSubArgs>) => {
await pWaitFor(() => pubsub.getTopics().length === 0)

// Publish to guarantee the handler is not called
pubsub.publish(topic, data)
await pubsub.publish(topic, data)

// handlers are called async
await delay(100)
Expand All @@ -99,7 +94,7 @@ export default (common: TestSetup<PubSub, PubSubArgs>) => {
expect(evt).to.have.deep.nested.property('detail.data', data)
defer.resolve()
})
pubsub.publish(topic, data)
await pubsub.publish(topic, data)
await defer.promise

await pubsub.stop()
Expand Down
Loading