diff --git a/README.md b/README.md index e59715f..305656d 100644 --- a/README.md +++ b/README.md @@ -1,14 +1,25 @@ -# libp2p-pubsub +# @libp2p/pubsub -[![test & maybe release](https://github.com/libp2p/js-libp2p-pubsub/actions/workflows/js-test-and-release.yml/badge.svg)](https://github.com/libp2p/js-libp2p-pubsub/actions/workflows/js-test-and-release.yml) +[![libp2p.io](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/) +[![IRC](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p) +[![Discuss](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg?style=flat-square)](https://discuss.libp2p.io) +[![codecov](https://img.shields.io/codecov/c/github/libp2p/js-libp2p-pubsub.svg?style=flat-square)](https://codecov.io/gh/libp2p/js-libp2p-pubsub) +[![CI](https://img.shields.io/github/workflow/status/libp2p/js-libp2p-interfaces/test%20&%20maybe%20release/master?style=flat-square)](https://github.com/libp2p/js-libp2p-pubsub/actions/workflows/js-test-and-release.yml) -> Contains an implementation of the Pubsub interface +> libp2p pubsub base class ## Table of contents +- [Install](#install) - [Usage](#usage) - [License](#license) - - [Contribution](#contribution) +- [Contribution](#contribution) + +## Install + +```console +$ npm i @libp2p/pubsub +``` ## Usage @@ -28,9 +39,9 @@ class MyPubsubImplementation extends PubSubBaseProtocol { Licensed under either of - * Apache 2.0, ([LICENSE-APACHE](LICENSE-APACHE) / http://www.apache.org/licenses/LICENSE-2.0) - * MIT ([LICENSE-MIT](LICENSE-MIT) / http://opensource.org/licenses/MIT) +- Apache 2.0, ([LICENSE-APACHE](LICENSE-APACHE) / ) +- MIT ([LICENSE-MIT](LICENSE-MIT) / ) -### Contribution +## Contribution Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions. diff --git a/package.json b/package.json index a2bde00..3522b9e 100644 --- a/package.json +++ b/package.json @@ -45,24 +45,24 @@ ], "exports": { ".": { - "import": "./dist/src/index.js", - "types": "./dist/src/index.d.ts" + "types": "./dist/src/index.d.ts", + "import": "./dist/src/index.js" }, "./errors": { - "import": "./dist/src/errors.js", - "types": "./dist/src/errors.d.ts" + "types": "./dist/src/errors.d.ts", + "import": "./dist/src/errors.js" }, "./peer-streams": { - "import": "./dist/src/peer-streams.js", - "types": "./dist/src/peer-streams.d.ts" + "types": "./dist/src/peer-streams.d.ts", + "import": "./dist/src/peer-streams.js" }, "./signature-policy": { - "import": "./dist/src/signature-policy.js", - "types": "./dist/src/signature-policy.d.ts" + "types": "./dist/src/signature-policy.d.ts", + "import": "./dist/src/signature-policy.js" }, "./utils": { - "import": "./dist/src/utils.js", - "types": "./dist/src/utils.d.ts" + "types": "./dist/src/utils.d.ts", + "import": "./dist/src/utils.js" } }, "eslintConfig": { @@ -172,26 +172,31 @@ "release": "aegir release" }, "dependencies": { - "@libp2p/crypto": "^0.22.8", - "@libp2p/interfaces": "^2.0.0", + "@libp2p/components": "^1.0.0", + "@libp2p/crypto": "^1.0.0", + "@libp2p/interfaces": "^3.0.2", "@libp2p/logger": "^1.1.0", "@libp2p/peer-collections": "^1.0.0", "@libp2p/peer-id": "^1.1.0", - "@libp2p/topology": "^1.1.0", - "@multiformats/multiaddr": "^10.1.5", + "@libp2p/topology": "^2.0.0", + "@multiformats/multiaddr": "^10.2.0", "abortable-iterator": "^4.0.2", "err-code": "^3.0.1", "iso-random-stream": "^2.0.0", "it-length-prefixed": "^7.0.1", "it-pipe": "^2.0.3", - "it-pushable": "^2.0.1", + "it-pushable": "^3.0.0", "multiformats": "^9.6.3", "p-queue": "^7.2.0", "uint8arrays": "^3.0.0" }, "devDependencies": { + "@libp2p/interface-connection": "^1.0.1", + "@libp2p/interface-peer-id": "^1.0.2", + "@libp2p/interface-pubsub": "^1.0.1", + "@libp2p/interface-registrar": "^1.0.0", "@libp2p/peer-id-factory": "^1.0.0", - "aegir": "^37.0.7", + "aegir": "^37.2.0", "delay": "^5.0.0", "it-pair": "^2.0.2", "p-defer": "^4.0.0", diff --git a/src/index.ts b/src/index.ts index 06d64dc..6605884 100644 --- a/src/index.ts +++ b/src/index.ts @@ -11,12 +11,12 @@ import { signMessage, verifySignature } from './sign.js' -import type { PeerId } from '@libp2p/interfaces/peer-id' -import type { IncomingStreamData } from '@libp2p/interfaces/registrar' -import type { Connection } from '@libp2p/interfaces/connection' -import type { PubSub, Message, StrictNoSign, StrictSign, PubSubInit, PubSubEvents, PeerStreams, PubSubRPCMessage, PubSubRPC, PubSubRPCSubscription, SubscriptionChangeData, PublishResult } from '@libp2p/interfaces/pubsub' +import type { PeerId } from '@libp2p/interface-peer-id' +import type { IncomingStreamData } from '@libp2p/interface-registrar' +import type { Connection } from '@libp2p/interface-connection' +import type { PubSub, Message, StrictNoSign, StrictSign, PubSubInit, PubSubEvents, PeerStreams, PubSubRPCMessage, PubSubRPC, PubSubRPCSubscription, SubscriptionChangeData, PublishResult } from '@libp2p/interface-pubsub' import { PeerMap, PeerSet } from '@libp2p/peer-collections' -import { Components, Initializable } from '@libp2p/interfaces/components' +import { Components, Initializable } from '@libp2p/components' const log = logger('libp2p:pubsub') @@ -63,7 +63,7 @@ export abstract class PubSubBaseProtocol extends EventEmi public multicodecs: string[] public components: Components = new Components() - private _registrarTopologyId: string | undefined + private _registrarTopologyIds: string[] | undefined protected enabled: boolean constructor (props: PubSubInit) { @@ -112,9 +112,10 @@ export abstract class PubSubBaseProtocol extends EventEmi log('starting') + const registrar = this.components.getRegistrar() // Incoming streams // Called after a peer dials us - await this.components.getRegistrar().handle(this.multicodecs, this._onIncomingStream) + await Promise.all(this.multicodecs.map(async multicodec => await registrar.handle(multicodec, this._onIncomingStream))) // register protocol with topology // Topology callbacks called on connection manager changes @@ -122,7 +123,7 @@ export abstract class PubSubBaseProtocol extends EventEmi onConnect: this._onPeerConnected, onDisconnect: this._onPeerDisconnected }) - this._registrarTopologyId = await this.components.getRegistrar().register(this.multicodecs, topology) + this._registrarTopologyIds = await Promise.all(this.multicodecs.map(async multicodec => await registrar.register(multicodec, topology))) log('started') this.started = true @@ -136,12 +137,14 @@ export abstract class PubSubBaseProtocol extends EventEmi return } + const registrar = this.components.getRegistrar() + // unregister protocol and handlers - if (this._registrarTopologyId != null) { - this.components.getRegistrar().unregister(this._registrarTopologyId) + if (this._registrarTopologyIds != null) { + this._registrarTopologyIds?.map(id => registrar.unregister(id)) } - await this.components.getRegistrar().unhandle(this.multicodecs) + await Promise.all(this.multicodecs.map(async multicodec => await registrar.unhandle(multicodec))) log('stopping') for (const peerStreams of this.peers.values()) { @@ -553,7 +556,7 @@ export abstract class PubSubBaseProtocol extends EventEmi /** * Get a list of the peer-ids that are subscribed to one topic. */ - getSubscribers (topic: string) { + getSubscribers (topic: string): PeerId[] { if (!this.started) { throw errcode(new Error('not started yet'), 'ERR_NOT_STARTED_YET') } @@ -676,7 +679,7 @@ export abstract class PubSubBaseProtocol extends EventEmi return Array.from(this.subscriptions) } - getPeers () { + getPeers (): PeerId[] { if (!this.started) { throw new Error('Pubsub is not started') } diff --git a/src/peer-streams.ts b/src/peer-streams.ts index 0235a9d..6a9d621 100644 --- a/src/peer-streams.ts +++ b/src/peer-streams.ts @@ -4,10 +4,10 @@ import * as lp from 'it-length-prefixed' import { pushable } from 'it-pushable' import { pipe } from 'it-pipe' import { abortableSource } from 'abortable-iterator' -import type { PeerId } from '@libp2p/interfaces/peer-id' -import type { Stream } from '@libp2p/interfaces/connection' +import type { PeerId } from '@libp2p/interface-peer-id' +import type { Stream } from '@libp2p/interface-connection' import type { Pushable } from 'it-pushable' -import type { PeerStreamEvents } from '@libp2p/interfaces/pubsub' +import type { PeerStreamEvents } from '@libp2p/interface-pubsub' const log = logger('libp2p-pubsub:peer-streams') diff --git a/src/sign.ts b/src/sign.ts index e156cff..63d0f38 100644 --- a/src/sign.ts +++ b/src/sign.ts @@ -1,9 +1,9 @@ import { concat as uint8ArrayConcat } from 'uint8arrays/concat' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { toRpcMessage } from './utils.js' -import type { PeerId } from '@libp2p/interfaces/peer-id' +import type { PeerId } from '@libp2p/interface-peer-id' import { keys } from '@libp2p/crypto' -import type { Message, PubSubRPCMessage } from '@libp2p/interfaces/pubsub' +import type { Message, PubSubRPCMessage } from '@libp2p/interface-pubsub' import { peerIdFromKeys } from '@libp2p/peer-id' export const SignPrefix = uint8ArrayFromString('libp2p-pubsub:') diff --git a/src/utils.ts b/src/utils.ts index 1d9e6df..d542133 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -2,7 +2,7 @@ import { randomBytes } from 'iso-random-stream' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import { sha256 } from 'multiformats/hashes/sha2' -import type { Message, PubSubRPCMessage } from '@libp2p/interfaces/pubsub' +import type { Message, PubSubRPCMessage } from '@libp2p/interface-pubsub' import { peerIdFromBytes } from '@libp2p/peer-id' import { codes } from './errors.js' import errcode from 'err-code' diff --git a/test/emit-self.spec.ts b/test/emit-self.spec.ts index 438134b..42be511 100644 --- a/test/emit-self.spec.ts +++ b/test/emit-self.spec.ts @@ -6,7 +6,7 @@ import { } from './utils/index.js' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import delay from 'delay' -import { Components } from '@libp2p/interfaces/components' +import { Components } from '@libp2p/components' const protocol = '/pubsub/1.0.0' const topic = 'foo' diff --git a/test/instance.spec.ts b/test/instance.spec.ts index 2b153a3..894a303 100644 --- a/test/instance.spec.ts +++ b/test/instance.spec.ts @@ -1,6 +1,6 @@ import { expect } from 'aegir/chai' import { PubSubBaseProtocol } from '../src/index.js' -import type { PublishResult, PubSubRPC, PubSubRPCMessage } from '@libp2p/interfaces/pubsub' +import type { PublishResult, PubSubRPC, PubSubRPCMessage } from '@libp2p/interface-pubsub' class PubsubProtocol extends PubSubBaseProtocol { decodeRpc (bytes: Uint8Array): PubSubRPC { diff --git a/test/lifecycle.spec.ts b/test/lifecycle.spec.ts index 49ecb4b..8198695 100644 --- a/test/lifecycle.spec.ts +++ b/test/lifecycle.spec.ts @@ -8,10 +8,10 @@ import { MockRegistrar, mockIncomingStreamEvent } from './utils/index.js' -import type { PeerId } from '@libp2p/interfaces/peer-id' -import type { Registrar } from '@libp2p/interfaces/registrar' -import type { PublishResult, PubSubRPC, PubSubRPCMessage } from '@libp2p/interfaces/pubsub' -import { Components } from '@libp2p/interfaces/components' +import type { PeerId } from '@libp2p/interface-peer-id' +import type { Registrar } from '@libp2p/interface-registrar' +import type { PublishResult, PubSubRPC, PubSubRPCMessage } from '@libp2p/interface-pubsub' +import { Components } from '@libp2p/components' class PubsubProtocol extends PubSubBaseProtocol { decodeRpc (bytes: Uint8Array): PubSubRPC { @@ -158,7 +158,7 @@ describe('pubsub base lifecycle', () => { // Notify peers of connection await topologyA.onConnect(peerIdB, c0) - await handlerB(await mockIncomingStreamEvent(protocol, c1, peerIdA)) + await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA)) expect(pubsubA.peers.size).to.be.eql(1) expect(pubsubB.peers.size).to.be.eql(1) @@ -179,7 +179,7 @@ describe('pubsub base lifecycle', () => { sinon.spy(c0, 'newStream') await topologyA.onConnect(peerIdB, c0) - await handlerB(await mockIncomingStreamEvent(protocol, c1, peerIdA)) + await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA)) expect(c0.newStream).to.have.property('callCount', 1) // @ts-expect-error _removePeer is a protected method @@ -219,7 +219,7 @@ describe('pubsub base lifecycle', () => { sinon.stub(c0, 'newStream').throws(error) await topologyA.onConnect(peerIdB, c0) - await handlerB(await mockIncomingStreamEvent(protocol, c1, peerIdA)) + await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA)) expect(c0.newStream).to.have.property('callCount', 1) }) @@ -237,7 +237,7 @@ describe('pubsub base lifecycle', () => { const [c0, c1] = ConnectionPair() await topologyA.onConnect(peerIdB, c0) - await handlerB(await mockIncomingStreamEvent(protocol, c1, peerIdA)) + await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA)) // Notice peers of disconnect topologyA?.onDisconnect(peerIdB) diff --git a/test/message.spec.ts b/test/message.spec.ts index efab59d..3cb187f 100644 --- a/test/message.spec.ts +++ b/test/message.spec.ts @@ -7,9 +7,9 @@ import { MockRegistrar, PubsubImplementation } from './utils/index.js' -import type { PeerId } from '@libp2p/interfaces/peer-id' -import type { Message } from '@libp2p/interfaces/pubsub' -import { Components } from '@libp2p/interfaces/components' +import type { PeerId } from '@libp2p/interface-peer-id' +import type { Message } from '@libp2p/interface-pubsub' +import { Components } from '@libp2p/components' describe('pubsub base messages', () => { let peerId: PeerId diff --git a/test/pubsub.spec.ts b/test/pubsub.spec.ts index 0d8a4ec..f78ff51 100644 --- a/test/pubsub.spec.ts +++ b/test/pubsub.spec.ts @@ -11,12 +11,12 @@ import { PubsubImplementation, mockIncomingStreamEvent } from './utils/index.js' -import type { PeerId } from '@libp2p/interfaces/peer-id' +import type { PeerId } from '@libp2p/interface-peer-id' import { PeerSet } from '@libp2p/peer-collections' -import { Components } from '@libp2p/interfaces/components' +import { Components } from '@libp2p/components' import { createEd25519PeerId } from '@libp2p/peer-id-factory' import { noSignMsgId } from '../src/utils.js' -import type { PubSubRPC } from '@libp2p/interfaces/src/pubsub' +import type { PubSubRPC } from '@libp2p/interface-pubsub' import delay from 'delay' import pDefer from 'p-defer' @@ -149,7 +149,7 @@ describe('pubsub base implementation', () => { const [c0, c1] = ConnectionPair() await topologyA.onConnect(peerIdB, c0) - await handlerB(await mockIncomingStreamEvent(protocol, c1, peerIdA)) + await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA)) }) afterEach(async () => { @@ -259,7 +259,7 @@ describe('pubsub base implementation', () => { const [c0, c1] = ConnectionPair() await topologyA.onConnect(peerIdB, c0) - await handlerB(await mockIncomingStreamEvent(protocol, c1, peerIdA)) + await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA)) }) afterEach(async () => { diff --git a/test/sign.spec.ts b/test/sign.spec.ts index 94e7e19..1a0d6c9 100644 --- a/test/sign.spec.ts +++ b/test/sign.spec.ts @@ -10,8 +10,8 @@ import { import * as PeerIdFactory from '@libp2p/peer-id-factory' import { randomSeqno, toRpcMessage } from '../src/utils.js' import { keys } from '@libp2p/crypto' -import type { Message, PubSubRPCMessage } from '@libp2p/interfaces/pubsub' -import type { PeerId } from '@libp2p/interfaces/peer-id' +import type { Message, PubSubRPCMessage } from '@libp2p/interface-pubsub' +import type { PeerId } from '@libp2p/interface-peer-id' function encodeMessage (message: PubSubRPCMessage) { return RPC.Message.encode(message) diff --git a/test/topic-validators.spec.ts b/test/topic-validators.spec.ts index b48cd2c..b78bb8f 100644 --- a/test/topic-validators.spec.ts +++ b/test/topic-validators.spec.ts @@ -10,9 +10,9 @@ import { MockRegistrar, PubsubImplementation } from './utils/index.js' -import type { PeerId } from '@libp2p/interfaces/peer-id' -import type { PubSubRPC } from '@libp2p/interfaces/pubsub' -import { Components } from '@libp2p/interfaces/components' +import type { PeerId } from '@libp2p/interface-peer-id' +import type { PubSubRPC } from '@libp2p/interface-pubsub' +import { Components } from '@libp2p/components' const protocol = '/pubsub/1.0.0' diff --git a/test/utils.spec.ts b/test/utils.spec.ts index 874f112..85a31ad 100644 --- a/test/utils.spec.ts +++ b/test/utils.spec.ts @@ -1,7 +1,7 @@ import { expect } from 'aegir/chai' import * as utils from '../src/utils.js' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' -import type { Message, PubSubRPCMessage } from '@libp2p/interfaces/pubsub' +import type { Message, PubSubRPCMessage } from '@libp2p/interface-pubsub' import { peerIdFromBytes, peerIdFromString } from '@libp2p/peer-id' describe('utils', () => { diff --git a/test/utils/index.ts b/test/utils/index.ts index 9c6ea11..8110d56 100644 --- a/test/utils/index.ts +++ b/test/utils/index.ts @@ -2,11 +2,10 @@ import { duplexPair } from 'it-pair/duplex' import * as PeerIdFactory from '@libp2p/peer-id-factory' import { PubSubBaseProtocol } from '../../src/index.js' import { RPC } from '../message/rpc.js' -import type { IncomingStreamData, Registrar, StreamHandler } from '@libp2p/interfaces/registrar' -import type { Topology } from '@libp2p/interfaces/topology' -import type { Connection } from '@libp2p/interfaces/connection' -import type { PeerId } from '@libp2p/interfaces/peer-id' -import type { PubSubRPC, PubSubRPCMessage } from '@libp2p/interfaces/pubsub' +import type { IncomingStreamData, Registrar, StreamHandler, StreamHandlerRecord, Topology } from '@libp2p/interface-registrar' +import type { Connection } from '@libp2p/interface-connection' +import type { PeerId } from '@libp2p/interface-peer-id' +import type { PubSubRPC, PubSubRPCMessage } from '@libp2p/interface-pubsub' export const createPeerId = async (): Promise => { const peerId = await PeerIdFactory.createEd25519PeerId() @@ -76,14 +75,14 @@ export class MockRegistrar implements Registrar { }) } - getHandler (protocol: string) { + getHandler (protocol: string): StreamHandlerRecord { const handler = this.handlers.get(protocol) if (handler == null) { throw new Error(`No handler registered for protocol ${protocol}`) } - return handler + return { handler, options: {} } } async register (protocols: string | string[], topology: Topology) {